[Flume Tutorial 4] Routing Flume Events to Different HDFS Directories
This post records the JAR dependencies for writing from Flume to HDFS, and the process of routing events to different HDFS directories by event type. Please credit the source when reposting: https://cpp.la/523.html
1. Flume to HDFS Dependencies
HDFS Configuration Dependencies
scp core-site.xml root@10.10.10.103:/opt/flume1.9/conf
scp hdfs-site.xml root@10.10.10.103:/opt/flume1.9/conf
With these files on Flume's classpath, the HDFS sink configuration can use bare HDFS paths directly (e.g. /flume_dd/), since the default filesystem is resolved from core-site.xml instead of requiring a full hdfs://namenode:port/ URI.
HDFS JAR Dependencies
scp hadoop-common-2.7.4.jar root@10.10.10.102:/opt/flume1.9/lib/
scp commons-configuration-1.6.jar root@10.10.10.102:/opt/flume1.9/lib/
scp hadoop-annotations-2.7.4.jar root@10.10.10.102:/opt/flume1.9/lib/
scp hadoop-auth-2.7.4.jar root@10.10.10.102:/opt/flume1.9/lib/
scp hadoop-hdfs-2.7.4.jar root@10.10.10.102:/opt/flume1.9/lib/
scp htrace-core-3.1.0-incubating.jar root@10.10.10.102:/opt/flume1.9/lib/
scp commons-io-2.4.jar root@10.10.10.102:/opt/flume1.9/lib/
2. Routing Flume Events to Different HDFS Directories
A regex_extractor interceptor is needed to pull the concrete value of the event field out of each record and add it to the event headers; the header can then be referenced in downstream configuration (here, as %{event} in the HDFS sink path).
Collected event data 1: bullet-chat (danmaku) tracking event, dd-barrage
{
  "event": "dd-barrage",
  "key1": "value1",
  "key2": "value2"
}
Collected event data 2: gift tracking event, dd-gift
{
  "event": "dd-gift",
  "key3": "value1",
  "key4": "value2",
  "key5": "xx"
}
The Kafka-consuming Flume agent extracts the event variable
# kafka consumer
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 10.10.10.103:49153,10.10.10.103:49154,10.10.10.103:49155
a1.sources.r1.kafka.consumer.group.id = flume_api_dd
a1.sources.r1.kafka.topics = bi_dd

# get event, then add to headers. by cpp.la
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = \"event\":\\s*\"(.*?)\"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = event
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
a1.sources.r1.interceptors = i1

# set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.102
a1.sinks.k1.port = 52020

# set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 10.10.10.103
a1.sinks.k2.port = 52020

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.keep-alive = 60

# set sink group
a1.sinkgroups.g1.sinks = k1 k2

# set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1
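The interceptor regex can be sanity-checked outside Flume. After the properties-file escaping is unwrapped, the pattern Flume compiles is effectively "event":\s*"(.*?)", and Python's re module handles this simple pattern the same way Java's regex engine does. The sketch below only illustrates what lands in the event header; it is not Flume's code path.

# check_regex.py -- sanity-check the interceptor regex against the sample payloads
import re

pattern = re.compile(r'"event":\s*"(.*?)"')

samples = [
    '{"event": "dd-barrage", "key1": "value1", "key2": "value2"}',
    '{"event": "dd-gift", "key3": "value1", "key4": "value2", "key5": "xx"}',
]

for body in samples:
    match = pattern.search(body)
    # group(1) is the value the interceptor stores under the header name "event"
    print(match.group(1))
# prints: dd-barrage, then dd-gift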
The HDFS-writing Flume agent uses the event variable
# flume to hdfs
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 10.10.10.102
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = 10.10.10.102

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /bi_dd/%{event}/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = logs-10.10.10.102-
a1.sinks.k1.hdfs.fileSuffix = .data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 1800
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.batchSize = 2000
a1.sinks.k1.hdfs.callTimeout = 900000
a1.sinks.k1.hdfs.closeTries = 60

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
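At write time, the HDFS sink substitutes %{event} from the event headers and the %Y-%m-%d escapes from the event timestamp (taken from the local clock here, since hdfs.useLocalTimeStamp = true), so each event type lands in its own dated directory. A rough Python illustration of that substitution, not Flume's implementation:

# path_resolution.py -- rough illustration of how /bi_dd/%{event}/%Y-%m-%d resolves
from datetime import datetime

def resolve_path(template: str, headers: dict, ts: datetime) -> str:
    # %{name} escapes are filled from event headers...
    for key, value in headers.items():
        template = template.replace("%%{%s}" % key, value)
    # ...and strftime-style escapes from the event timestamp
    return ts.strftime(template)

print(resolve_path("/bi_dd/%{event}/%Y-%m-%d",
                   {"event": "dd-barrage"},
                   datetime(2020, 12, 18)))
# prints: /bi_dd/dd-barrage/2020-12-18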
3. Result
[hadoop@centos-6 root]$ hadoop fs -ls /bi_dd
Found 2 items
drwxrwxrwx   - root supergroup          0 2020-12-18 00:00 /bi_dd/dd-barrage
drwxrwxrwx   - root supergroup          0 2020-12-18 00:00 /bi_dd/dd-gift
For Flume to HDFS with an HA (high-availability) NameNode, see the following references:
Please credit the source when reposting: https://cpp.la/523.html