ClickHouse Kafka engine: how to split different messages in the same message stream into different target tables
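ClickHouse materialized views provide a very general way to adapt Kafka messages to the data layout of a target table. We can define multiple materialized views over one Kafka source to split the message stream across different target tables.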
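1. Create the tables that store the consumed Kafka data, one per level category
- level takes the values "A" and "B" here, so first create one table for each: cppla.kafka_readings_A and cppla.kafka_readings_B.
- dt is the business timestamp, in standard Beijing time.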
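Since dt carries Beijing time, it can help to see how ClickHouse interprets a wall-clock string when the time zone is stated explicitly. The query below is only an illustrative sketch: the explicit 'Asia/Shanghai' zone is an assumption made here, not something the tables in this article declare.

```sql
-- Sketch only: read a wall-clock string as Beijing time (Asia/Shanghai)
-- and show the same instant converted to UTC.
SELECT
    toDateTime('2021-08-24 14:27:48', 'Asia/Shanghai') AS dt_beijing,
    toTimeZone(dt_beijing, 'UTC') AS dt_utc;
```

The tables in this article use a plain DateTime column, which follows the server's default time zone. Declaring the column as DateTime('Asia/Shanghai') would be one way to pin the interpretation regardless of that setting.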
1.1 Create the table cppla.kafka_readings_A

    CREATE TABLE cppla.kafka_readings_A
    (
        msg String,
        platform String,
        level String,
        name String,
        phone Int32,
        dt DateTime
    )
    ENGINE = MergeTree
    PARTITION BY toYYYYMMDD(dt)
    ORDER BY (dt);
1.2 Create the table cppla.kafka_readings_B

    CREATE TABLE cppla.kafka_readings_B
    (
        msg String,
        platform String,
        level String,
        name String,
        phone Int32,
        dt DateTime
    )
    ENGINE = MergeTree
    PARTITION BY toYYYYMMDD(dt)
    ORDER BY (dt);
2. Create the table that consumes from Kafka

    CREATE TABLE cppla.kafka_readings_queue
    (
        msg String,
        platform String,
        level String,
        name String,
        phone Int32,
        dt DateTime
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = '172.21.0.5:49153,172.21.0.5:49154,172.21.0.5:49155',
             kafka_topic_list = 'test_3',
             kafka_group_name = 'consumer_group3',
             kafka_format = 'JSONEachRow',
             kafka_skip_broken_messages = 20000,
             kafka_num_consumers = 1;
3. Create materialized views that route the different messages into the target tables

    -- Route messages with level = 'A'
    CREATE MATERIALIZED VIEW cppla.kafka_readings_view_A
    TO cppla.kafka_readings_A AS
    SELECT msg, platform, level, name, phone, dt
    FROM cppla.kafka_readings_queue
    WHERE level = 'A';

    -- Route messages with level = 'B'
    CREATE MATERIALIZED VIEW cppla.kafka_readings_view_B
    TO cppla.kafka_readings_B AS
    SELECT msg, platform, level, name, phone, dt
    FROM cppla.kafka_readings_queue
    WHERE level = 'B';
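Before producing any messages, it may be worth confirming that all five objects exist. A minimal sketch using the built-in system.tables table:

```sql
-- List the objects created so far in the cppla database with their engines.
SELECT name, engine
FROM system.tables
WHERE database = 'cppla'
ORDER BY name;
```

If the statements above ran cleanly, the result should include the two MergeTree tables, the Kafka queue table, and the two materialized views.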
4. Write messages to Kafka and verify that they are routed correctly

    # docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --broker-list 172.21.0.5:49154 --topic test_3
    {"msg":"TT","platform":"TEST","level":"A","name":"zhangsan","phone":10086,"dt":"2021-08-24 14:27:48"}
    {"msg":"TT","platform":"TEST","level":"B","name":"lisi","phone":10010,"dt":"2021-08-24 14:27:54"}
    {"msg":"TT","platform":"TEST","level":"B","name":"waner","phone":10011,"dt":"2021-08-24 14:28:00"}
    {"msg":"TT","platform":"TEST","level":"A","name":"mazi","phone":10087,"dt":"2021-08-24 14:28:06"}
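To check the routing, the two target tables can be queried directly. This is a minimal sketch; rows may take several seconds to appear, because the Kafka engine flushes consumed messages to the materialized views in batches.

```sql
-- Rows routed by the level = 'A' view
SELECT level, name, phone, dt
FROM cppla.kafka_readings_A
ORDER BY dt;

-- Rows routed by the level = 'B' view
SELECT level, name, phone, dt
FROM cppla.kafka_readings_B
ORDER BY dt;
```

If all four test messages were consumed, cppla.kafka_readings_A should hold the zhangsan and mazi rows and cppla.kafka_readings_B the lisi and waner rows.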
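5. Summary
We can even create multiple materialized views over the same full-column Kafka source to split messages by category, and route different column subsets into tables with different structures. In production, it is also best to land the data with replicated tables, deduplication, and distributed tables. This keeps the data highly available, safe, and reliable, and avoids failures caused by a single ClickHouse shard going down unexpectedly.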
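As a rough sketch of the column-subset idea, the same queue table can feed a narrower target table. The names cppla.kafka_readings_contacts and cppla.kafka_readings_view_contacts are hypothetical, introduced here only for illustration, as is the choice of which columns to keep.

```sql
-- Hypothetical narrower target table: only the contact fields are kept.
CREATE TABLE cppla.kafka_readings_contacts
(
    name String,
    phone Int32,
    dt DateTime
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(dt)
ORDER BY (dt);

-- Hypothetical view: same Kafka source, a subset of its columns, level = 'A' only.
CREATE MATERIALIZED VIEW cppla.kafka_readings_view_contacts
TO cppla.kafka_readings_contacts AS
SELECT name, phone, dt
FROM cppla.kafka_readings_queue
WHERE level = 'A';
```

Each materialized view attached to the queue table receives every consumed block, so this view can coexist with cppla.kafka_readings_view_A and cppla.kafka_readings_view_B without taking rows away from them.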
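Example of automatically routing the same full-column Kafka source into tables with different structures (demo by: cpp.la):
This article, "ClickHouse Kafka engine: how to split different messages in the same message stream into different target tables", draws on ClickHouse documentation, related literature, altinity.com, and other sources. by: cpp.la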