开源项目

知识点

相关文章

更多

最近更新

更多

RabbitMQ 发布订阅模式(Publish、Subscribe)

2019-03-05 23:27|来源: 网路

1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的


消息生产者

private final static String EXCHANGE_NAME = "test_exchange_fanout";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // 消息内容
    String message = "订阅消息";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为交换机没有存储消息的能力,消息只能存在在队列中。


消息消费者

private final static String QUEUE_NAME_1 = "test_queue_exchange_1";
private final static String QUEUE_NAME_2 = "test_queue_exchange_2";
@Test
public void testRecv1() throws Exception {
    recv(QUEUE_NAME_1,EXCHANGE_NAME);
}
@Test
public void testRecv2() throws Exception {
    recv(QUEUE_NAME_2,EXCHANGE_NAME);
}
public void recv(String queueName,String exchangeName) throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(queueName, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(queueName, exchangeName, "");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(queueName, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


注:如果交换机没有启动起来,会报异常
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test_exchange_fanout' in vhost '/testvhost', class-id=50, method-id=20)
启动消费者之后, 在管理后台可以看到队列和交换机的绑定关系:

生产者再发一次消息,两个消费者都能获取该消息。


书生整理于网络


相关问答

更多
  • 艾米, 我认为您的疑问与RabbitMQ可用的消息分发方式(或模式)和交换类型有关。 因此,我将尝试用一个简短的解释来覆盖它们,您可以决定哪种方法最适合您的场景(RabbitMQ教程以另一种方式解释)。 工作队列 使用默认交换和绑定密钥,您可以直接在队列中发布消息。 一旦消息到达队列,消费者“竞争”以获取消息,这意味着消息不被传递给多个消费者。 如果有多个消费者收听单个队列,则消息将以循环方式传递。 当您有工作要做并希望轻松扩展多个服务器/进程时,请使用此方法。 发布/订阅 在此模型中,一条发送的消息可能会 ...
  • 通常,如果您想确定发布,可以使用发布确认 。 您可以阅读,您可以使用事务或使用ConfirmListener执行此操作。 要处理代理失败,您可以在本地存储失败的消息,然后尝试在x秒后重新发送它们。 注意:我不知道EasyNetQ是如何工作的,但本机发布是异步的,因此陷阱发布异常是不够的。 这就是为什么你应该使用发布确认来处理它。 In general if you want to be sure about the publish you can use the publish confirmation. ...
  • 如果要支持ACK功能,则必须将接收器队列声明为AutoDelete = false 。 这是C#中的一个示例(可能与Java有很小的差异) private bool PushDataToQueue(byte[] data, string queueName, ref string error) { try { if (_connection == null || !_connection.IsOpen) _connection = _factory.C ...
  • 您可以使用以下一项或多项功能查找所有信息 命令行工具rabbitmqctl rabbitmq http api rabbitmq管理(web ui) There is a library to get information from rabbitmq http api https://github.com/rabbitmq/hop
  • 背景 我最初希望发布和订阅消息和队列持久性。 这在理论上并不完全适合发布和订阅: 这种模式并不关心消息是否被接收。 发布商只是鼓励消息,如果有任何订阅者倾听,好,否则它不在乎。 事实上,考虑到我的需求,我需要更多的工作队列模式,甚至是RPC模式。 分析 人们说这两者应该很容易,但这确实是主观的。 RabbitMQ总体上有更好的官方文档,大多数语言都有清晰的例子,而Redis的信息主要在第三方博客和稀疏的github回购中 - 这使得它很难找到。 至于这些例子,RabbitMQ有两个例子可以清楚地回答我的问题 ...
  • 经过您的问题域后,我理解的是 - 在运行时,多个客户端将向RabbitMQ发送“set”和“get”消息,并且每个“set”消息将由当时活动的每个服务器缓存处理。 并且“get”消息需要由任何一个服务器缓存处理,并且需要将响应消息发送回发送“get”消息的客户端。 如果我错了,请纠正我。 在这种情况下,假设客户端将有单独的触发点用于产生/发布“获取”/“设置”消息是公平的。 因此,逻辑上“获取”消息生成者和“设置”消息“发布者将是两个单独的程序/类。 因此,您对pub / sub和RPC模型的选择看起来很合 ...
  • 没有。 ServiceStack的RabbitMq支持通常基于类型名称,并且可以用作工作队列。 它被设计为无配置且易于使用,因此自动处理要使用的交换,路由密钥和队列名称的详细信息。 如果您需要高级或自定义配置,最好直接使用底层RabbitMQ.Client 。 There isn't. ServiceStack's RabbitMq support is conventionally based on Type names and is opinionated to function as a work q ...
  • 这取决于你对'可扩展性'的意思。 RabbitMQ旨在实现高稳定性和可扩展性。 还有ZeroMQ ,专为高性能(但功能较少)而设计。 您可以看到ZeroMQ 的性能测试 。 It depends on what do you mean about 'scalability'. RabbitMQ is designed for high stability with scalability. There is also ZeroMQ, designed for very high performance (b ...
  • SignalR提供的SignalR客户端库可能具有一旦失去连接就重新连接的默认行为(JS肯定会这样做)。 因此,拥有池回收只会让您的客户端重新连接到您的服务器,在中断之前再次收到消息。 SignalR client libraries provided by the SignalR might have a default behavior of reconnecting once loosing connection (the JS one definitely does). So having an P ...
  • RabbitMQ团队监视这个邮件列表,并且有时只回答StackOverflow的问题。 如果您希望使用Pika库的使用者接收MQTT消息,那么使用者必须订阅正在发布MQTT消息的相应队列。 有关MQTT和AMQP如何互操作的全面文档可在此处找到 。 然后你说“我想将相同的信息发布到我自己的交易所”。 如果您希望使用自己的交换机而不是amq.topic ,请参阅本文档的“自定义交换”部分。 您必须在rabbitmq.config文件中指定交换的名称,并在发布任何消息之前创建交换。 请注意,此自定义交换必须是主 ...