How do you use Flink to batch-read data from MySQL and write it to Elasticsearch?
Flink: Batch Reading from MySQL and Writing to ES (Elasticsearch)
In big data processing and real-time analytics, Apache Flink is a powerful tool: it can pull data from many kinds of sources, process it in real time, and write the results to different storage systems. This article shows how to use Flink to read data from a MySQL database in batches and write it to Elasticsearch (ES).
1. Environment Preparation
Make sure you have the following software installed:
Java Development Kit (JDK)
Apache Flink
MySQL
Elasticsearch
You will also need the corresponding connector libraries:
flink-connector-jdbc
flink-connector-elasticsearch7
2. Maven Dependency Configuration
Add the necessary dependencies to your Flink project's pom.xml file:
<dependencies>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- Flink Elasticsearch connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <!-- MySQL JDBC driver, required at runtime by the JDBC connector -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
</dependencies>
3. Flink Program Implementation
3.1 Create the Flink Stream Execution Environment
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkMySQLToES {
    public static void main(String[] args) throws Exception {
        // Obtain the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1 to keep the example simple
        env.setParallelism(1);
        // The source and sink are wired up and the job is run in sections 3.2 and 3.3
    }
}
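Because this job performs a one-shot, bounded read rather than consuming an endless stream, Flink 1.12+ can also run the same pipeline in batch execution mode. This is optional and worth verifying against your connector versions; a minimal sketch of the switch itself:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A bounded source such as a finite JDBC query allows BATCH mode,
        // which uses batch-style scheduling instead of continuous streaming.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}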
3.2 Define the Data Source and Sink
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkMySQLToES {

    private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/your_database";
    private static final String MYSQL_USERNAME = "your_username";
    private static final String MYSQL_PASSWORD = "your_password";
    private static final String ELASTICSEARCH_HOST = "localhost";
    private static final int ELASTICSEARCH_PORT = 9200;
    private static final String ELASTICSEARCH_INDEX = "your_index";

    private static DataStream<Row> readFromMySQL(StreamExecutionEnvironment env) {
        // Describe the row layout of the query result: (id, name, value)
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT, Types.STRING, Types.STRING},
                new String[]{"id", "name", "value"});

        JdbcInputFormat jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl(MYSQL_URL)
                .setUsername(MYSQL_USERNAME)
                .setPassword(MYSQL_PASSWORD)
                .setQuery("SELECT id, name, value FROM your_table")
                .setFetchSize(5000) // number of records fetched from MySQL per round trip
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        return env.createInput(jdbcInputFormat);
    }

    private static void writeToElasticSearch(DataStream<Row> dataStream) {
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost(ELASTICSEARCH_HOST, ELASTICSEARCH_PORT, "http"));

        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
                hosts,
                new ElasticsearchSinkFunction<Row>() {
                    @Override
                    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                        Map<String, Object> jsonMap = new HashMap<>();
                        jsonMap.put("id", element.getField(0));    // id field
                        jsonMap.put("name", element.getField(1));  // name field
                        jsonMap.put("value", element.getField(2)); // value field
                        // ES 7 no longer uses mapping types, so there is no .type(...) call
                        indexer.add(Requests.indexRequest()
                                .index(ELASTICSEARCH_INDEX)
                                .source(jsonMap, XContentType.JSON));
                    }
                });

        // Flush every single record to ES immediately (simple for a demo; raise this for throughput)
        esSinkBuilder.setBulkFlushMaxActions(1);

        dataStream.addSink(esSinkBuilder.build());
    }
}
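As written, readFromMySQL pulls the whole table through a single query. If the table has a numeric primary key, the JDBC connector can instead split the scan into key ranges that Flink reads in parallel (you would then also raise the parallelism set in section 3.1). A sketch, assuming an id column spanning roughly 1 to 1,000,000; both bounds and the batch size are placeholders:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;

public class ParallelJdbcRead {
    public static JdbcInputFormat buildParallelInputFormat() {
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT, Types.STRING, Types.STRING},
                new String[]{"id", "name", "value"});

        // Split the scan into id ranges of 10,000 rows each; Flink hands the
        // resulting splits to parallel source subtasks. The bounds 1 and
        // 1_000_000 are placeholder min/max values of the id column.
        JdbcNumericBetweenParametersProvider provider =
                new JdbcNumericBetweenParametersProvider(1, 1_000_000).ofBatchSize(10_000);

        return JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/your_database")
                .setUsername("your_username")
                .setPassword("your_password")
                // The two ? placeholders are filled from each split's range,
                // so every subtask scans a disjoint slice of the table
                .setQuery("SELECT id, name, value FROM your_table WHERE id BETWEEN ? AND ?")
                .setParametersProvider(provider)
                .setRowTypeInfo(rowTypeInfo)
                .finish();
    }
}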
3.3 Main Program Entry Point
The entry point wires together the source and sink defined in section 3.2 and submits the job:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class FlinkMySQLToES {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the rows from MySQL (section 3.2)
        DataStream<Row> mysqlData = readFromMySQL(env);
        // Write them to Elasticsearch (section 3.2)
        writeToElasticSearch(mysqlData);
        // Submit and run the job
        env.execute("Flink MySQL to Elasticsearch");
    }
}
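After the job completes, a quick way to confirm the load is to count the documents in the target index. The standalone sketch below uses the Elasticsearch high-level REST client; the host, port, and your_index name mirror the placeholder constants above:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;

public class EsCountCheck {
    public static void main(String[] args) throws Exception {
        // Connect to the same ES node the sink wrote to
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // Count all documents in the target index
            CountResponse response =
                    client.count(new CountRequest("your_index"), RequestOptions.DEFAULT);
            System.out.println("Documents in your_index: " + response.getCount());
        }
    }
}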
That wraps up our walkthrough of batch-reading MySQL with Flink and writing to ES. I hope it helps; if you have any questions, leave me a comment, and see you next time!