Spout的实现步骤:
· 对文件的改变进行分开的监听,并监视文件夹下有无新日志文件加入。
· 在数据得到了字段的说明后,将其转换成tuple。
· 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。
Spout的详细编码在Listing Three中显示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。 1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector ) 2. { 3. _collector = collector; 4. try 5. { 6. fileReader = new BufferedReader(new FileReader(new File(file))); 7. } 8. catch (FileNotFoundException e) 9. { 10. System.exit(1); 11. } 12. } 13. 14. public void nextTuple() 15. { 16. protected void ListenFile(File file) 17. { 18. Utils.sleep(2000); 19. RandomAccessFile access = null; 20. String line = null; 21. try 22. { 23. while ((line = access.readLine()) != null) 24. { 25. if (line !=null) 26. { 27. String[] fields=null; 28. if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter()); 29. else 30. fields = line.split (tupleInfo.getDelimiter()); 31. if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields)); 32. } 33. } 34. } 35. catch (IOException ex){ } 36. } 37. } 38. 39. public void declareOutputFields(OutputFieldsDeclarer declarer) 40. { 41. String[] fieldsArr = new String [tupleInfo.getFieldList().size()]; 42. for(int i=0; i<tupleInfo.getFieldList().size(); i++) 43. { 44. fieldsArr = tupleInfo.getFieldList().get(i).getColumnName(); 45. } 46. declarer.declare(new Fields(fieldsArr)); 47. }
declareOutputFileds()决定了tuple发射的格式,这种话Bolt就能够用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有加入Spout就会进行读入而且发送给Bolt进行处理。
很多其它精彩内容请关注:http://bbs.superwu.cn 关注超人学院微信二维码: |