FlinkCDC: Consuming MySQL Binlog and Pushing to Kafka (Part 1)
Environment: Flink 1.19.0
Containers: Docker, image flink:latest
Prerequisites: a Flink cluster brought up quickly with Docker, and a MySQL server with GTID enabled
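Before going further, it is worth confirming that MySQL really has binlog and GTID turned on, since the CDC connector depends on both. A minimal check, assuming the MySQL container is named mysql and uses the same root/123456 credentials as the table definitions below:

docker exec -it mysql mysql -uroot -p123456 -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'gtid_mode'; SHOW VARIABLES LIKE 'binlog_format';"
# Expected: log_bin=ON, gtid_mode=ON, binlog_format=ROW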
1. Download the following three CDC dependencies for the Flink lib directory
# Download from: https://repo1.maven.org/maven2/org/apache/flink/
flink-sql-connector-kafka-3.1.0-1.18.jar
flink-sql-connector-mysql-cdc-3.0.1.jar
flink-cdc-dist-3.0.1.jar
2. Copy the dependencies into each Flink container's /opt/flink/lib directory
docker cp flink-sql-connector-kafka-3.1.0-1.18.jar flink-taskmanager1:/opt/flink/lib/
docker cp flink-sql-connector-kafka-3.1.0-1.18.jar flink-taskmanager2:/opt/flink/lib/
docker cp flink-sql-connector-kafka-3.1.0-1.18.jar flink-taskmanager3:/opt/flink/lib/
docker cp flink-sql-connector-kafka-3.1.0-1.18.jar flink-jobmanager:/opt/flink/lib/
docker cp flink-sql-connector-mysql-cdc-3.0.1.jar flink-taskmanager1:/opt/flink/lib/
docker cp flink-sql-connector-mysql-cdc-3.0.1.jar flink-taskmanager2:/opt/flink/lib/
docker cp flink-sql-connector-mysql-cdc-3.0.1.jar flink-taskmanager3:/opt/flink/lib/
docker cp flink-sql-connector-mysql-cdc-3.0.1.jar flink-jobmanager:/opt/flink/lib/
docker cp flink-cdc-dist-3.0.1.jar flink-taskmanager1:/opt/flink/lib/
docker cp flink-cdc-dist-3.0.1.jar flink-taskmanager2:/opt/flink/lib/
docker cp flink-cdc-dist-3.0.1.jar flink-taskmanager3:/opt/flink/lib/
docker cp flink-cdc-dist-3.0.1.jar flink-jobmanager:/opt/flink/lib/
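A quick sanity check that the jars actually landed in a container (the grep pattern is just illustrative):

docker exec flink-jobmanager ls /opt/flink/lib | grep -E 'cdc|kafka'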
3. Restart the Flink cluster to pick up the new jars
docker restart flink-jobmanager flink-taskmanager1 flink-taskmanager2 flink-taskmanager3
4. Prepare the Kafka cluster and create a topic
Prerequisite: https://cpp.la/582.html#kafka-docker
# Bring up Kafka in one step (ARM base image)
docker-compose up -d
docker-compose scale kafka=3
# Create the Kafka topic: flinkcdc_v1
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server=10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770 --replication-factor 2 --partitions 3 --topic flinkcdc_v1
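Optionally, describe the topic to confirm the partition and replica layout before wiring Flink to it:

/opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server=10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770 --topic flinkcdc_v1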
5. Open the Flink SQL client
docker exec -ti flink-jobmanager bash /opt/flink/bin/sql-client.sh
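The CDC source tracks its progress through Flink checkpoints, so it is common to enable periodic checkpointing for the session before submitting the job (the 10s interval here is an arbitrary example):

SET 'execution.checkpointing.interval' = '10s';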
6. Create the MySQL source table
CREATE TABLE source_mysql_test (
    foo INT,
    bar STRING,
    PRIMARY KEY (foo) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '172.20.0.6',
    'port' = '3306',
    'database-name' = 'test',
    'table-name' = 'pokes',
    'username' = 'root',
    'password' = '123456',
    'scan.startup.mode' = 'initial',
    'debezium.heartbeat.interval.ms' = '60000',
    'debezium.snapshot.mode' = 'initial'
);
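At this point the source can be smoke-tested directly from the SQL client; the query below streams the initial snapshot of test.pokes followed by live binlog changes (quit the result view with Q):

SELECT * FROM source_mysql_test;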
7. Create the Kafka sink table
CREATE TABLE sink_kafka_test (
    foo INT,
    bar STRING,
    PRIMARY KEY (foo) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'flinkcdc_v1',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'properties.bootstrap.servers' = '10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770'
);
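Note that 'scan.startup.mode' only matters if this table is later read back as a source; the sink path ignores it. As a side note, if you want Kafka to retain only the latest row per key rather than the full change history, a sketch of an alternative (not what this walkthrough uses) is the upsert-kafka connector:

CREATE TABLE sink_kafka_upsert (
    foo INT,
    bar STRING,
    PRIMARY KEY (foo) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'flinkcdc_v1',
    'properties.bootstrap.servers' = '10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770',
    'key.format' = 'json',
    'value.format' = 'json'
);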
8. Submit the INSERT job
INSERT INTO sink_kafka_test SELECT * FROM source_mysql_test;
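The statement returns immediately with a job ID; the streaming job keeps running in the background. You can confirm it from the same SQL client (SHOW JOBS is available in the SQL client since Flink 1.17):

SHOW JOBS;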
9. Watch the Kafka messages
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server=10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770 --topic flinkcdc_v1 --from-beginning
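Each change event arrives as a Debezium-style JSON envelope with before, after, and op fields. A hypothetical record for a row inserted into pokes might look like this (values are illustrative only):

{"before":null,"after":{"foo":1,"bar":"hello"},"op":"c"}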
By: cpp.la, 2024-04-09