开源项目

知识点

相关文章

更多

最近更新

更多

RabbitMQ Work模式消息队列

2019-03-05 22:54|来源: 网路

一个生产者、多个消费者。 一个消息只能被一个消费者获取。


生产者发布消息

private final static String QUEUE_NAME = "test_queue_work";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    for (int i = 0; i < 50; i++) {
        // 消息内容
        String message = "" + i;
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        Thread.sleep(i * 10);
    }
    channel.close();
    connection.close();
}


消息者1
@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        //休眠
        Thread.sleep(10);
        // 返回确认状态
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


消费者2
@Test
public void testRecv2() throws Exception{
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成状态
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        // 休眠1秒
        Thread.sleep(1000);
        //反馈消息的消费状态
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


为了测试消费者获取到的消息情况,先将两个消费者启动后,再启动生产者
消费者1打印的消息
消费者2打印的消息
[x] Received '0'
[x] Received '2'
[x] Received '4'
[x] Received '6'
[x] Received '8'
[x] Received '10'
[x] Received '12'
[x] Received '14'
[x] Received '16'
[x] Received '18'
[x] Received '20'
[x] Received '22'
[x] Received '24'
[x] Received '26'
[x] Received '28'
[x] Received '30'
[x] Received '32'
[x] Received '34'
[x] Received '36'
[x] Received '38'
[x] Received '40'
[x] Received '42'
[x] Received '44'
[x] Received '46'
[x] Received '48'
[x] Received '1'
[x] Received '3'
[x] Received '5'
[x] Received '7'
[x] Received '9'
[x] Received '11'
[x] Received '13'
[x] Received '15'
[x] Received '17'
[x] Received '19'
[x] Received '21'
[x] Received '23'
[x] Received '25'
[x] Received '27'
[x] Received '29'
[x] Received '31'
[x] Received '33'
[x] Received '35'
[x] Received '37'
[x] Received '39'
[x] Received '41'
[x] Received '43'
[x] Received '45'
[x] Received '47'
[x] Received '49'
测试结果:
   消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
   消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。


但是由于消费者1每次消费后只休息10毫秒,而消费者2每次消费后只休息1秒,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。


此时可以使用channel.basicQos(1);来设置同一时刻服务器只会发一条消息给消费者,解决“能者多劳”问题,以消费者1为例:

@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        //休眠
        Thread.sleep(10);
        // 返回确认状态
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


执行结果:
消费者1打印的消息
消费者2打印的消息
[x] Received '0'
[x] Received '2'
[x] Received '3'
[x] Received '4'
[x] Received '5'
[x] Received '6'
[x] Received '7'
[x] Received '8'
[x] Received '9'
[x] Received '10'
[x] Received '11'
[x] Received '12'
[x] Received '13'
[x] Received '14'
[x] Received '15'
[x] Received '17'
[x] Received '18'
[x] Received '19'
[x] Received '20'
[x] Received '21'
[x] Received '22'
[x] Received '24'
[x] Received '25'
[x] Received '26'
[x] Received '27'
[x] Received '28'
[x] Received '30'
[x] Received '31'
[x] Received '32'
[x] Received '33'
[x] Received '35'
[x] Received '36'
[x] Received '37'
[x] Received '39'
[x] Received '40'
[x] Received '41'
[x] Received '43'
[x] Received '44'
[x] Received '45'
[x] Received '47'
[x] Received '48'
[x] Received '49'
[x] Received '1'
[x] Received '16'
[x] Received '23'
[x] Received '29'
[x] Received '34'
[x] Received '38'
[x] Received '42'
[x] Received '46'


书生整理于网络


相关问答

更多
  • 消息队列的作用:数据分发,缓存数据,一份数据拷贝出N份供别人使用。 适用场景: 常用于一个生产者多个消费者的场景。 也可以是多个消费者一个生产者。 意义:说白了就是产生数据的和消费数据的解耦合。就像是个大鱼塘,放鱼的不用管抓鱼的。抓鱼的也不用管放鱼的。 你的问题: 这种场景通常因业务而产生 这说明你看的知识课本,或者简单的例子。 如果上面的说明还是没有说清楚,那么只能说明你查阅的资料还不够,或者没有专门查他的应用场景。
  • 首先,Amazon SQS是一个伪队列,这意味着每个消息(如果它到达队列)的传递都是有保证的,但不是通常在队列中发生的FIFO方式。 如果消息的顺序对您很重要,并且您希望队列以FIFO方式工作,则Amazon SQS文档会指出在您的应用程序逻辑中处理此问题,因为来自Amazon SQS的消息将无序地传达给您。 与此相比,据我所知,你可以在RabbitMQ中实现工作队列。 如果这样可以避免在应用程序级别实现队列消息排序,那么这将是更可取的选择。 以下是一些可帮助您决定选择哪一个的因素: 队列消息序列如上所述。 ...
  • 你是对的,你根本不需要芹菜。 当你设计分布式系统时,有很多选择,没有正确的方法去做适合所有情况的事情。 许多人发现,让消息消费者池等待消息出现在他们的队列中,做一些工作并在工作完成时发送消息更加灵活。 Celery是一个将一大堆东西封装在一个包中的框架,但如果你不需要整个包,那么最好是设置RabbitMQ并且在没有所有复杂的情况下实现你所需要的。 另外,除了Celery实现的任务队列场景外,RabbitMQ还可以用于更多场景。 但是,如果你选择芹菜,那么再考虑RabbitMQ。 Celery的消息排队模型非 ...
  • 如果要支持ACK功能,则必须将接收器队列声明为AutoDelete = false 。 这是C#中的一个示例(可能与Java有很小的差异) private bool PushDataToQueue(byte[] data, string queueName, ref string error) { try { if (_connection == null || !_connection.IsOpen) _connection = _factory.C ...
  • 您可以使用路由密钥,通过创建交换机和绑定到交换机的名为Device60的队列。 channel.queueBind("Device60","myexchange","device.60") 此时,您可以使用路由键发布消息。 例如: channel.basicPublish("myexchange","device.60",mymessage); 现在, Device60将接收消息并存储它们,直到消费者在线。 对于其他设备,它是相同的: channel.queueBind("Device50","mye ...
  • 要删除所有任务,您应该为正在使用的客户端库中实现的每个队列queue.purge方法调用。 要从队列异步获取消息,您应该使用basic.consume方法。 To delete all tasks you should call for each queue queue.purge method implemented in client library you are using. To asynchronously get messages from queue you should use basic ...
  • 我每次都在重新创建频道,因此对BasicGet的调用和对BasicAck的调用都在不同的频道上。 一旦我重新使用频道,消息就成功地通过Ready - > Uacked - > Off the Q. FWIW DeliveryTag仍为1 除非有人想告诉我我不应该重复使用频道,否则我认为现在已经回答了。 我知道重用Channels可能存在多线程问题。 谢谢 拍 I was recreating Channel every time so the call to BasicGet and the call to ...
  • GAE与RabbitMQ不具有可比性,因为您的问题几乎没有意义。 RabbitMQ只是消息传递,您很可能必须设置更多基础架构来支持您的用例。 GAE是一个完整的基础设施。 也就是说,RabbitMQ每秒可以处理大量的消息。 也许你想为手机游戏连接到一个API,以及一些应用程序逻辑来处理这些消息。 您不会让游戏客户端连接到您的RabbitMQ服务器并直接与其他客户端通信。 在GAE上,您通常会有请求并立即回复。 如果您需要持续时间较长的连接并允许您将消息推送到游戏客户端(不使用APN等),您可能需要考虑XMP ...
  • SignalR提供的SignalR客户端库可能具有一旦失去连接就重新连接的默认行为(JS肯定会这样做)。 因此,拥有池回收只会让您的客户端重新连接到您的服务器,在中断之前再次收到消息。 SignalR client libraries provided by the SignalR might have a default behavior of reconnecting once loosing connection (the JS one definitely does). So having an P ...
  • 不,你做不到。 这些都是消费政策。 也许你可以停止发布。 另请阅读以下主题: https : //groups.google.com/forum/#!topic /rabbitmq-users / 68-DPZN4b_Q No you can't do it. These are consuming policies. Maybe you can stop the publish. Read also thread about: https://groups.google.com/forum/#!topic ...