【Flume教程一】Flume采集配置详细教程
Environment: CentOS 8.0, 4 cores, 4 GB RAM, 100 GB disk. Software directory: /opt
1. Download and configure JDK 1.8.0_251
Download and extract:
```
cd /opt
wget https://d2.injdk.cn/oracle/8/jdk-8u251-linux-x64.tar.gz
tar -zxvf jdk-8u251-linux-x64.tar.gz
mv jdk1.8.0_251 jdk1.8
```
Configure the environment variables with vim /etc/profile:
```
export JAVA_HOME=/opt/jdk1.8
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
```
Apply the changes:
source /etc/profile
Verify the JDK:
java -version
2. Download and configure Flume 1.9.0
Download Flume 1.9:
```
cd /opt
wget http://mirror.cogentco.com/pub/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume1.9
```
Adjust the JVM heap size; for production, 8192 MB or more is recommended.
```
# vim flume1.9/bin/flume-ng
......
JAVA_OPTS="-Xmx128m"
......
```
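Instead of editing the launcher script in place, the heap can also be set in conf/flume-env.sh, which bin/flume-ng sources at startup if the file exists. A hedged sketch; the sizing values below are illustrative, not from the original setup:

```shell
# /opt/flume1.9/conf/flume-env.sh -- sourced by bin/flume-ng on startup
# Illustrative sizing; for production, match -Xmx to the recommendation above
export JAVA_OPTS="-Xms4096m -Xmx8192m"
```

This keeps the memory setting out of the distributed script, so it survives upgrades that replace bin/flume-ng.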
Create a Flume configuration that tails files and forwards events to the Flume collector; file name: tail2flume.
```
# mkdir -p /opt/apiserver/log
# cd /opt/flume1.9/conf/
# vim tail2flume

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/apiserver/log/positionoffset.log
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/apiserver/log/dd*.log
a1.sources.r1.filegroups.f2 = /opt/apiserver/log/dd*.txt

# set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.100
a1.sinks.k1.port = 52020

# set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 10.10.10.101
a1.sinks.k2.port = 52020

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# set sink group
a1.sinkgroups.g1.sinks = k1 k2

# set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
```
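The TAILDIR source persists its read offsets in the positionFile as a JSON array of {inode, pos, file} records, which is what lets it resume after a restart without re-reading files from the beginning. A minimal sketch of what an entry looks like and how to pull the offset out; the inode and offset values here are made up for illustration, not taken from a real run:

```shell
# Simulate a TAILDIR position-file entry (illustrative values)
echo '[{"inode":2496272,"pos":12,"file":"/opt/apiserver/log/dd.log"}]' > /tmp/positionoffset.log

# Extract the byte offset the source would resume from
grep -o '"pos":[0-9]*' /tmp/positionoffset.log
```

Deleting this file forces Flume to re-read the matched files from offset 0, which is occasionally useful when replaying logs.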
Create a Flume configuration that runs the collector service and pushes events to Kafka; file name: flume2kafka.
```
# cd /opt/flume1.9/conf/
# vim flume2kafka

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source: receive avro events from the tail agents; tag each event
# with a static header identifying this collector
a1.sources.r1.type = avro
a1.sources.r1.bind = 10.10.10.100
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = 10.10.10.100

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.keep-alive = 60

# kafka sink: forward collected events to the Kafka cluster
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = bi_dd
a1.sinks.k1.kafka.bootstrap.servers = 10.10.10.103:49153,10.10.10.103:49154,10.10.10.103:49155
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
a1.sinks.k1.kafka.producer.compression.type = snappy

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
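The sizing parameters above need to be mutually consistent: a channel's transactionCapacity must not exceed its capacity, and the sink's flumeBatchSize should not exceed transactionCapacity, or batches cannot commit in a single transaction. A quick sanity check using the values from this config:

```shell
# Values taken from the flume2kafka config above
capacity=1000000
txn_capacity=10000
batch_size=2000

# Constraint: flumeBatchSize <= transactionCapacity <= capacity
if [ "$batch_size" -le "$txn_capacity" ] && [ "$txn_capacity" -le "$capacity" ]; then
  echo "channel/batch sizing OK"
else
  echo "sizing violates channel constraints"
fi
```

With these values the check prints "channel/batch sizing OK"; if you raise flumeBatchSize, raise transactionCapacity along with it.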
Then apply the same configuration on a second machine to form an active-standby pair:
- Machine A: 10.10.10.100, agentA + collectorA
- Machine B: 10.10.10.101, agentB + collectorB
Start the services:
```
# Start the flume-to-kafka collector first
nohup bin/flume-ng agent -c conf -f conf/flume2kafka -n a1 -Dflume.root.logger=ERROR,console >> /opt/apiserver/log/flume2kafka.log &
# Then start the tail-to-flume agent
nohup bin/flume-ng agent -c conf -f conf/tail2flume -n a1 -Dflume.root.logger=ERROR,console >> /opt/apiserver/log/tail2flume.log &
```
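One detail worth double-checking at startup: the name passed to -n must match the property prefix used inside the config file (a1 in both files above). On a mismatch the agent starts but loads no sources, sinks, or channels. A sketch of that prefix lookup, using an illustrative first line from tail2flume:

```shell
# First property line of the config (illustrative) and the value passed to -n
conf_line='a1.sources = r1'
agent_name='a1'

# flume-ng only loads properties whose prefix equals the -n value
case "$conf_line" in
  "$agent_name".*) echo "agent name matches config prefix" ;;
  *)               echo "mismatch: agent would start with no components" ;;
esac
```

Here the check prints "agent name matches config prefix"; renaming the agent in only one place is a common source of silently empty agents.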
Write data to the tailed file and check whether Kafka receives the messages.
Write test messages:
```
[root@centos-2 flume1.9]# cd /opt/apiserver/log/
[root@centos-2 log]# ls
flume2kafka.log  positionoffset.log  tail2flume.log
[root@centos-2 log]# echo "hello,msg1" >> dd.log
[root@centos-2 log]# echo "hello,msg2" >> dd.log
[root@centos-2 log]# echo "hello,msg3" >> dd.log
[root@centos-2 log]# echo "hello,msg4" >> dd.log
[root@centos-2 log]# pwd
/opt/apiserver/log
```
Check the Kafka messages:
```
[root@centos-5 ~]# docker exec -ti kafka-docker_kafka_1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.10.10.103:49153 --topic bi_dd --from-beginning
hello,msg2
hello,msg1
hello,msg3
hello,msg4
```
All four messages arrive, though not in write order: Kafka only guarantees ordering within a single partition, so a topic with more than one partition can interleave messages like this.
by:cpp.la