ClickHouse-Kafka Engine: A High-Availability, Real-Time Ingestion Setup for Production
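This article describes a high-availability, real-time production setup for ingesting Kafka data into ClickHouse with the Kafka table engine. It uses extracting messages with event=cppla as the running example and walks through the full production process step by step. To extract any other event, create another materialized view on the same Kafka message queue table (kafka_queue) that selects the required event messages.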
1. ClickHouse production cluster environment
ClickHouse server version 21.8.4
- company_cluster 172.23.0.11 9000
- company_cluster 172.23.0.12 9000
- company_cluster 172.23.0.13 9000
- company_cluster 172.23.0.14 9000
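Before creating any tables, it can help to confirm that every node sees the same company_cluster topology and defines the {shard} and {replica} macros that the ReplicatedReplacingMergeTree path in section 3 relies on. The article does not say how the four nodes are split into shards and replicas, so a minimal check like the following, run on each node, is what reveals the layout:

```sql
-- Topology of company_cluster as seen from this node
SELECT cluster, shard_num, replica_num, host_address, port
FROM system.clusters
WHERE cluster = 'company_cluster'
ORDER BY shard_num, replica_num;

-- Values substituted for '{shard}' and '{replica}' in replicated table paths on this node
SELECT macro, substitution
FROM system.macros;
```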
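2. ClickHouse-Kafka high-availability real-time ingestion workflow
Data flows through the following chain: Kafka topic ck_topic → Kafka engine table kafka.kafka_queue (created on every node, all in one consumer group) → materialized view kafka.event_cppla_view (keeps only event=cppla) → local ReplicatedReplacingMergeTree table kafka.event_cppla → Distributed table kafka.event_cppla_dis for queries.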
3. Create the local table for event=cppla to store the message stream
```sql
-- Create the kafka database and set up proper access control for it
CREATE DATABASE kafka ON CLUSTER company_cluster;

-- Local table for event=cppla, partitioned strictly by the business time create_time;
-- ReplicatedReplacingMergeTree merges and deduplicates rows with the same (_offset, _partition)
CREATE TABLE kafka.event_cppla ON CLUSTER company_cluster
(
    _offset UInt64,                   -- Kafka engine virtual column: message offset
    _partition UInt64,                -- Kafka engine virtual column: topic partition
    _timestamp Nullable(DateTime),    -- Kafka engine virtual column: message timestamp
    msg String,
    event String,
    data String,
    create_time DateTime,
    insert_time DateTime DEFAULT now()
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/replicated/{shard}/kafka_event_cppla_version_1.0.2', '{replica}')
PARTITION BY toYYYYMMDD(toDate(create_time))
ORDER BY (_offset, _partition)
TTL toDate(create_time) + toIntervalDay(365)   -- rows expire 365 days after create_time
SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1;
```
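ReplacingMergeTree removes rows with the same sorting key (here (_offset, _partition)) only during background merges, and only within one partition of one shard's table, so a re-delivered Kafka message can stay visible for a while. Without a version column, the last inserted copy is the one kept. A sketch of how one might check for and hide such duplicates on the local table (run on one node, so it only covers that node's shard):

```sql
-- Kafka messages (_partition, _offset) that still have more than one stored copy
SELECT _partition, _offset, count() AS copies
FROM kafka.event_cppla
GROUP BY _partition, _offset
HAVING copies > 1;

-- FINAL applies the ReplacingMergeTree deduplication at query time (slower than a plain read)
SELECT _partition, _offset, msg, create_time
FROM kafka.event_cppla FINAL
ORDER BY _partition, _offset;
```

If a message is re-delivered to a consumer on a different shard after a consumer-group rebalance, its copies land in different shards' tables, and this table-level deduplication will not merge them.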
4. Create the Distributed table for event=cppla to query all shards in parallel
```sql
CREATE TABLE kafka.event_cppla_dis ON CLUSTER company_cluster AS kafka.event_cppla
ENGINE = Distributed('company_cluster', kafka, event_cppla, rand());
```
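A Distributed table stores no data itself: a query against it is sent to one replica of every shard and the partial results are merged. One way to see how ingested rows are spread across the cluster is to group by hostName(), which is evaluated on the remote servers in a distributed query. Note that it reports the replica that answered for each shard, which is not necessarily the node that consumed the messages.

```sql
-- Row count per answering host, read through the Distributed table
SELECT hostName() AS host, count() AS rows
FROM kafka.event_cppla_dis
GROUP BY host
ORDER BY host;
```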
5. Create the ClickHouse Kafka engine message queue
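Create the Kafka engine queue table on the whole cluster (ON CLUSTER company_cluster). Every node's kafka_queue then joins the same Kafka consumer group (ck_consumer), so if the Kafka engine on one machine goes down, Kafka rebalances the group and the Kafka engine tables on the remaining nodes take over its topic partitions. This is what provides high availability.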
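```sql
CREATE TABLE kafka.kafka_queue ON CLUSTER company_cluster
(
    msg String,
    event String,
    data String,
    create_time DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '172.21.0.5:49153,172.21.0.5:49154,172.21.0.5:49155',
         kafka_topic_list = 'ck_topic',
         -- every node uses the same consumer group, which is what allows the takeover
         kafka_group_name = 'ck_consumer',
         -- each message is one JSON object whose keys match the column names
         kafka_format = 'JSONEachRow',
         -- number of unparseable messages tolerated per block
         kafka_skip_broken_messages = 20000,
         -- consumers per table on each node
         kafka_num_consumers = 1;
```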
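Because all four nodes run kafka.kafka_queue in the same consumer group, and Kafka assigns each topic partition to at most one consumer of a group, the partition count of ck_topic bounds how many consumers actually receive data. With kafka_num_consumers = 1 as set above (the article does not give the partition count of ck_topic, so it is left as $P$):

$$
C = N_{\text{nodes}} \times \texttt{kafka\_num\_consumers} = 4 \times 1 = 4,
\qquad
C_{\text{active}} = \min(C, P)
$$

So ck_topic needs $P \ge 4$ for every node to consume in normal operation. With $P < 4$, some consumers stay idle; they can still take over partitions after a rebalance, but they add no throughput.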
6. Create a materialized view to extract and store event=cppla messages
```sql
CREATE MATERIALIZED VIEW kafka.event_cppla_view ON CLUSTER company_cluster TO kafka.event_cppla
AS SELECT _offset, _partition, _timestamp, msg, event, data, create_time
FROM kafka.kafka_queue
WHERE event = 'cppla';
```
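7. Produce messages to ck_topic on the Kafka cluster
Start a console producer inside the Kafka container and enter one JSON message per line. Five of the sample messages have event=cppla and two have event=cxx.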
```
# docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-console-producer.sh --broker-list 172.21.0.5:49154 --topic ck_topic
{"msg":"cpp.la msg type, code:0001","event":"cppla","data":"{\"a\":\"a100\",\"b\":\"b100\",\"c\":100}","create_time":"2021-09-27 14:22:58"}
{"msg":"cpp.la msg type, code:0002","event":"cppla","data":"{\"a\":\"a101\",\"b\":\"b101\",\"c\":101}","create_time":"2021-09-27 14:22:59"}
{"msg":"msg type, code:1002","event":"cxx","data":"{ \"e\": \"100010\"}","create_time":"2021-09-27 14:23:01"}
{"msg":"msg type, code:1002","event":"cxx","data":"{ \"e\": \"100086\"}","create_time":"2021-09-27 14:23:03"}
{"msg":"cpp.la msg type, code:0003","event":"cppla","data":"{\"a\":\"a102\",\"b\":\"b102\",\"c\":102}","create_time":"2021-09-27 14:23:50"}
{"msg":"cpp.la msg type, code:0007","event":"cppla","data":"{\"a\":\"a103\",\"b\":\"b103\",\"c\":103}","create_time":"2021-09-27 14:25:58"}
{"msg":"cpp.la msg type, code:0007","event":"cppla","data":"{\"a\":\"a104\",\"b\":\"b104\",\"c\":104}","create_time":"2021-09-27 14:25:59"}
```
8. Verify that event=cppla messages are stored in real time
8.1 Query the stored messages (figure: event_cppla_get_data_string)
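A sketch of a query for this check through the Distributed table. insert_time is filled by its DEFAULT now() when the materialized view writes a row, so the difference to create_time gives a rough ingestion lag in seconds; that is only meaningful when producers stamp create_time at event time with clocks and time zone matching the ClickHouse servers (the sample messages above use hand-typed timestamps).

```sql
-- Raw stored event=cppla messages across all shards, with a rough ingestion lag
SELECT _partition, _offset, msg, data, create_time, insert_time,
       dateDiff('second', create_time, insert_time) AS lag_seconds
FROM kafka.event_cppla_dis
ORDER BY create_time;
```

With the seven sample messages from section 7, this should return the five event=cppla rows and none of the event=cxx rows.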
8.2 Query the parsed JSON data (figure: event_cppla_get_data_json)
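The data column holds the event payload as a JSON string, so its fields can be extracted at query time with ClickHouse's JSONExtract* functions. A minimal sketch, assuming the payload keys a, b and c seen in the sample event=cppla messages:

```sql
-- Parse the JSON payload stored in `data` into typed columns
SELECT
    msg,
    JSONExtractString(data, 'a') AS a,
    JSONExtractString(data, 'b') AS b,
    JSONExtractInt(data, 'c')    AS c,
    create_time
FROM kafka.event_cppla_dis
ORDER BY create_time;
```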
9. How to extract and store event=cxx messages in real time
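- Create a local table for event=cxx to store the message stream.
- Create a Distributed table for event=cxx to query all shards in parallel.
- Create a materialized view that extracts event=cxx messages from the existing kafka_queue and stores them in the new local table. That is all.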
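The three steps above could look like the following sketch, which mirrors the event=cppla DDL. The table names (kafka.event_cxx, kafka.event_cxx_dis, kafka.event_cxx_view) and the ZooKeeper path suffix kafka_event_cxx_version_1.0.0 are hypothetical; the path just must not reuse the event=cppla table's path.

```sql
-- 1. Local table for event=cxx (hypothetical names; same layout as kafka.event_cppla)
CREATE TABLE kafka.event_cxx ON CLUSTER company_cluster
(
    _offset UInt64,
    _partition UInt64,
    _timestamp Nullable(DateTime),
    msg String,
    event String,
    data String,
    create_time DateTime,
    insert_time DateTime DEFAULT now()
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/replicated/{shard}/kafka_event_cxx_version_1.0.0', '{replica}')
PARTITION BY toYYYYMMDD(toDate(create_time))
ORDER BY (_offset, _partition)
TTL toDate(create_time) + toIntervalDay(365)
SETTINGS index_granularity = 8192, use_minimalistic_part_header_in_zookeeper = 1;

-- 2. Distributed table over the local table, for cluster-wide queries
CREATE TABLE kafka.event_cxx_dis ON CLUSTER company_cluster AS kafka.event_cxx
ENGINE = Distributed('company_cluster', kafka, event_cxx, rand());

-- 3. Materialized view on the existing kafka.kafka_queue, keeping only event = 'cxx'
CREATE MATERIALIZED VIEW kafka.event_cxx_view ON CLUSTER company_cluster TO kafka.event_cxx
AS SELECT _offset, _partition, _timestamp, msg, event, data, create_time
FROM kafka.kafka_queue
WHERE event = 'cxx';
```

Each block read by kafka_queue is pushed to every attached materialized view, so both views share one consumer group. A new view only sees messages consumed after it is created: the two event=cxx messages from section 7 were already consumed and committed while only event_cppla_view existed, so they will not be back-filled.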
End of "ClickHouse-Kafka Engine: A High-Availability, Real-Time Ingestion Setup for Production". 2021-09-27, by cpp.la