首页 \ 问答 \ 遇到BlockingQueue java问题,实现Storm(分布式计算)?(having issues with BlockingQueue java , implementation of Storm (Distributed computing)?)

遇到BlockingQueue java问题,实现Storm(分布式计算)?(having issues with BlockingQueue java , implementation of Storm (Distributed computing)?)

这是我的输入spout的代码snippt,用于将元组发送到处理节点以进行集群上的流处理。 问题是BlockingQueue抛出了InterruptedException。

private SpoutOutputCollector collector;
public BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

public boolean isDistributed() {
    return true;
    }


public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {

    this.collector=collector;

}

@Override
public void nextTuple() {


    try {
        //Utils.sleep(100);
        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }



}

public void readInputfile() throws IOException, InterruptedException{
    FileInputStream file = new FileInputStream("/home/529076/Desktop/Temperature");
    DataInputStream readDate=new DataInputStream(file);
    BufferedReader readText=new BufferedReader(new InputStreamReader(readDate));

    String line;
    String singleReading = null;
    while((line=readText.readLine())!=null){
         singleReading=line;
         blockingQueue.add(singleReading);

    }

}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("Single Temperature Reading"));
}

例外描述如下:---

java.lang.InterruptedException10930 [Thread-20] INFO backtype.storm.util - 异步循环中断!

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at com.tcs.storm.test.InputStreamSpout.nextTuple(InputStreamSpout.java:65)
at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:413)

而nextTuple(InputStreamSpout.java:65是------>

        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));

谢谢


This is the code snippt of my input spout for emmiting tuple to a processing noded for stream processing over a cluster. The problem is The BlockingQueue is throwing InterruptedException .

private SpoutOutputCollector collector;
public BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

public boolean isDistributed() {
    return true;
    }


public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {

    this.collector=collector;

}

@Override
public void nextTuple() {


    try {
        //Utils.sleep(100);
        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }



}

public void readInputfile() throws IOException, InterruptedException{
    FileInputStream file = new FileInputStream("/home/529076/Desktop/Temperature");
    DataInputStream readDate=new DataInputStream(file);
    BufferedReader readText=new BufferedReader(new InputStreamReader(readDate));

    String line;
    String singleReading = null;
    while((line=readText.readLine())!=null){
         singleReading=line;
         blockingQueue.add(singleReading);

    }

}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("Single Temperature Reading"));
}

The exception description is as followes :---

java.lang.InterruptedException10930 [Thread-20] INFO backtype.storm.util - Async loop interrupted!

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:1961)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1996)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at com.tcs.storm.test.InputStreamSpout.nextTuple(InputStreamSpout.java:65)
at backtype.storm.daemon.task$fn__3349$fn__3404.invoke(task.clj:413)

