如何构建高效的storm计算模型

2019-03-02 23:56|来源: 网路

计算机制简介

        Storm采用流式计算的模型,和shell类似让数据在一个个“管道”中进行处理。

  • Spout负责从数据源拉取数据,相当于整个系统的生产者。
  • Bolt负责消费数据并将tuple发送给下一个计算单元。Bolt可以接受多个spout和bolt的数据。
  • 每个spout,bolt可以设置并行度excuter相当于多进程,每个excuter可以设置多个task  
  • shuffle grouping,它随机将tuple发给任何一个task;fields grouping,相同field值的tuple发送给同一个task。

A Storm topology

数据完整性

        当spout发送一个数据的时候为每一个tuple产生一个唯一的message id。当数据被完整处理的时候bolt会产生一个应答ack(成功)或fail(失败),如果数据超过(默认30s)则视为超时然后丢弃掉(可以通过操纵fail方法重新发送数据,不过这带来很高的计算成本)。同时受spout发射tuple最大数的限制bole的处理速度会影响spout的发射速度。因此如果保证数据被快速消费掉成为影响流式计算速度的关键所在。

 

stom计算模型

        一个简单的storm计算模型基本包括3部分:从数据源拉取数据,关联离线的维表,将结果写入数据库。

我们假设需要统计一个购物网站商品分类目的点击人数次数,而且这个网站数据量非常大。大致步骤如下:

    A. FF负责产生商品点击数据

    B. 关联商品类目

    C.将结果写入hbase

    商品id:auc_id   用户id:user_id

 

A.拉取数据

         你的任务跑的很快,资源占用也少但是数据为啥数据也这么少呢?不好,数据全堆积在FF数据源了。ok,加大spout的task数,并行度为1。但是为啥数据还是这么少,来看看我们的代码。

    public  void nextTuple() {     

        while (true) {
            LogData log = null;
            try {
                log = queue.take(); 
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
                return;
            }            

            if(log.getType() == null)
                continue;            
            
            collector.emit(streamId, new Values(log.getData(), uid));
        }
    }

        问题的关键在于take为阻塞方法,而storm的多线程由同一个excuter来控制,相当于一个循环在多个task之间切换。当一个task阻塞的时候其他task也无法执行,但大部分线程是可以拿到数据的,整体相当于只有单线程在执行。解决办法是改用无阻塞的poll方法从队列中拿到数据。

        如果增加并行度呢?改成多  excuter单task之后即使单个task的take  方法阻塞也不会对其他task产生影响,而且效率也比多task高。但随之引来一个新问题:由于task的阻塞导致任务超时失败率增加部分数据被丢弃,因此乖乖改成无阻塞的poll方法吧。

        如果还想快一点呢?那就直接去掉应答,因为应答本身也消耗资源,但是统计不到失败率的,慎用。

   conf.setNumAckers(0);

        总结起来就是拉数据提高并行度,task数设为1,取数据采用无阻塞方法,数据量大去掉应答。

B. 关联商品类目

        离线商品表?听起来很大的样子。这时候我们需要一个缓存,LRUCache是个不错的选择,他是一个双向链表的数据结构,查询次数越高会靠前,查询次数低会排在后面,甚至舍弃。商品表太大导入hbase很慢?分表吧。我们需要做的就是将商品表哈希到n个小表然后批量导入。查询的的时候如果没有命中缓存则将auc_id哈希到对应的商品表进行查询。

        这时候你会发现查询商品表,累加,然后将结果存入hbase是一个很长的过程,而这很可能造成你的处理超时然后数据被丢掉。这里我们引入BlockingQueue,如果BlockingQueue是空的,取数操作会阻断进入等待状态,直到有值才被唤醒,存数时如果队列是满的,则阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒。我们将user_id哈希到n个BlockingQueue(为最大利用cpu n为cpu数),将用户数据插入到对应的BlockingQueue后直接应答,这样storm就可以快速进行下一步处理。同时该BlockingQueue对应的线程负责消费数据,所有线程共享LRUCach商品表缓存。

C.将结果写入hbase

        这时我们需要一个队列存储结果,ConcurrentHashMap是一个线程安全的无阻塞数组。前面说到数据是分散到不同的线程进行计算的。每个线程将结果插入到同一个ConcurrentHashMap(插入读写无阻塞),然后通过ScheduledThreadPoolExecutor定时将输入批量导入到hbase。

       

