FlinkCDC消费MySQL Binlog 并推送到Kafka(二)(Debezium CDC Format: debezium-json)
目录
Kafka消息中字段解释
- before:表示的是修改之前的数据
- after:表示的是修改之后的数据
- op:操作类型
Refer1: https://cpp.la/828.html
Refer1: https://cpp.la/832.html
一、查看FlinkCDC MySQL->Kafka过来的消息,第一次会全量读取表里的数据
1 2 3 4 5 6 7 |
{"before":null,"after":{"foo":4,"bar":"li"},"op":"c"} {"before":null,"after":{"foo":10,"bar":"message10update"},"op":"c"} {"before":null,"after":{"foo":9,"bar":"message9update"},"op":"c"} {"before":null,"after":{"foo":6,"bar":"b"},"op":"c"} {"before":null,"after":{"foo":5,"bar":"a"},"op":"c"} {"before":null,"after":{"foo":8,"bar":"dd"},"op":"c"} {"before":null,"after":{"foo":7,"bar":"c"},"op":"c"} |
二、测试数据表新增数据(flinkCDC MySQL->Kafka)
1 2 3 4 5 6 7 |
# MySQL Insert insert into test.pokes values(11, 'message11'); insert into test.pokes values(12, 'message12'); # Kafka Receive {"before":null,"after":{"foo":11,"bar":"message11"},"op":"c"} {"before":null,"after":{"foo":12,"bar":"message12"},"op":"c"} |
三、测试数据表修改数据(flinkCDC MySQL->Kafka)
1 2 3 4 5 6 7 8 9 |
# MySQL Update update test.pokes set bar='message11update' where foo = 11; update test.pokes set bar='message12update' where foo = 12; # Kafka Receive {"before":{"foo":11,"bar":"message11"},"after":null,"op":"d"} {"before":null,"after":{"foo":11,"bar":"message11update"},"op":"c"} {"before":{"foo":12,"bar":"message12"},"after":null,"op":"d"} {"before":null,"after":{"foo":12,"bar":"message12update"},"op":"c"} |
四、测试数据表删除数据(flinkCDC MySQL->Kafka)
1 2 3 4 5 6 7 |
# MySQL Delete delete from test.pokes where foo = 4; delete from test.pokes where foo = 5; # Kafka Receive {"before":{"foo":4,"bar":"li"},"after":null,"op":"d"} {"before":{"foo":5,"bar":"a"},"after":null,"op":"d"} |
五、测试数据表新增字段(flinkCDC MySQL->Kafka)
1 2 |
# 4.1 MySQL ADD fieldName,测试新增字段 ALTER TABLE test.pokes ADD age INT DEFAULT 0; |
Kafka无消息接到,业务表新增字段考虑到安全性建议删除重拖数据。
1 2 3 4 5 6 7 |
# 4.2 MySQL ADD & Insert ,测试新增字段后插入数据 insert into test.pokes values(13, 'message13', 13); insert into test.pokes values(14, 'message14', 14); # Kafka Receive {"before":null,"after":{"foo":13,"bar":"message13"},"op":"c"} {"before":null,"after":{"foo":14,"bar":"message14"},"op":"c"} |
Kafka无法检测到新增字段age,所以建议业务表新增字段考虑到安全性建议删除重拖数据。
六、测试数据表修改字段(flinkCDC MySQL->Kafka)
同上,实际业务中修改字段的可能性较小或不存在,建议业务表修改字段考虑到安全性建议删除重拖数据。