首页 \ 问答 \ 生产者消费者 - ExecutorService和ArrayBlockingQueue(producer consumer - ExecutorService & ArrayBlockingQueue)

生产者消费者 - ExecutorService和ArrayBlockingQueue(producer consumer - ExecutorService & ArrayBlockingQueue)

我想通过使用ExecutorService和ArrayBlockingQueue来了解我对生产者消费者设计的理解是否正确。 我知道有不同的方法来实现这个设计,但我想,最后,它取决于问题本身。

我不得不面对的问题是:我有一个生产者从大文件中读取(6GB); 它逐行读取并将每一行转换为一个对象。 它将对象放在ArrayBlockingQueue中。

消费者(少数)从ArrayBlockingQueue中获取对象并将其持久保存到数据库中。

现在,显然生产者比消费者快得多; 将每一行转换为对象需要几分之一秒,但对于消费者来说需要更长的时间。

所以...如果我希望通过这样做来加速这个过程:我创建了两个分类为'ProducerThread'和'ConsumerThread'的共享ArrayBlockingQueue。 在2之间协调的线程如下所示:

@Override
public void run()
{
    try{

        ArrayBlockingQueue<Ticket> queue = new ArrayBlockingQueue<Ticket>(40);
        ExecutorService threadPool = Executors.newFixedThreadPool(8);

        threadPool.execute(new SaleConsumerThread("NEW YORK", queue)); 
        threadPool.execute(new SaleConsumerThread("PARIS", queue));
        threadPool.execute(new SaleConsumerThread("TEL AVIV", queue));
        threadPool.execute(new SaleConsumerThread("HONG KONG", queue));
        threadPool.execute(new SaleConsumerThread("LONDON", queue));
        threadPool.execute(new SaleConsumerThread("BERLIN", queue));
        threadPool.execute(new SaleConsumerThread("AMSTERDAM", queue));

        Future producerStatus = threadPool.submit(new SaleProducerThread(progressBar, file, queue)); 
        producerStatus.get(); 
        threadPool.shutdown();   

    }catch(Exception exp)
    {
        exp.printStackTrace();
    }
}

我的问题是:

  1. 上面的设计是否会同时使用每个线程? 我的电脑是两个2.4GHz四核。

  2. 我不确定Future和.get()的用途是什么?

顺便说一句,结果是快速的(考虑第一个版本是连续的,需要3小时),现在需要约40分钟(但可能还有改进的余地)。

谢谢你的指针


I wish to know if my understanding of the producer consumer design is correct by using the ExecutorService & ArrayBlockingQueue. I understand there are different ways to implement this design but I guess, at the end, it depends on the problem itself.

The problem I had to confront is this: I have a ONE producer who reads from a big file (6GB); it reads line by line and converts every line to an object. It places the object in an ArrayBlockingQueue.

The consumers (few) take the object from the ArrayBlockingQueue and persist this to the database.

Now, obviously the producer is much faster than the consumer; it takes fractions of seconds to convert each line to an object but for the consumers it takes longer time.

So...if I wish to speedup this process by doing this: I created 2 classed 'ProducerThread' and 'ConsumerThread' they share the ArrayBlockingQueue. The Thread that coordinate between the 2 looks like this:

@Override
public void run()
{
    try{

        ArrayBlockingQueue<Ticket> queue = new ArrayBlockingQueue<Ticket>(40);
        ExecutorService threadPool = Executors.newFixedThreadPool(8);

        threadPool.execute(new SaleConsumerThread("NEW YORK", queue)); 
        threadPool.execute(new SaleConsumerThread("PARIS", queue));
        threadPool.execute(new SaleConsumerThread("TEL AVIV", queue));
        threadPool.execute(new SaleConsumerThread("HONG KONG", queue));
        threadPool.execute(new SaleConsumerThread("LONDON", queue));
        threadPool.execute(new SaleConsumerThread("BERLIN", queue));
        threadPool.execute(new SaleConsumerThread("AMSTERDAM", queue));

        Future producerStatus = threadPool.submit(new SaleProducerThread(progressBar, file, queue)); 
        producerStatus.get(); 
        threadPool.shutdown();   

    }catch(Exception exp)
    {
        exp.printStackTrace();
    }
}

My questions are:

  1. Would the design above actually use each thread concurrently? My computer is Two 2.4GHz Quad-Core.

  2. I'm not sure what does the Future and the .get() are for?