And The nextTuple(InputStreamSpout.java:65 is ------>

        collector.emit(new Values("Single Temperature Reading", blockingQueue.take()));

Thanks


原文:https://stackoverflow.com/questions/11343299
更新时间:2023-11-25 17:11

最满意答案

你的问题在这里:

->  Nested Loop Left Join  (cost=0.00..22871.67 rows=1 width=16) (actual time=1025.396..221266.357 rows=2593 loops=1)
    Join Filter: ("inner".orig_acct_payment_fk = "outer".acct_account_transaction_id)
    Filter: ("inner".link_type IS NULL)
        ->  Seq Scan on acct_account_transaction t  (cost=0.00..18222.98 rows=1 width=16) (actual time=949.081..976.432 rows=2596 loops=1)
                Filter: ((("type")::text = 'debit'::text) AND ((transaction_status)::text = 'active'::text) AND (date_effective >= '2012-03-01'::date) AND (date_effective   
            Seq Scan on acct_payment_link l  (cost=0.00..4648.68 rows=1 width=15) (actual time=1.073..84.610 rows=169 loops=2596)
                Filter: ((link_type)::text ~~ 'return_%'::text)

它期望在acct_account_transaction中找到1行,而它找到2596,对于另一个表也是如此。

你没有提到你的postgres版本(可能吗?),但这应该可以解决问题:

SELECT DISTINCT
    t.date_effective,
    t.acct_account_transaction_id,
    p.method,
    t.amount,
    c.business_name,
    t.amount
FROM
    contact c inner join contact_role on (c.contact_id=contact_role.contact_fk and contact_role.exchange_fk=74),
    acct_account a, acct_payment p,
    acct_account_transaction t
WHERE
    p.acct_account_transaction_fk=t.acct_account_transaction_id
    and t.type = 'debit'
    and transaction_status = 'active'
    and p.method != 'trade'
    and t.date_effective >= '2012-03-01'
    and t.date_effective < (date '2012-03-01' + interval '1 month')
    and c.contact_id=a.contact_fk and a.acct_account_id = t.acct_account_fk
    and not exists(
         select * from acct_payment_link l 
           where orig_acct_payment_fk == acct_account_transaction_id 
           and link_type like 'return_%'
    )
ORDER BY
    t.date_effective DESC

另外,尝试为相关列设置适当的统计目标。 链接到友好的手册: http//www.postgresql.org/docs/current/static/sql-altertable.html


Your problem is here:

->  Nested Loop Left Join  (cost=0.00..22871.67 rows=1 width=16) (actual time=1025.396..221266.357 rows=2593 loops=1)
    Join Filter: ("inner".orig_acct_payment_fk = "outer".acct_account_transaction_id)
    Filter: ("inner".link_type IS NULL)
        ->  Seq Scan on acct_account_transaction t  (cost=0.00..18222.98 rows=1 width=16) (actual time=949.081..976.432 rows=2596 loops=1)
                Filter: ((("type")::text = 'debit'::text) AND ((transaction_status)::text = 'active'::text) AND (date_effective >= '2012-03-01'::date) AND (date_effective   
            Seq Scan on acct_payment_link l  (cost=0.00..4648.68 rows=1 width=15) (actual time=1.073..84.610 rows=169 loops=2596)
                Filter: ((link_type)::text ~~ 'return_%'::text)

It expects to find 1 row in acct_account_transaction, while it finds 2596, and similarly for the other table.

You did not mention Your postgres version (could You?), but this should do the trick:

SELECT DISTINCT
    t.date_effective,
    t.acct_account_transaction_id,
    p.method,
    t.amount,
    c.business_name,
    t.amount
FROM
    contact c inner join contact_role on (c.contact_id=contact_role.contact_fk and contact_role.exchange_fk=74),
    acct_account a, acct_payment p,
    acct_account_transaction t
WHERE
    p.acct_account_transaction_fk=t.acct_account_transaction_id
    and t.type = 'debit'
    and transaction_status = 'active'
    and p.method != 'trade'
    and t.date_effective >= '2012-03-01'
    and t.date_effective < (date '2012-03-01' + interval '1 month')
    and c.contact_id=a.contact_fk and a.acct_account_id = t.acct_account_fk
    and not exists(
         select * from acct_payment_link l 
           where orig_acct_payment_fk == acct_account_transaction_id 
           and link_type like 'return_%'
    )
ORDER BY
    t.date_effective DESC

Also, try setting appropriate statistics target for relevant columns. Link to the friendly manual: http://www.postgresql.org/docs/current/static/sql-altertable.html

相关问答

更多
  • 你好我试着翻译“但是(我们认为)这很难看”我相信我的回答会回答你的问题。 在你提到的同一个问题中,你会发现这个回答 其中用户使用特殊情况变量格式化获取pg-promise 在你的情况下,使用共享连接可能看起来像这样,但在你的例子中我实际上建议使用普通的db.query我只是使用共享连接来向你展示我如何扩展“丑陋”: exports.getByFileNameAndColName = function query(data,cb) { var sco; const params = []; pa ...
  • Redshift Query优化来自表格上的群集,表格设计,数据加载,数据清理和分析。 让我回答上面列表中的一些核心接触点。 1.确保你的桌子mytable,细节,客户端有正确的SORT_KEY,DIST_KEY 2.确保你所有的桌子都是正确的分析和真空。 这是以Redshift格式编写的同一SQL的另一个版本。 我做的很少调整 使用“With Clause”优化集群级别计算 使用加入正确的方式并确保基于数据的左/右连接很重要。 使用date_range和子表表示对象方向的种类。 在下面的主SQL中使用Gr ...
  • 您需要查询目录。 可能是pg_depend: http://www.postgresql.org/docs/current/static/catalog-pg-depend.html 如果您需要它,请不要错过便捷类型转换器,它允许您将表格oid和文本转换为relnames,如下所示: select 'pg_statistics'::regclass; -- 'pg_statistics' select 2619::regclass; -- 'pg_statistics' too, on ...
  • 看看这个SQL小提琴 。 您可以使用->>运算符查询到jsonb。 对于日期/时间,您需要转换结果才能进行比较。 此查询仅返回具有未来开始的值: SELECT when_col->>'start' FROM match WHERE to_date(when_col->>'start','YYYY-MM-DD"T"HH24:MI:SS') >= NOW(); 请注意,您上面提供的JSON格式不正确。 我把它分解成小提琴中的单独的行: CREATE TABLE match ( when_col jsonb ...
  • 这是查询。 您应该pc.ProductId in (1,3)插入所选汽车Ids pc.ProductId in (1,3)的列表,最后您应该将条件更正为选定的汽车数量,因此如果您选择1和3,您应该写入HAVING COUNT(DISTINCT pc.ProductId) = 2如果您选择3辆车,那么必须有3.这样的条件在HAVING给您条件所有车辆都在这些位置: SELECT Id FROM Locations l JOIN Locations_Categories lc on l.Id=lc.Locati ...
  • 在你有Hibernate映射 你没有向你展示Java代码。 我想这一年也是String。 但在数据库中, 年份是一个整数。 解决方案:更改您的映射,如下所示:另请更改您的Java类并将年份定义为整数。 In the Hibernate mapping you have You didn't show you Jav ...
  • 你的问题在这里: -> Nested Loop Left Join (cost=0.00..22871.67 rows=1 width=16) (actual time=1025.396..221266.357 rows=2593 loops=1) Join Filter: ("inner".orig_acct_payment_fk = "outer".acct_account_transaction_id) Filter: ("inner".link_type IS NULL) ...
  • 您可以使用INNER JOIN来优化查询,这样您就不必在不同的属性上两次查询'test_event'表。 SELECT t.value1, t.value2 FROM test_table t, test_event e WHERE e.event_id = 1 AND t.test_id = e.test_id AND t.sequence_id = e.sequence_id 编辑:添加了评论中提供的建议。 SELECT t.value1, t.value2 FROM tes ...
  • 数据库设计 我建议: CREATE TABLE matchversion ( matchversion_id int PRIMARY KEY , matchversion text UNIQUE NOT NULL ); CREATE TABLE matchtype ( matchtype_id int PRIMARY KEY , matchtype text UNIQUE NOT NULL ); CREATE TABLE region ( region_id int PRIMAR ...
  • 使用路径访问运算符可以更快地访问较低级别的对象: SELECT * FROM "accounts" WHERE data #>> '{data, country}' = 'UK' AND data #>> '{data, verified_at}' IS NOT NULL AND data ->> 'lastUpdatedTime' > '2016-02-28T05:49:08.511846' ORDER BY data ->> 'lastUpdatedTime' LIMIT 100 OFFSET ...

相关文章

更多

最新问答

更多
  • 获取MVC 4使用的DisplayMode后缀(Get the DisplayMode Suffix being used by MVC 4)
  • 如何通过引用返回对象?(How is returning an object by reference possible?)
  • 矩阵如何存储在内存中?(How are matrices stored in memory?)
  • 每个请求的Java新会话?(Java New Session For Each Request?)
  • css:浮动div中重叠的标题h1(css: overlapping headlines h1 in floated divs)
  • 无论图像如何,Caffe预测同一类(Caffe predicts same class regardless of image)
  • xcode语法颜色编码解释?(xcode syntax color coding explained?)
  • 在Access 2010 Runtime中使用Office 2000校对工具(Use Office 2000 proofing tools in Access 2010 Runtime)
  • 从单独的Web主机将图像传输到服务器上(Getting images onto server from separate web host)
  • 从旧版本复制文件并保留它们(旧/新版本)(Copy a file from old revision and keep both of them (old / new revision))
  • 西安哪有PLC可控制编程的培训
  • 在Entity Framework中选择基类(Select base class in Entity Framework)
  • 在Android中出现错误“数据集和渲染器应该不为null,并且应该具有相同数量的系列”(Error “Dataset and renderer should be not null and should have the same number of series” in Android)
  • 电脑二级VF有什么用
  • Datamapper Ruby如何添加Hook方法(Datamapper Ruby How to add Hook Method)
  • 金华英语角.
  • 手机软件如何制作
  • 用于Android webview中图像保存的上下文菜单(Context Menu for Image Saving in an Android webview)
  • 注意:未定义的偏移量:PHP(Notice: Undefined offset: PHP)
  • 如何读R中的大数据集[复制](How to read large dataset in R [duplicate])
  • Unity 5 Heighmap与地形宽度/地形长度的分辨率关系?(Unity 5 Heighmap Resolution relationship to terrain width / terrain length?)
  • 如何通知PipedOutputStream线程写入最后一个字节的PipedInputStream线程?(How to notify PipedInputStream thread that PipedOutputStream thread has written last byte?)
  • python的访问器方法有哪些
  • DeviceNetworkInformation:哪个是哪个?(DeviceNetworkInformation: Which is which?)
  • 在Ruby中对组合进行排序(Sorting a combination in Ruby)
  • 网站开发的流程?
  • 使用Zend Framework 2中的JOIN sql检索数据(Retrieve data using JOIN sql in Zend Framework 2)
  • 条带格式类型格式模式编号无法正常工作(Stripes format type format pattern number not working properly)
  • 透明度错误IE11(Transparency bug IE11)
  • linux的基本操作命令。。。