D.计算点击人数

        这里讨论另外一个问题大数据去重,比较简单的方法是直接建立user_id缓存,但是这样很耗资源。通过bloomfilter可以损失很小的准确性的情况下完成去重。具体参考http://blog.csdn.net/jiaomeng/article/details/1495500


转自:http://blog.csdn.net/z_l_l_m/article/details/8251785

相关问答

更多
  • bg4.png storm的,这方面不多,有资料可以参考 Storm入门指南 基于Storm进行实时网络攻击检测及数据挖掘文档下载 hadoop、storm、数据挖掘等文档分享 storm实时流计算应用开发框架-天罡从需求到技术方案介绍 hadoop、hive、storm文档、电子书籍分享 storm编程入门:基本概念 hadoop、storm、hbase面试题、工作日常问答 Storm相关20文档与相关包 Storm入门教程汇总
  • Hi: 下列还有三个短语相同意思: cook up a storm烹饪上露一手 dance up a storm舞姿翩翩 talk up a storm 侃侃而谈 [MAINLY US INFORMAL:主要用于美国,非正式用语] to do something with a lot of energy and often skill:[以非凡的能力和技能做某事] Rob was in the kitchen cooking up a storm. Rob在厨房里大显身手. (from Cambridge ...
  • 当下雨的时候,风暴都到了
  • bg4.png 1、大数据核心是什么? 我觉得大数据的核心,首先是有其价值,如果数据量很大,而没有价值,那么大数据也就没什么特别了,所以大数据的最重要的就是我们能从大量数据中分析、挖掘出对组织有益的信息,当然了,到底有没有用,还得经过实际验证。 另外,就是速度得快,市场机会稍纵即逝,所以如果分析那么多数据,需要一个星期,或者一个月,那么可能意义也不大了。 2、Storm, Spark, Hadoop三个大数据处理工具谁将成为主流? 其实这些只是表面不同的工具而已,本质上的思想是一致,我相信未来还会有更多的工 ...
  • Storm是什么文件[2023-06-19]

    Storm译为汉语即‘暴风雨’、“暴风雪”,是暴风影音软件的英文名,是一种媒体播放器。   Storm还是一个分布式的、容错的实时计算系统,由BackType开发,广泛用于进行实时日志处理,实时统计、实时风控、实时推荐等场景中,目前最新版本是Storm 0.8.0。   Storm还是外文歌曲的名字,具体可在百度音乐中搜索。
  • “值”类型接受任何类型的对象和任何数字。 所以你可以简单地从一个Bolt的execute方法或者一个Spout的nextTuple方法发送一个List: List words = new ArrayList<>(); words.add("one word"); words.add("another word"); _collector.emit(new Values(words)); 您也可以添加一个新的Field,只需确保在declareOutputFields方法中声明它 _coll ...
  • 那么我找到了解决方案。 以下是我所做的: 1)在zookeeper VM上运行sudo nano /etc/environment并将内容更改为以下内容: JAVA_HOME="/usr/lib/jvm/java-7-openjdk-i386" ZOOKEEPER_HOME="/usr/share/zookeeper-3.4.9" PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:$JAVA_HOME/ ...
  • Maven正试图从https repo“ https://clojars.org/repo/ ”下载jar,但java没有客户端证书。 按照以下步骤导入客户端证书 第1步:下载客户端证书 在浏览器中打开https://clojars.org/repo/ URL(即firefox) 单击URL栏右侧的锁定图标 显示服务器URL,单击以获取右箭头,然后单击“更多信息” 弹出窗口以查看服务器的证书。 单击“查看证书”,在“详细信息”表中导出到文件CERT_FILE_NAME.crt 步骤2:将客户端证书导入mav ...
  • 错误是由于事实 - BlockingQueue未在输出收集器中初始化; The error is due to the fact -- The BlockingQueue was not initilaized in the output collector;
  • @Matthias J. Sax和大家,感谢您的帮助。 我在这里犯的错误是,我所遵循的部署过程是错误的。 要部署toplogy构建,我必须遵循以下流程, Jar必须被推入风暴AWS文件夹,然后必须在命令下运行以使其被Storm识别 rm -f * .out (nohup bin / storm nimbus> nimubus.out)& (nohup bin / storm supervisor> supervisor.out)& (nohup bin / storm jar topos / IoT.jar ...