关于Flume拦截器的问题:
flume官方文档:https://flume.apache.org/documentation.html
当我们采用flume - kafka - flume的中间件:
第一层flume:
#为各组件命名 a1.sources = r1 a1.channels = c1 #描述source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.ETLInterceptor$MyBuilder #描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadooop104:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #绑定source和channel以及sink和channel的关系 a1.sources.r1.channels = c1
taildir source:
taildir_position.json存储的文件格式: [{"inode":529114,"pos":774728,"file":"/opt/module/applog/log/app.2021-10-22.log"}]
这里我们使用自定义非JSON格式拦截器,过滤掉非JSON格式的信息
kafka channel:
减少take事务,提高效率
在kafka 1.6 中 ,kafka channel 中存在bug:
parseAsFlumeEvent 这个参数无论设置ture还是false,都会为对source来的数据进行解析,解析完会把头部信息加到数据前面,因此这种情况,下游会需要做额外的截取工作
在kafka 1.7 中,解决了这个bug,默认是ture,我们设置false
第二层Flume:
## 组件 a1.sources=r1 a1.channels=c1 a1.sinks=k1 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics=topic_log a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.TimeStampInterceptor$MyBuilder ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = log- a1.sinks.k1.hdfs.round = false #解决小文件问题 a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 ## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/ ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1
kafka source:
因为我们要设置时间戳拦截器,防止零点漂移问题,所以我们不能采用Kafka channel 来省去source
时间戳拦截器就是提取数据本身的timestamp,把它放到头部信息header中
file Chanel:
落盘
hdfs sink:
useLocalTimeStamp:是否使用当地时间。默认值:flase
所以我们的hdfs路径上落盘文件就按照头部信息的时间戳落盘
codeC:文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
fileType:文件格式,包括:SequenceFile, DataStream,CompressedStream,默认值:SequenceFile
当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;
当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)