The result, by the way, are fast (consider the first version was sequential and it took 3hr) now it takes ~40 min (but maybe there're room for improvement).

Thanks for any pointer


原文:https://stackoverflow.com/questions/10392855
更新时间:2023-10-23 19:10

最满意答案

我从文档中了解到:

limit参数控制模式的应用次数,因此会影响结果数组的长度。 如果限制n大于零,那么模式将最多应用n - 1次,数组的长度将不大于n,并且数组的最后一个条目将包含超出最后一个匹配分隔符的所有输入 。 如果n是非正数,那么模式将被应用尽可能多的次数,并且数组可以具有任何长度。 如果n为零,那么模式将被应用尽可能多的次数,该数组可以具有任何长度,并且将丢弃尾随的空字符串。

这意味着在字符串s上设计或剪切到n次,因此让我们逐一分析以更好地理解:

限制1

String[] spl1 = str.split("o", 1);

这意味着将它分割或剪切在字符串o上的一个字符串上,在这种情况下,您将获得所有输入:

[boo:and:foo]
 1

限制2

String[] spl1 = str.split("o", 2);

这意味着在o上减少一次,所以我会在第一个o中断

    boo:and:foo
-----^

在这种情况下,您将得到两个结果:

[b,o:and:foo]
 1 2

限制3

String[] spl1 = str.split("o", 3);

这意味着在第一个o和第二个o上切两次

    boo:and:foo
1----^^--------------2

在这种情况下,您将得到三个结果:

[b, ,:and:foo]
 1 2  3

限制4

String[] spl1 = str.split("o", 4);

这意味着在第一,第二和第三o上三次削减它

     boo:and:foo
1_____^^      ^
       |___2  |___3

在这种情况下,您将得到四个结果:

[b, ,:and:f,o]
 1 2 3      4

限制5

String[] spl1 = str.split("o", 5);

这意味着在第一,第二,第三和第四次切割它四次

     boo:and:foo
1_____^^      ^^
       |___2  ||___4
              |____3

在这种情况下,您将获得五个结果:

[b, ,:and:f, , ]
 1 2  3     4 5

只是一个简单的动画来了解更多:

split()方法实际上如何工作?


What i understand from the documentation :

The limit parameter controls the number of times the pattern is applied and therefore affects the length of the resulting array. If the limit n is greater than zero then the pattern will be applied at most n - 1 times, the array's length will be no greater than n, and the array's last entry will contain all input beyond the last matched delimiter. If n is non-positive then the pattern will be applied as many times as possible and the array can have any length. If n is zero then the pattern will be applied as many times as possible, the array can have any length, and trailing empty strings will be discarded.

This mean devise or cut it to n time on string s, so Lets analyse one by one to understand better :

Limit 1

String[] spl1 = str.split("o", 1);

This mean split it or cut it on just one string on the string o in this case you will get all your input :

[boo:and:foo]
 1

Limit 2

String[] spl1 = str.split("o", 2);

Which mean cut it one time on o so i will put a break in the first o

    boo:and:foo
-----^

in this case you will get two results :

[b,o:and:foo]
 1 2

Limit 3

String[] spl1 = str.split("o", 3);

Which mean cut it two times on the first o and on the second o

    boo:and:foo
1----^^--------------2

in this case you will get three results :

[b, ,:and:foo]
 1 2  3

Limit 4

String[] spl1 = str.split("o", 4);

Which mean cut it three times on the first, second and third o

     boo:and:foo
1_____^^      ^
       |___2  |___3

in this case you will get four results :

[b, ,:and:f,o]
 1 2 3      4

Limit 5

String[] spl1 = str.split("o", 5);

Which mean cut it four times on first, second, third and forth o

     boo:and:foo
1_____^^      ^^
       |___2  ||___4
              |____3

in this case you will get five results :

[b, ,:and:f, , ]
 1 2  3     4 5

Just a simple animation to understand more :

How split() method actually works?

相关问答

更多

相关文章

更多

最新问答

更多
  • 获取MVC 4使用的DisplayMode后缀(Get the DisplayMode Suffix being used by MVC 4)
  • 如何通过引用返回对象?(How is returning an object by reference possible?)
  • 矩阵如何存储在内存中?(How are matrices stored in memory?)
  • 每个请求的Java新会话?(Java New Session For Each Request?)
  • css:浮动div中重叠的标题h1(css: overlapping headlines h1 in floated divs)
  • 无论图像如何,Caffe预测同一类(Caffe predicts same class regardless of image)
  • xcode语法颜色编码解释?(xcode syntax color coding explained?)
  • 在Access 2010 Runtime中使用Office 2000校对工具(Use Office 2000 proofing tools in Access 2010 Runtime)
  • 从单独的Web主机将图像传输到服务器上(Getting images onto server from separate web host)
  • 从旧版本复制文件并保留它们(旧/新版本)(Copy a file from old revision and keep both of them (old / new revision))
  • 西安哪有PLC可控制编程的培训
  • 在Entity Framework中选择基类(Select base class in Entity Framework)
  • 在Android中出现错误“数据集和渲染器应该不为null,并且应该具有相同数量的系列”(Error “Dataset and renderer should be not null and should have the same number of series” in Android)
  • 电脑二级VF有什么用
  • Datamapper Ruby如何添加Hook方法(Datamapper Ruby How to add Hook Method)
  • 金华英语角.
  • 手机软件如何制作
  • 用于Android webview中图像保存的上下文菜单(Context Menu for Image Saving in an Android webview)
  • 注意:未定义的偏移量:PHP(Notice: Undefined offset: PHP)
  • 如何读R中的大数据集[复制](How to read large dataset in R [duplicate])
  • Unity 5 Heighmap与地形宽度/地形长度的分辨率关系?(Unity 5 Heighmap Resolution relationship to terrain width / terrain length?)
  • 如何通知PipedOutputStream线程写入最后一个字节的PipedInputStream线程?(How to notify PipedInputStream thread that PipedOutputStream thread has written last byte?)
  • python的访问器方法有哪些
  • DeviceNetworkInformation:哪个是哪个?(DeviceNetworkInformation: Which is which?)
  • 在Ruby中对组合进行排序(Sorting a combination in Ruby)
  • 网站开发的流程?
  • 使用Zend Framework 2中的JOIN sql检索数据(Retrieve data using JOIN sql in Zend Framework 2)
  • 条带格式类型格式模式编号无法正常工作(Stripes format type format pattern number not working properly)
  • 透明度错误IE11(Transparency bug IE11)
  • linux的基本操作命令。。。