知识点
相关文章
更多最近更新
更多Storm【开发实战】- 流方式的统计系统
2019-03-02 23:58|来源: 网路
1: 初期硬件准备:
1 如果条件具备:请保证您安装好了 redis集群
2 配置好您的Storm开发环境
3 保证好您的开发环境的畅通: 主机与主机之间,Storm与redis之间
2:业务背景的介绍:
1 在这里我们将模拟一个 流方式的数据处理过程
2 数据的源头保存在我们的redis 集群之中
3 发射的数据格式为: ip,url,client_key
数据发射器
package storm.spout; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Values; import backtype.storm.tuple.Fields; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import redis.clients.jedis.Jedis; import storm.utils.Conf; import java.util.Map; import org.apache.log4j.Logger; /** * click Spout 从redis中间读取所需要的数据 */ public class ClickSpout extends BaseRichSpout { private static final long serialVersionUID = -6200450568987812474L; public static Logger LOG = Logger.getLogger(ClickSpout.class); // 对于redis,我们使用的是jedis客户端 private Jedis jedis; // 主机 private String host; // 端口 private int port; // Spout 收集器 private SpoutOutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 这里,我们发射的格式为 // IP,URL,CLIENT_KEY outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP, storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY)); } @Override public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { host = conf.get(Conf.REDIS_HOST_KEY).toString(); port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString()); this.collector = spoutOutputCollector; connectToRedis(); } private void connectToRedis() { jedis = new Jedis(host, port); } @Override public void nextTuple() { String content = jedis.rpop("count"); if (content == null || "nil".equals(content)) { try { Thread.sleep(300); } catch (InterruptedException e) { } } else { // 将jedis对象 rpop出来的字符串解析为 json对象 JSONObject obj = (JSONObject) JSONValue.parse(content); String ip = obj.get(storm.cookbook.Fields.IP).toString(); String url = obj.get(storm.cookbook.Fields.URL).toString(); String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY) .toString(); System.out.println("this is a clientKey"); // List<Object> tuple对象 collector.emit(new Values(ip, url, clientKey)); } } }
在这个过程之中,请注意:
1 我们在 OPEN 方法之中初始化 host,port,collector,以及Redis的连接,调用Connect方法并连接到redis数据库
2 我们在nextTupe 取出数据,并且将他转换为一个JSON对象,并且拿到 ip,url,clientKey,同时将他们包装成为一个
Values对象
让我们来看看数据的流向图:
在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt
1 : repeatVisitBolt
2 : geographyBolt
共同来读取同一个数据源的数据:clickSpout
3 细细察看 repeatVisitBolt
package storm.bolt; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import redis.clients.jedis.Jedis; import storm.utils.Conf; import java.util.Map; public class RepeatVisitBolt extends BaseRichBolt { private OutputCollector collector; private Jedis jedis; private String host; private int port; @Override public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; host = conf.get(Conf.REDIS_HOST_KEY).toString(); port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString()); connectToRedis(); } private void connectToRedis() { jedis = new Jedis(host, port); jedis.connect(); } public boolean isConnected() { if (jedis == null) return false; return jedis.isConnected(); } @Override public void execute(Tuple tuple) { String ip = tuple.getStringByField(storm.cookbook.Fields.IP); String clientKey = tuple .getStringByField(storm.cookbook.Fields.CLIENT_KEY); String url = tuple.getStringByField(storm.cookbook.Fields.URL); String key = url + ":" + clientKey; String value = jedis.get(key); // redis中取,如果redis中没有,就插入新的一条访问记录。 if (value == null) { jedis.set(key, "visited"); collector.emit(new Values(clientKey, url, Boolean.TRUE.toString())); } else { collector .emit(new Values(clientKey, url, Boolean.FALSE.toString())); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields( storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL, storm.cookbook.Fields.UNIQUE)); } }
在这里,我们把url 和 clientKey 组合成为 【url:clientKey】的格式组合,并依据这个对象,在redis中去查找,如果没有,那那Set到redis中间去,并且判定它为【unique】
4:
package storm.bolt; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.Map; public class VisitStatsBolt extends BaseRichBolt { private OutputCollector collector; private int total = 0; private int uniqueCount = 0; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { //在这里,我们在上游来判断这个Fields 是否是独特和唯一的 boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE)); total++; if(unique)uniqueCount++; collector.emit(new Values(total,uniqueCount)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT, storm.cookbook.Fields.TOTAL_UNIQUE)); } }
第一次出现,uv ++
5 接下来,看看流水线2 :
package storm.bolt; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import org.json.simple.JSONObject; import storm.cookbook.IPResolver; import java.util.HashMap; import java.util.List; import java.util.Map; /** * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use * File | Settings | File Templates. */ public class GeographyBolt extends BaseRichBolt { // ip解析器 private IPResolver resolver; private OutputCollector collector; public GeographyBolt(IPResolver resolver) { this.resolver = resolver; } @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { // 1 从上级的目录之中拿到我们所要使用的ip String ip = tuple.getStringByField(storm.cookbook.Fields.IP); // 将ip 转换为json JSONObject json = resolver.resolveIP(ip); // 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象 String city = (String) json.get(storm.cookbook.Fields.CITY); String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME); collector.emit(new Values(country, city)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // 确定了我们这次输出元祖的格式 outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY, storm.cookbook.Fields.CITY)); } }
以上Bolt,完成了一个Ip到 CITY,COUNTRY 的转换
package storm.bolt; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; public class GeoStatsBolt extends BaseRichBolt { private class CountryStats { // private int countryTotal = 0; private static final int COUNT_INDEX = 0; private static final int PERCENTAGE_INDEX = 1; private String countryName; public CountryStats(String countryName) { this.countryName = countryName; } private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>(); /** * @param cityName */ public void cityFound(String cityName) { countryTotal++; // 已经有了值,一个加1的操作 if (cityStats.containsKey(cityName)) { cityStats.get(cityName) .set(COUNT_INDEX, cityStats.get(cityName).get(COUNT_INDEX) .intValue() + 1); // 没有值的时候 } else { List<Integer> list = new LinkedList<Integer>(); list.add(1); list.add(0); cityStats.put(cityName, list); } double percent = (double) cityStats.get(cityName).get(COUNT_INDEX) / (double) countryTotal; cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent); } /** * @return 拿到的国家总数 */ public int getCountryTotal() { return countryTotal; } /** * @param cityName 依据传入的城市名称,拿到城市总数 * @return */ public int getCityTotal(String cityName) { return cityStats.get(cityName).get(COUNT_INDEX).intValue(); } public String toString() { return "Total Count for " + countryName + " is " + Integer.toString(countryTotal) + "\n" + "Cities: " + cityStats.toString(); } } private OutputCollector collector; // CountryStats 是一个内部类的对象 private Map<String, CountryStats> stats = new HashMap<String, CountryStats>(); @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY); String city = tuple.getStringByField(storm.cookbook.Fields.CITY); // 如果国家不存在的时候,新增加一个国家,国家的统计 if (!stats.containsKey(country)) { stats.put(country, new CountryStats(country)); } // 这里拿到新的统计,cityFound 是拿到某个城市的值 stats.get(country).cityFound(city); collector.emit(new Values(country, stats.get(country).getCountryTotal(), city, stats.get(country) .getCityTotal(city))); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields( storm.cookbook.Fields.COUNTRY, storm.cookbook.Fields.COUNTRY_TOTAL, storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL)); } }
有关地理位置的统计,附带上程序其他的使用类
package storm.cookbook; /** */ public class Fields { public static final String IP = "ip"; public static final String URL = "url"; public static final String CLIENT_KEY = "clientKey"; public static final String COUNTRY = "country"; public static final String COUNTRY_NAME = "country_name"; public static final String CITY = "city"; //唯一的,独一无二的 public static final String UNIQUE = "unique"; //城镇整数 public static final String COUNTRY_TOTAL = "countryTotal"; //城市整数 public static final String CITY_TOTAL = "cityTotal"; //总共计数 public static final String TOTAL_COUNT = "totalCount"; //总共独一无二的 public static final String TOTAL_UNIQUE = "totalUnique"; }
package storm.cookbook; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; public class HttpIPResolver implements IPResolver, Serializable { static String url = "http://api.hostip.info/get_json.php"; @Override public JSONObject resolveIP(String ip) { URL geoUrl = null; BufferedReader in = null; try { geoUrl = new URL(url + "?ip=" + ip); URLConnection connection = geoUrl.openConnection(); in = new BufferedReader(new InputStreamReader( connection.getInputStream())); String inputLine; JSONObject json = (JSONObject) JSONValue.parse(in); in.close(); return json; } catch (IOException e) { e.printStackTrace(); } finally { // 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作 if (in != null) { try { in.close(); } catch (IOException e) { } } } return null; } }
package storm.cookbook; import org.json.simple.JSONObject; /** * Created with IntelliJ IDEA. * User: admin * Date: 2012/12/07 * Time: 5:29 PM * To change this template use File | Settings | File Templates. */ public interface IPResolver { public JSONObject resolveIP(String ip); }
至此,整个流程完毕。 对于统计以后,数据如何持久,亦或是数据数据写回redis的过程,请实践~
转自:http://my.oschina.net/u/1791874/blog/284806
相关问答
更多-
现在的计算机技术员应该学些什么基本的技能?[2022-07-28]
系统 -
java开发实战宝典 pdf[2022-01-21]
java开发实战宝典 pdf web版的也有 -
1 课程介绍:课程主要内容,数据库的设计 2 自定义工作流代码研读:强大的自定义工作流中间件的代码分析 3 基础框架:包括组织机构,权限管理,个人事务等功能 4 Web建模:开发基于浏览器的Web建模工具,并配合工作流中间件进行对接,进一步丰富工作流。 5 BOM管理:物料清单业务的开发 6 任务包管理:任务包业务的开发,任务包审批流程的开发。 7 计划管理:通过使用plusgannt制定计划. 你说的是这个嘛?
-
1 课程介绍:课程主要内容,数据库的设计 2 自定义工作流代码研读:强大的自定义工作流中间件的代码分析 3 基础框架:包括组织机构,权限管理,个人事务等功能 4 Web建模:开发基于浏览器的Web建模工具,并配合工作流中间件进行对接,进一步丰富工作流。 5 BOM管理:物料清单业务的开发 6 任务包管理:任务包业务的开发,任务包审批流程的开发。 7 计划管理:通过使用plusgannt制定计划. 你说的是这个嘛?
-
PHP实战开发方面的视频教程哪里有 PHP实战开发方面的视频教程下载[2022-08-22]
你可以看下下面几套php视频教程 ,都很不错 教程1:php100视频教程 教程2:PHP项目开发全程实录 教程3:ThinkPHP教程 LAMP兄弟连与China-pub合作推出 送源码 PPT 教程4:php从入门到精通【第2版】 php+mysql +源码 教程5:PHP_MySQL入门到精通教程 PHP_MySQL教程 教程6:陶益数码Dreamweaver CS3/4设计PHP视频教程(VIP教程) 教程7:黑鹰基地php特训班php视频教程 教程地址: http://www.henanfilm. ... -
android开发实战经典,这本书怎么样?[2022-02-19]
学习编程,你还是看视频教程吧,看书本的话,太慢了;去IT教程网下载,上面的android视频教程一大把,都是免费的 -
PHP开发实战1200例第2卷什么时候出[2023-05-26]
个人估计,内部人员辞职,绝对搁浅了。 -
调试Storm Crawler(Debugging Storm Crawler)[2022-08-24]
WIKI项目有相关说明 There are instructions for that on the project WIKI -
我从来没有听说过DSMS一词,但是看一下维基百科上的描述,我认为Storm绝对可以说是DSMS。 来自维基百科: 它类似于数据库管理系统(DBMS)[...]但是,与DBMS相比,DSMS执行连续查询,该查询不仅执行一次,而且是永久安装的。 这听起来就像Storm一样。 但请注意,在Storm的情况下,它通常与DBMS结合使用。 例如,Storm可以提供One-time queries , unlimited secondary storage等,维基百科说这与DSMS结合时缺乏DSMS。 I had ne ...