FlinkCDC Consuming MySQL Binlog and Pushing to Kafka (Part 3) (Canal CDC Format: canal-json)
Contents
Quickly Starting a Flink Cluster and MySQL Service with Docker, and Enabling GTID
FlinkCDC Consuming MySQL Binlog and Pushing to Kafka (Part 1)
FlinkCDC Consuming MySQL Binlog and Pushing to Kafka (Part 2) (Debezium CDC Format: debezium-json)
FlinkCDC Consuming MySQL Binlog and Pushing to Kafka (Part 3) (Canal CDC Format: canal-json) — this format comes closest to what you would expect.
1. FlinkCDC MySQL to Kafka, Source, canal-json
CREATE TABLE source_mysql_test (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  foo INT,
  bar STRING,
  PRIMARY KEY (foo) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '172.20.0.6',
  'port' = '3306',
  'database-name' = 'test',
  'table-name' = 'pokes',
  'username' = 'root',
  'password' = '123456',
  'scan.startup.mode' = 'initial',
  'debezium.heartbeat.interval' = '60000',
  'debezium.snapshot.mode' = 'initial',
  'debezium.parallelism' = '1' -- with parallelism 1, events stay strictly ordered
);
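Before wiring up the sink, it can help to confirm the CDC source actually emits rows. A minimal sanity check, assuming you run it from the Flink SQL client in streaming mode: the query prints the initial snapshot and then keeps streaming live binlog changes.

-- Continuous query against the CDC source; cancel it once you have seen output.
SELECT db_name, table_name, operation_ts, foo, bar
FROM source_mysql_test;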
2. FlinkCDC MySQL to Kafka, Sink, canal-json
CREATE TABLE sink_kafka_test (
  db_name STRING,
  table_name STRING,
  operation_ts TIMESTAMP_LTZ(3),
  foo INT,
  bar STRING,
  PRIMARY KEY (foo) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'flinkcdc_v1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json',
  'properties.bootstrap.servers' = '10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770'
);
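As an aside: if downstream consumers only need the latest state per primary key rather than the full change history, the upsert-kafka connector is an alternative to kafka + canal-json. A minimal sketch, not part of the original setup; the table name sink_kafka_upsert and topic flinkcdc_upsert_v1 are assumptions for illustration:

CREATE TABLE sink_kafka_upsert (
  db_name STRING,
  table_name STRING,
  operation_ts TIMESTAMP_LTZ(3),
  foo INT,
  bar STRING,
  PRIMARY KEY (foo) NOT ENFORCED  -- upsert-kafka requires a primary key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'flinkcdc_upsert_v1', -- hypothetical topic name
  'properties.bootstrap.servers' = '10.0.0.16:32768,10.0.0.16:32769,10.0.0.16:32770',
  'key.format' = 'json',          -- upsert-kafka takes key/value formats
  'value.format' = 'json'         -- instead of a single 'format' option
);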
3. FlinkCDC MySQL to Kafka, Creating the Job, canal-json
INSERT INTO sink_kafka_test SELECT * FROM source_mysql_test;
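SELECT * works here only because the source and sink declare the same columns in the same order. If the schemas ever drift apart, an explicit column mapping is safer; an equivalent, more defensive sketch:

INSERT INTO sink_kafka_test (db_name, table_name, operation_ts, foo, bar)
SELECT db_name, table_name, operation_ts, foo, bar
FROM source_mysql_test;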
4. Observing the Messages on Kafka Topic flinkcdc_v1, canal-json
# On INSERT, Kafka receives:
{"data":[{"db_name":"test","table_name":"pokes","operation_ts":"2024-04-10 10:58:07Z","foo":30,"bar":"message30"}],"type":"INSERT"}

# On UPDATE, Kafka receives a pair of messages:
{"data":[{"db_name":"test","table_name":"pokes","operation_ts":"2024-04-10 10:58:51Z","foo":30,"bar":"message30"}],"type":"DELETE"}
{"data":[{"db_name":"test","table_name":"pokes","operation_ts":"2024-04-10 10:58:51Z","foo":30,"bar":"message30update"}],"type":"INSERT"}

# On DELETE, Kafka receives:
{"data":[{"db_name":"test","table_name":"pokes","operation_ts":"2024-04-10 10:59:20Z","foo":30,"bar":"message30update"}],"type":"DELETE"}

# Adding or altering columns still requires re-syncing the data from scratch.
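Because the sink DDL also sets 'scan.startup.mode' = 'earliest-offset' (a read-side option), the same table can be used as a source to read the topic back from the Flink SQL client. A minimal sketch: the canal-json messages are decoded into a changelog, so the DELETE/INSERT pair from the update above surfaces as a retraction followed by the new row.

-- Reads flinkcdc_v1 from the earliest offset and decodes canal-json into changelog rows.
SELECT * FROM sink_kafka_test;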
5. For More Formats, See
https://nightlies.apache.org/flink/flink-docs-stable/zh/docs/connectors/table/formats/overview/
6. Apache Kafka Connector Parameters and Configuration Options
https://nightlies.apache.org/flink/flink-docs-stable/zh/docs/connectors/table/kafka/