77范文网 - 专业文章范例文档资料分享平台

Flume+Hadoop+Hive的离线分析系统基本架构 - 图文(4)

来源:网络收集 时间:2019-08-03 下载这篇文档 手机版
说明:文章内容仅供预览,部分内容可能不全,需要完整文档或者需要复制内容,请下载word后使用。下载word有问题请添加微信号:或QQ: 处理(尽可能给您提供完整文档),感谢您的支持与谅解。点击这里给我发消息

第二步,根据访问记录生成相应的Session信息记录,假设Session的过期时间是30分钟

[plain] view plain copy

1. package com.guludada.clickstream; 2.

3. import java.io.IOException; 4. import java.text.ParseException; 5. import java.text.SimpleDateFormat; 6. import java.util.ArrayList; 7. import java.util.Collections; 8. import java.util.Comparator; 9. import java.util.Date; 10. import java.util.HashMap; 11. import java.util.Locale; 12. import java.util.UUID; 13.

14. import org.apache.hadoop.conf.Configuration; 15. import org.apache.hadoop.fs.Path; 16. import org.apache.hadoop.io.IntWritable; 17. import org.apache.hadoop.io.NullWritable; 18. import org.apache.hadoop.io.Text; 19. import org.apache.hadoop.mapreduce.Job; 20. import org.apache.hadoop.mapreduce.Mapper; 21. import org.apache.hadoop.mapreduce.Reducer; 22. import org.apache.hadoop.mapreduce.Mapper.Context;

23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 25.

26. import com.guludada.clickstream.logClean.cleanMap; 27. import com.guludada.dataparser.SessionParser; 28. import com.guludada.dataparser.WebLogParser; 29. import com.guludada.javabean.WebLogSessionBean; 30.

