本系统中每一个FTP服务器以及Hadoop的name node服务器上都要部署一个Flume Agent;FTP的Flume Agent采集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到分布式的文件存储系统HDFS上面。
需要注意的是Flume的Source在本文的系统中选择的是Spooling Directory Source,而没有选
择Exec Source,因为当Flume服务down掉的时候Spooling Directory Source能记录上一次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。当然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架一层FTP服务器也是为了避免Flume“污染”生产环境。Spooling Directory Source另外一个比较大的缺点就是无法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志采集工具,像logstash等。
FTP服务器上的Flume配置文件如下:
[plain] view plain copy
1. agent.channels = memorychannel 2. agent.sinks = target 3.
4. agent.sources.origin.type = spooldir
5. agent.sources.origin.spoolDir = /export/data/trivial/weblogs 6. agent.sources.origin.channels = memorychannel
7. agent.sources.origin.deserializer.maxLineLength = 2048 8.
9. agent.sources.origin.interceptors = i2
10. agent.sources.origin.interceptors.i2.type = host
11. agent.sources.origin.interceptors.i2.hostHeader = hostname 12.
13. agent.sinks.loggerSink.type = logger
14. agent.sinks.loggerSink.channel = memorychannel 15.
16. agent.channels.memorychannel.type = memory 17. agent.channels.memorychannel.capacity = 10000 18.
19. agent.sinks.target.type = avro
20. agent.sinks.target.channel = memorychannel 21. agent.sinks.target.hostname = 172.16.124.130 22. agent.sinks.target.port = 4545
这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每个Event的大小,默认是每个Event是2048个byte。Flume Agent Channel的大小默认等于于本地服务器上JVM所获取到的内存的80%,用户可以通过byteCapacityBufferPercentage和byteCapacity两个参数去进行优化。
需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。
在Hadoop服务器上的配置文件如下:
[plain] view plain copy
1. agent.sources = origin
2. agent.channels = memorychannel 3. agent.sinks = target 4.
5. agent.sources.origin.type = avro
6. agent.sources.origin.channels = memorychannel 7. agent.sources.origin.bind = 0.0.0.0 8. agent.sources.origin.port = 4545 9.
10. #agent.sources.origin.interceptors = i1 i2
11. #agent.sources.origin.interceptors.i1.type = timestamp 12. #agent.sources.origin.interceptors.i2.type = host
13. #agent.sources.origin.interceptors.i2.hostHeader = hostname 14.
15. agent.sinks.loggerSink.type = logger
16. agent.sinks.loggerSink.channel = memorychannel 17.
18. agent.channels.memorychannel.type = memory 19. agent.channels.memorychannel.capacity = 5000000
20. agent.channels.memorychannel.transactionCapacity = 1000000 21.
22. agent.sinks.target.type = hdfs
23. agent.sinks.target.channel = memorychannel
24. agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S 25. agent.sinks.target.hdfs.filePrefix = data-%{hostname} 26. agent.sinks.target.hdfs.rollInterval = 60 27. agent.sinks.target.hdfs.rollSize = 1073741824 28. agent.sinks.target.hdfs.rollCount = 1000000 29. agent.sinks.target.hdfs.round = true 30. agent.sinks.target.hdfs.roundValue = 10 31. agent.sinks.target.hdfs.roundUnit = minute 32. agent.sinks.target.hdfs.useLocalTimeStamp = true 33. agent.sinks.target.hdfs.minBlockReplicas=1 34. agent.sinks.target.hdfs.writeFormat=Text 35. agent.sinks.target.hdfs.fileType=DataStream
round, roundValue,roundUnit三个参数是用来配置每10分钟在hdfs里生成一个文件夹保存从FTP服务器上拉取下来的数据。
Troubleshooting
使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB-5KB的小文件的问题
需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB-5KB的小文件存储到HDFS上,那么很可能是HDFS Sink的配置不正确,导致系统使用了默认配置。spooldir类型的source是将指定目录中的文件的每一行封装成一个event放入到channel中,默认每一行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默认30秒), rollSize(默认1KB), rollCount(默认10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过多少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示一旦.tmp文件达到一定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件一旦写入了指定数量的events就下沉到HDFS文件系统中。
使用Flume拉取到HDFS中的文件格式错乱
这是因为HDFS Sink的配置中,hdfs.writeFormat属性默认为“Writable”会将原先的文件的内容序列化成HDFS的格式,应该手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默认是“SequenceFile”类型的,是将所有event拼成一行,应该该手动设置成hdfs.fileType=“DataStream”,这样就可以是一行一个event,与原文件格式保持一致
使用Mapreduce清洗日志文件
当把日志文件中的数据拉取到HDFS文件系统后,使用Mapreduce程序去进行日志清洗 第一步,先用Mapreduce过滤掉无效的数据
[plain] view plain copy
1. package com.guludada.clickstream; 2.
3. import java.io.IOException; 4. import java.text.SimpleDateFormat; 5. import java.util.Date;
6. import java.util.StringTokenizer; 7. import java.util.regex.Matcher; 8. import java.util.regex.Pattern; 9.
10. import org.apache.hadoop.conf.Configuration; 11. import org.apache.hadoop.fs.Path; 12. import org.apache.hadoop.io.IntWritable; 13. import org.apache.hadoop.io.NullWritable; 14. import org.apache.hadoop.io.Text; 15. import org.apache.hadoop.mapreduce.Job; 16. import org.apache.hadoop.mapreduce.Mapper;
17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 19.
20. import com.guludada.dataparser.WebLogParser; 21. 22.
23. public class logClean { 24.
25. public static class cleanMap extends Mapper
相关推荐: