How do you write data to Doris from Flink 1.17 with Flink CDC?
- 2024-05-03
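To write data from Flink 1.17 to Doris with Flink CDC, follow these steps:
1. Add the dependencies
Add the Flink CDC and Doris connector dependencies to the project's pom.xml. The Doris connector artifact is built per Flink minor version, so use the one for Flink 1.17: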
<!-- Doris connector built for Flink 1.17 -->
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
    <version>1.4.0</version>
</dependency>
<!-- MySQL CDC connector; the 2.4.x line supports Flink 1.17 -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.2</version>
</dependency>
2. Create the Flink CDC source
Create a Flink CDC source that reads change events from the MySQL database:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class FlinkCDCSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The CDC source relies on checkpoints to record its read position
        env.enableCheckpointing(3000);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")          // database to capture
                .tableList("mydb.mytable")     // table to capture, in database.table form
                .username("root")
                .password("password")
                // Emit each change event as a Debezium-style JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print();

        env.execute("Flink CDC Example");
    }
}
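Each change event read from MySQL follows the Debezium envelope: when serialized as JSON it carries a `before` image, an `after` image, and an `op` code (`r` for a snapshot read, `c` for an insert, `u` for an update, `d` for a delete). The sketch below shows one way that code could be interpreted. The class and method names are hypothetical and not part of Flink CDC, and the regex lookup is a deliberately naive stand-in for a real JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Flink CDC. */
final class ChangeEventOps {
    // Naive match of the first "op" field in the JSON text, e.g. "op":"c".
    // A table column that is itself named "op" would confuse this lookup.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the op code found in the event JSON, or null if there is none. */
    static String extractOp(String eventJson) {
        Matcher m = OP.matcher(eventJson);
        return m.find() ? m.group(1) : null;
    }

    /** Maps a Debezium op code to a human-readable description. */
    static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }
}
```

In a job, a check like this would typically sit in a `map` or `filter` step between the source and the sink, for example to drop events that should not be replicated.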
3. Create the Doris sink
Create a Doris sink that writes the data to Doris:
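import java.util.Properties;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
// The serializer package has moved between connector releases; adjust the import to your version
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DorisSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The sink commits its Stream Load transactions on checkpoints
        env.enableCheckpointing(10000);

        // Placeholder input: one JSON object per record (the columns "id" and "name" are hypothetical).
        // In the complete job this stream comes from the Flink CDC source.
        DataStream<String> dataStream = env.fromElements("{\"id\": 1, \"name\": \"doris\"}");

        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("localhost:8030")          // Doris FE address (HTTP port)
                .setTableIdentifier("mydb.mytable")    // target table, in database.table form
                .setUsername("root")
                .setPassword("password")
                .build();

        // Stream Load properties: each record is one JSON object on its own line
        Properties loadProps = new Properties();
        loadProps.setProperty("format", "json");
        loadProps.setProperty("read_json_by_line", "true");

        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-doris")         // must be unique per job
                .setStreamLoadProp(loadProps)
                .build();

        DorisSink<String> dorisSink = DorisSink.<String>builder()
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setDorisOptions(dorisOptions)
                .setSerializer(new SimpleStringSerializer())   // pass strings through unchanged
                .build();

        dataStream.sinkTo(dorisSink);

        env.execute("Doris Sink Example");
    }
}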
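The load properties passed to the sink are forwarded to Doris Stream Load and decide how each batch is parsed. As a rough sketch, the two common setups could be captured in a small helper like the one below. `format`, `read_json_by_line`, `column_separator`, and `line_delimiter` are Stream Load properties; the helper class and its method names are hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper that builds Stream Load properties; not part of the Doris connector. */
final class StreamLoadProps {

    /** One JSON object per line, matching records that are serialized as JSON strings. */
    static Properties jsonLines() {
        Properties props = new Properties();
        props.setProperty("format", "json");
        props.setProperty("read_json_by_line", "true");
        return props;
    }

    /** Delimited text with explicit column and line separators. */
    static Properties csv(String columnSeparator, String lineDelimiter) {
        Properties props = new Properties();
        props.setProperty("format", "csv");
        props.setProperty("column_separator", columnSeparator);
        props.setProperty("line_delimiter", lineDelimiter);
        return props;
    }
}
```

Either result can be handed to `DorisExecutionOptions.builder().setStreamLoadProp(...)`. The format chosen here has to agree with what the sink's serializer actually emits.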
4. Combine the Flink CDC source and the Doris sink
Wire the Flink CDC source and the Doris sink together to synchronize data from MySQL to Doris:
import java.util.Properties;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
// The serializer package has moved between connector releases; adjust the import to your version
import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class FlinkCDCToDorisExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Both the CDC source and the Doris sink depend on checkpoints
        env.enableCheckpointing(10000);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")          // database to capture
                .tableList("mydb.mytable")     // table to capture, in database.table form
                .username("root")
                .password("password")
                // Emit each change event as a Debezium-style JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        DataStream<String> dataStream =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("localhost:8030")          // Doris FE address (HTTP port)
                .setTableIdentifier("mydb.mytable")    // target table, in database.table form
                .setUsername("root")
                .setPassword("password")
                .build();

        // Stream Load properties: the serializer below emits one JSON object per line
        Properties loadProps = new Properties();
        loadProps.setProperty("format", "json");
        loadProps.setProperty("read_json_by_line", "true");

        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                .setLabelPrefix("label-doris")         // must be unique per job
                .setStreamLoadProp(loadProps)
                .setDeletable(true)                    // propagate deletes to Doris
                .build();

        // JsonDebeziumSchemaSerializer consumes the Debezium JSON directly,
        // so no manual conversion to Row is needed
        DorisSink<String> dorisSink = DorisSink.<String>builder()
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setDorisOptions(dorisOptions)
                .setSerializer(JsonDebeziumSchemaSerializer.builder()
                        .setDorisOptions(dorisOptions)
                        .build())
                .build();

        dataStream.sinkTo(dorisSink);

        env.execute("Flink CDC to Doris Example");
    }
}
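For the synchronization to stay correct, deletes in MySQL also have to remove rows in Doris. On a Doris Unique Key table this is commonly done through the hidden `__DORIS_DELETE_SIGN__` column: inserts and updates write the `after` image with the sign set to 0, and deletes write the `before` image with the sign set to 1. The sketch below illustrates that mapping on plain maps. It is a conceptual illustration under those assumptions, not the connector's actual code, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the delete-sign mapping; not the connector's implementation. */
final class DeleteSignMapper {
    static final String DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    /**
     * Builds the row to load into Doris from one change event.
     *
     * @param op     Debezium op code: "r", "c", "u", or "d"
     * @param before row image before the change (present for deletes)
     * @param after  row image after the change (present for reads, inserts, updates)
     */
    static Map<String, Object> toDorisRow(String op,
                                          Map<String, Object> before,
                                          Map<String, Object> after) {
        boolean isDelete = "d".equals(op);
        Map<String, Object> image = isDelete ? before : after;
        if (image == null) {
            throw new IllegalArgumentException("missing row image for op " + op);
        }
        // Copy the image so the caller's map is left untouched
        Map<String, Object> row = new LinkedHashMap<>(image);
        row.put(DELETE_SIGN, isDelete ? "1" : "0");
        return row;
    }
}
```

For example, a delete event whose `before` image is `{id=1}` would yield `{id=1, __DORIS_DELETE_SIGN__=1}`, which tells Doris to remove the row with that key.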
This completes the pipeline that uses Flink CDC to synchronize data from MySQL to Doris.