ClickHouse-Kafka引擎,数据重新刷新,重新写回所有数据的操作步骤
目录
假如在Clickhouse-Kafka引擎中丢失了数据,需要重读Kafka数据,如何重刷数据呢?或者说丢失某天的数据如何重新消费呢?《ClickHouse-Kafka引擎,数据重新刷新,重新写回所有数据的操作步骤》
基于:
一、首先关闭Kafka消息使用
1 |
DETACH TABLE cppla.kafka_readings_queue; |
二、清空Kafka数据存储表
1 2 3 4 5 6 7 |
# 清空常规MergeTree表 TRUNCATE TABLE cppla.kafka_readings; # 注意!!! # 复制表,这里我们可以利用Replacing去重特性,可以很方便的合并从某天开始丢失的数据。 # 或生产过程中直接卸载有问题的分区数据然后从该日期新消费处理。无需清空 TRUNCATE TABLE cppla.kafka_readings_replicated on cluster company_cluster; |
三、在Kafka主题的订阅使用者组中重置分区偏移量
位移重设策略:
1 2 3 4 5 6 7 8 9 10 11 |
--to-earliest:把位移调整到分区当前最小/早位移 --to-latest:把位移调整到分区当前最新位移 --to-current:把位移调整到分区当前位移 --to-offset <offset>: 把位移调整到指定位移处 --shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动 --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000 |
1 2 3 4 5 6 7 8 9 10 11 |
# 从Kafka最早的时候消费 docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.5:49153,172.21.0.5:49154,172.21.0.5:49155 --topic test_2 --group consumer_group2 --reset-offsets --to-earliest --execute # 从Kafka指定日期消息消费 docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.5:49153,172.21.0.5:49154,172.21.0.5:49155 --topic test_2 --group consumer_group2 --reset-offsets --all-topics --to-datetime 2021-08-22T00:00:00.000 --execute # 从Kafka所有分区指定offset偏移消费 docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.5:49153,172.21.0.5:49154,172.21.0.5:49155 --topic test_2 --group consumer_group2 --reset-offsets --to-offset 5 --execute # 从Kafka 指定分区指定offset偏移消费 docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.5:49153,172.21.0.5:49154,172.21.0.5:49155 --topic test_2:1 --group consumer_group2 --reset-offsets --to-offset 6 --execute |
四、重新激活Kafka消息的使用
1 |
ATTACH TABLE cppla.kafka_readings_queue; |
五、等待消息处理完毕,重新查询数据
1 2 3 4 5 6 7 |
# 常规表MergeTree select count(1) from cppla.kafka_readings; # 注意!!! # 复制表,这里我们可以利用Replacing去重特性,可以很方便的合并从某天开始丢失的数据。 # 或生产过程中直接卸载有问题的分区数据然后从该日期新消费处理。无需清空 select count(1) from cppla.kafka_readings_distributed; |
六、结合复制表引擎ReplicatedReplacingMergeTree,去重即可得到最新的完整数据记录集
去重注意事项:
- 使用ORDER BY 排序键作为判断数据是否重复的唯一键
- 只有在合并分区时才会触发删除重复数据的逻辑
- 只能在相同分区的数据去重,跨分区不会去重。即使使用OPTIMIZE TABL也不会垮分区去重
- 数据去重策略,如果没有设置ver版本号,则保留同一组重复数据的最后一行;如果设置了ver版本号,则保留同一组重复数据中ver字段取值最大的那一行。
《ClickHouse-Kafka引擎,数据重新刷新,重新写回所有数据的操作步骤》,有参阅ClickHouse和相关文献,by: cpp.la