Step 2: generate the corresponding Session records from the access logs, assuming that a Session expires after 30 minutes.
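Before the full MapReduce job, here is a minimal standalone sketch of the 30-minute rule the reducer below applies. Note that the gap is measured from the start of the current session, not from the previous request. The class name SessionRuleSketch and the three timestamps are made up purely for illustration, and unlike the reducer (which skips the boundary record entirely), this sketch assigns the boundary record to the new session:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

public class SessionRuleSketch {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Hypothetical visit times of one user, already sorted
        String[] visits = {"2016-09-18 10:00:00", "2016-09-18 10:20:00", "2016-09-18 10:45:00"};

        Date sessionStart = null;
        String sessionID = UUID.randomUUID().toString();
        for (String visit : visits) {
            Date t = sdf.parse(visit);
            if (sessionStart == null) {
                // first visit opens the first session
                sessionStart = t;
            } else if (t.getTime() - sessionStart.getTime() > 30 * 60 * 1000) {
                // more than 30 minutes since the session started: open a new session
                sessionStart = t;
                sessionID = UUID.randomUUID().toString();
            }
            System.out.println(visit + " -> session " + sessionID);
        }
    }
}

Running this prints the first two visits under one session ID and the 10:45 visit under a new one, because 45 minutes have passed since the session started.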
package com.guludada.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.WebLogSessionBean;

public class logSession {

    public static class sessionMapper extends Mapper<Object, Text, Text, Text> {

        private Text IPAddr = new Text();
        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        WebLogParser webLogParser = new WebLogParser();

        public void map(Object key, Text value, Context context) {

            // Turn the line into a String
            String line = value.toString();

            // The delimiter was lost in the source; a single space, matching
            // the cleaned log format, is assumed here
            String[] weblogArry = line.split(" ");

            // The IP address is the first field and becomes the shuffle key,
            // so all records of one user arrive at the same reducer
            IPAddr.set(weblogArry[0]);
            content.set(line);
            try {
                context.write(IPAddr, content);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class sessionReducer extends Reducer<Text, Text, Text, NullWritable> {

        private Text IPAddr = new Text();
        private Text content = new Text();
        private NullWritable v = NullWritable.get();
        WebLogParser webLogParser = new WebLogParser();
        // The pattern string was lost in the source; it must match the
        // timestamp format that the cleaning step writes out
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SessionParser sessionParser = new SessionParser();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            Date sessionStartTime = null;
            String sessionID = UUID.randomUUID().toString();

            // Sort all browsing records of the user behind this IP address by time
            ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>();
            for (Text browseHistory : values) {
                WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString());
                sessionBeanGroup.add(sessionBean);
            }
            Collections.sort(sessionBeanGroup, new Comparator<WebLogSessionBean>() {

                public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) {
                    Date date1 = sessionBean1.getTimeWithDateFormat();
                    Date date2 = sessionBean2.getTimeWithDateFormat();
                    if (date1 == null && date2 == null) return 0;
                    return date1.compareTo(date2);
                }
            });

            for (WebLogSessionBean sessionBean : sessionBeanGroup) {

                if (sessionStartTime == null) {
                    // The first time this user visited the site in today's logs
                    sessionStartTime = timeTransform(sessionBean.getTime());
                    content.set(sessionParser.parser(sessionBean, sessionID));
                    try {
                        context.write(content, v);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                } else {

                    Date sessionEndTime = timeTransform(sessionBean.getTime());
                    long sessionStayTime = timeDiffer(sessionStartTime, sessionEndTime);
                    if (sessionStayTime > 30 * 60 * 1000) {
                        // More than 30 minutes since the session started: take the
                        // current record's time as the start of the next session
                        // (note that this boundary record itself is not written out)
                        sessionStartTime = timeTransform(sessionBean.getTime());
                        sessionID = UUID.randomUUID().toString();
                        continue;
                    }
                    content.set(sessionParser.parser(sessionBean, sessionID));
                    try {
                        context.write(content, v);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        private Date timeTransform(String time) {

            Date standard_time = null;
            try {
                standard_time = sdf.parse(time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return standard_time;
        }

        private long timeDiffer(Date start_time, Date end_time) {

            long diffTime = 0;
            diffTime = end_time.getTime() - start_time.getTime();

            return diffTime;
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Both arguments were lost in the source; pointing fs.defaultFS at the
        // cluster's NameNode is assumed (replace the host as needed)
        conf.set("fs.defaultFS", "hdfs://namenode:9000");

        Job job = Job.getInstance(conf);

        // logClean ships in the same jar, so this also locates the session job's classes
        job.setJarByClass(logClean.class);

        // Set the Mapper and Reducer classes used by this job
        job.setMapperClass(sessionMapper.class);
        job.setReducerClass(sessionReducer.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        Date curDate = new Date();
        // The pattern string was lost in the source; it only needs to produce
        // the name of the dated directory written by the previous step
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateStr = sdf.format(curDate);

        // Input directory holding the job's raw files; both directory names were
        // truncated in the source, so plausible values are assumed here
        FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/" + dateStr + "/*"));
        // Output directory of the job
        FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/" + dateStr + "/"));

        // Submit the job's configuration, plus the jar holding its classes, to YARN
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
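Once the classes are packaged into a jar (the name clickstream.jar below is only an assumption), the job can be submitted in the usual way, e.g. with "hadoop jar clickstream.jar com.guludada.clickstream.logSession". No command-line arguments are needed because the input and output directories are hard-coded in main(); on success, the session records land under the dated output directory, each line carrying its generated session ID, ready for the next step of the pipeline.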