31. public class logSession { 32.

33. public static class sessionMapper extends Mapper

{ 34.

35. private Text IPAddr = new Text(); 36. private Text content = new Text();

37. private NullWritable v = NullWritable.get(); 38. WebLogParser webLogParser = new WebLogParser(); 39.

40. public void map(Object key,Text value,Context context) { 41.

42. //将一行内容转成string

43. String line = value.toString(); 44.

45. String[] weblogArry = line.split(\ 46.

47. IPAddr.set(weblogArry[0]); 48. content.set(line); 49. try {

50. context.write(IPAddr,content); 51. } catch (IOException e) {

52. // TODO Auto-generated catch block 53. e.printStackTrace();

54. } catch (InterruptedException e) { 55. // TODO Auto-generated catch block 56. e.printStackTrace(); 57. } 58. } 59. } 60.

61. static class sessionReducer extends Reducer

le>{ 62.

63. private Text IPAddr = new Text(); 64. private Text content = new Text();

65. private NullWritable v = NullWritable.get(); 66. WebLogParser webLogParser = new WebLogParser();

67. SimpleDateFormat sdf = new SimpleDateFormat(\

68. SessionParser sessionParser = new SessionParser(); 69.

70. @Override

71. protected void reduce(Text key, Iterable values, Context conte

xt) throws IOException, InterruptedException { 72.

73. Date sessionStartTime = null;

74. String sessionID = UUID.randomUUID().toString(); 75.

76.

77. //将IP地址所对应的用户的所有浏览记录按时间排序

78. ArrayList sessionBeanGroup = new ArrayList

ebLogSessionBean>();

79. for(Text browseHistory : values) {

80. WebLogSessionBean sessionBean = sessionParser.loadBean(brows

eHistory.toString());

81. sessionBeanGroup.add(sessionBean); 82. }

83. Collections.sort(sessionBeanGroup,new Comparator

an>() { 84.

85. public int compare(WebLogSessionBean sessionBean1, WebLogSes

sionBean sessionBean2) {

86. Date date1 = sessionBean1.getTimeWithDateFormat(); 87. Date date2 = sessionBean2.getTimeWithDateFormat(); 88. if(date1 == null && date2 == null) return 0; 89. return date1.compareTo(date2); 90. } 91. }); 92.

93. for(WebLogSessionBean sessionBean : sessionBeanGroup) { 94.

95. if(sessionStartTime == null) {

96. //当天日志中某用户第一次访问网站的时间

97. sessionStartTime = timeTransform(sessionBean.getTime());

98. content.set(sessionParser.parser(sessionBean, sessionID)

);

99. try {

100. context.write(content,v); 101. } catch (IOException e) {

102. // TODO Auto-generated catch block 103. e.printStackTrace();

104. } catch (InterruptedException e) { 105. // TODO Auto-generated catch block 106. e.printStackTrace(); 107. } 108. 109. } else { 110.

111. Date sessionEndTime = timeTransform(sessionBean.getTime

());

112. long sessionStayTime = timeDiffer(sessionStartTime,sess

ionEndTime);

113. if(sessionStayTime > 30 * 60 * 1000) {

114. //将当前浏览记录的时间设为下一个session的开始时间 115. sessionStartTime = timeTransform(sessionBean.getTim

e());

116. sessionID = UUID.randomUUID().toString();

117. continue; 118. }

119. content.set(sessionParser.parser(sessionBean, sessionID

)); 120. try {

121. context.write(content,v); 122. } catch (IOException e) {

123. // TODO Auto-generated catch block 124. e.printStackTrace();

125. } catch (InterruptedException e) { 126. // TODO Auto-generated catch block 127. e.printStackTrace(); 128. }

129. } 130. } 131. } 132.

133. private Date timeTransform(String time) { 134.

135. Date standard_time = null; 136. try {

137. standard_time = sdf.parse(time); 138. } catch (ParseException e) {

139. // TODO Auto-generated catch block 140. e.printStackTrace(); 141. }

142. return standard_time; 143. } 144.

145. private long timeDiffer(Date start_time,Date end_time) { 146.

147. long diffTime = 0;

148. diffTime = end_time.getTime() - start_time.getTime(); 149.

150. return diffTime;

151. } 152. 153. } 154. 155.

156. public static void main(String[] args) throws Exception { 157.

158. Configuration conf = new Configuration(); 159.

160. conf.set(\ 161.

162. Job job = Job.getInstance(conf); 163.

164. job.setJarByClass(logClean.class); 165.

166. //指定本业务job要使用的mapper/Reducer业务类 167. job.setMapperClass(sessionMapper.class); 168. job.setReducerClass(sessionReducer.class); 169.

170. //指定mapper输出数据的kv类型

171. job.setMapOutputKeyClass(Text.class); 172. job.setMapOutputValueClass(Text.class); 173.

174. //指定最终输出的数据的kv类型

175. job.setOutputKeyClass(Text.class);

176. job.setOutputValueClass(NullWritable.class); 177.

178. Date curDate = new Date();

179. SimpleDateFormat sdf = new SimpleDateFormat(\ 180. String dateStr = sdf.format(curDate); 181.

182. //指定job的输入原始文件所在目录

183. FileInputFormat.setInputPaths(job, new Path(\

/\

184. //指定job的输出结果所在目录

185. FileOutputFormat.setOutputPath(job, new Path(\

ata/\ 186.

187. //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn

去运行 188.

189. boolean res = job.waitForCompletion(true); 190. System.exit(res?0:1); 191.

百度搜索“77cn”或“免费范文网”即可找到本站免费阅读全部范文。收藏本站方便下次阅读,免费范文网,提供经典小说教育文库Flume+Hadoop+Hive的离线分析系统基本架构 - 图文(4)在线全文阅读。

Flume+Hadoop+Hive的离线分析系统基本架构 - 图文(4).doc 将本文的Word文档下载到电脑,方便复制、编辑、收藏和打印 下载失败或者文档不完整,请联系客服人员解决!
本文链接:https://www.77cn.com.cn/wenku/jiaoyu/675783.html(转载请注明文章来源)
Copyright © 2008-2022 免费范文网 版权所有
声明 :本网站尊重并保护知识产权,根据《信息网络传播权保护条例》,如果我们转载的作品侵犯了您的权利,请在一个月内通知我们,我们会及时删除。
客服QQ: 邮箱:tiandhx2@hotmail.com
苏ICP备16052595号-18
× 注册会员免费下载(下载后可以自由复制和排版)
注册会员下载
全站内容免费自由复制
注册会员下载
全站内容免费自由复制
注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信: QQ: