知识点
相关文章
更多最近更新
更多Storm常见模式——批处理
2019-03-02 23:44|来源: 网路
Storm对流数据进行实时处理时,一种常见场景是批量一起处理一定数量的tuple元组,而不是每接收一个tuple就立刻处理一个tuple,这样可能是性能的考虑,或者是具体业务的需要。
例如,批量查询或者更新数据库,如果每一条tuple生成一条sql执行一次数据库操作,数据量大的时候,效率会比批量处理的低很多,影响系统吞吐量。
当然,如果要使用Storm的可靠数据处理机制的话,应该使用容器将这些tuple的引用缓存到内存中,直到批量处理的时候,ack这些tuple。
下面给出一个简单的代码示例:
现在,假设我们已经有了一个DBManager数据库操作接口类,它至少有两个接口:
(1)getConnection(): 返回一个java.sql.Connection对象;
(2)getSQL(Tuple tuple): 根据tuple元组生成数据库操作语句。
为了在Bolt中缓存一定数量的tuple,构造Bolt时传递int n参数赋给Bolt的成员变量int count,指定每个n条tuple批量处理一次。
同时,为了在内存中缓存缓存Tuple,使用java concurrent中的ConcurrentLinkedQueue来存储tuple,每当攒够count条tuple,就触发批量处理。
另外,考虑到数据量小(如很长时间内都没有攒够count条tuple)或者count条数设置过大时,因此,Bolt中加入了一个定时器,保证最多每个1秒钟进行一次批量处理tuple。
下面是Bolt的完整代码(仅供参考):
import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class BatchingBolt implements IRichBolt { private static final long serialVersionUID = 1L; private OutputCollector collector; private Queue<Tuple> tupleQueue = new ConcurrentLinkedQueue<Tuple>(); private int count; private long lastTime; private Connection conn; public BatchingBolt(int n) { count = n; //批量处理的Tuple记录条数 conn = DBManger.getConnection(); //通过DBManager获取数据库连接 lastTime = System.currentTimeMillis(); //上次批量处理的时间戳 } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { tupleQueue.add(tuple); long currentTime = System.currentTimeMillis(); // 每count条tuple批量提交一次,或者每个1秒钟提交一次 if (tupleQueue.size() >= count || currentTime >= lastTime + 1000) { Statement stmt = conn.createStatement(); conn.setAutoCommit(false); for (int i = 0; i < count; i++) { Tuple tup = (Tuple) tupleQueue.poll(); String sql = DBManager.getSQL(tup); //生成sql语句 stmt.addBatch(sql); //加入sql collector.ack(tup); //进行ack } stmt.executeBatch(); //批量提交sql conn.commit(); conn.setAutoCommit(true); System.out.println("batch insert data into database, total records: " + count); lastTime = currentTime; } } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
转自:http://www.cnblogs.com/panfeng412/archive/2012/06/19/storm-common-patterns-of-batching
相关问答
更多-
什么是常见的培训模式[2022-03-18]
-
storm本地模式是不是本地程序直接可以跑起来?[2022-03-13]
1.jpg 支持的,0.9开始支持得好一些在以前的版本里,如果想在Windows平台上运行Storm,你需要安装ZeroMQ,修改Storm的源码,追加一些Windows平台特定的脚本。而在新的版本里,因为用netty替换了ZeroMQ,由于netty用纯java实现,因此使得Storm具有更好的跨平台特性,现在要在Windows上运行Storm比以前容易很多。 -
linux的常见的分区类型有哪些?[2022-05-23]
ext2,ext3比较常用,但是最新的ubuntu己经在用ext4了。我用的就是。另个,还有一些嵌入式linux系统常用jffs和yaffs文件系统。而且,linux支持几乎所有你所知道的文件系统类型(也即分类类型).包括windows下常用的fat32 ,ntfs等。 -
java中几种常见的设计模式[2022-05-10]
一共23种设计模式! 按照目的来分,设计模式可以分为创建型模式、结构型模式和行为型模式。 创建型模式用来处理对象的创建过程;结构型模式用来处理类或者对象的组合;行为型模式用来对类或对象怎样交互和怎样分配职责进行描述。 创建型模式用来处理对象的创建过程,主要包含以下5种设计模式: 工厂方法模式(Factory Method Pattern) 抽象工厂模式(Abstract Factory Pattern) 建造者模式(Builder Pattern) 原型模式(Prototype Patter ... -
JAVA中常见的设计模式都有啥[2023-11-15]
http://zhidao.baidu.com/question/20013947.html?fr=qrl3 -
web前端常见的代码设计模式有哪些[2023-02-04]
常见代码 function CreatePerson(name,age,sex) { var obj = new Object(); obj.name = name; obj.age = age; obj.sex = sex; obj.sayName = function(){ return this.name; } return obj; } var p1 = new CreatePerson("longen",'28','男'); var p2 = new CreatePerson("tugenhua" ... -
支架式教学设计的常见模式有哪些[2021-12-30]
问题支架 。问题是学习过程中最为常见的支架,相对“框架问题”而言,支架问题的系统性较弱,有经验的教师会在学生的学习过程中自然地、应机地提供此类支架。同时,在特定主题的学习中,“支架问题”往往比“框架问题”更具结构性,更加关注细节与可操作性 -
常见的教学模式有那些?[2023-04-22]
师生系统的传授和学习书本知识 教师辅导学生从活动中自己学习 折衷于俩者之间的教学模式