How to Use Flink to Batch-Read Data from MySQL and Write It to Elasticsearch?

小贝

Flink: Batch-Reading from MySQL and Writing to Elasticsearch (ES)


In big data processing and real-time analytics, Apache Flink is a powerful tool: it can ingest data from many kinds of sources, process it in real time, and write the results to a variety of storage systems. This article walks through how to use Flink to batch-read data from a MySQL database and write it to Elasticsearch (ES).

1. Environment Setup

Make sure the following software is installed:

Java Development Kit (JDK)

Apache Flink

MySQL

Elasticsearch


You will also need the corresponding connector libraries:

flink-connector-jdbc

flink-connector-elasticsearch7

2. Maven Dependency Configuration

Add the necessary dependencies to your Flink project's pom.xml:

<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Elasticsearch connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>
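
Note that the pom.xml above does not include the MySQL JDBC driver itself, which the JDBC connector needs at runtime. A minimal addition, assuming a MySQL 8.x server (adjust the version to match yours):

<!-- MySQL JDBC driver (assumed version; match it to your MySQL server) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>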

3. Flink Program Implementation

3.1 Create the Flink Stream Execution Environment

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkMySQLToES {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1 (keeps the example simple; tune for production)
        env.setParallelism(1);
        // Wire the source to the sink and run the job (defined in section 3.3)
        process(env);
    }
}
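
Since the MySQL query is a bounded (finite) source, you can also ask Flink to schedule the job in batch execution mode. A minimal sketch, available in Flink 1.12 and later:

import org.apache.flink.api.common.RuntimeExecutionMode;

// Bounded sources can run with batch scheduling instead of streaming semantics
env.setRuntimeMode(RuntimeExecutionMode.BATCH);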

3.2 Define the Data Source and Sink

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkMySQLToES {
    private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/your_database";
    private static final String MYSQL_USERNAME = "your_username";
    private static final String MYSQL_PASSWORD = "your_password";
    private static final String ELASTICSEARCH_HOST = "localhost";
    private static final int ELASTICSEARCH_PORT = 9200;
    private static final String ELASTICSEARCH_INDEX = "your_index";

    private static DataStream<Row> readFromMySQL(StreamExecutionEnvironment env) {
        // Describe the schema of the rows returned by the query
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT, Types.STRING, Types.STRING},
                new String[]{"id", "name", "value"});
        JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver") // MySQL Connector/J 8.x driver class
                .setDBUrl(MYSQL_URL)
                .setUsername(MYSQL_USERNAME)
                .setPassword(MYSQL_PASSWORD)
                .setQuery("SELECT id, name, value FROM your_table")
                .setRowTypeInfo(rowTypeInfo)
                .setFetchSize(5000) // number of records fetched from MySQL per round trip
                .finish();
        return env.createInput(jdbcInputFormat);
    }

    private static void writeToElasticSearch(DataStream<Row> dataStream) {
        // The sink manages its own REST client; we only supply the cluster hosts
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost(ELASTICSEARCH_HOST, ELASTICSEARCH_PORT, "http"));
        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts,
                new ElasticsearchSinkFunction<Row>() {
                    @Override
                    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                        Map<String, Object> jsonMap = new HashMap<>();
                        jsonMap.put("id", element.getField(0));    // id field
                        jsonMap.put("name", element.getField(1));  // name field
                        jsonMap.put("value", element.getField(2)); // value field
                        // Mapping types were removed in ES 7, so no type needs to be set
                        indexer.add(Requests.indexRequest()
                                .index(ELASTICSEARCH_INDEX)
                                .source(jsonMap, XContentType.JSON));
                    }
                });
        esSinkBuilder.setBulkFlushMaxActions(1); // flush every record to ES immediately (demo only)
        dataStream.addSink(esSinkBuilder.build());
    }
}
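
Flushing after every record is fine for a demo but slow for a real batch load. The sink builder exposes bulk-flush knobs; a sketch with illustrative values you would tune for your own cluster:

esSinkBuilder.setBulkFlushMaxActions(1000); // flush after 1000 buffered requests
esSinkBuilder.setBulkFlushMaxSizeMb(5);     // ...or after 5 MB of buffered data
esSinkBuilder.setBulkFlushInterval(2000L);  // ...or every 2 seconds, whichever comes first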

3.3 Main Program Entry Point

Finally, the process method called from main() wires the MySQL source to the Elasticsearch sink and submits the job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class FlinkMySQLToES {
    private static void process(StreamExecutionEnvironment env) throws Exception {
        // Read the full table from MySQL as a bounded stream of rows
        DataStream<Row> rows = readFromMySQL(env);
        // Index every row into Elasticsearch
        writeToElasticSearch(rows);
        // Submit the job
        env.execute("Flink MySQL to Elasticsearch");
    }
}

That wraps up our walkthrough of batch-reading from MySQL with Flink and writing to ES. I hope it helps; if anything is unclear, leave me a comment, and see you next time.
