开源项目

知识点

相关文章

更多

最近更新

更多

RabbitMQ topic Exchange使用

2019-03-06 22:38|来源: 网路



 任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
 1、这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
 2、这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
 3、在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
 4、“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
 5、同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。


生产者
private final static String EXCHANGE_NAME = "test_exchange_topic";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    // 消息内容
    String message = "用户数据操作";
    channel.basicPublish(EXCHANGE_NAME, "user.insert", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}


消费者1
private final static String QUEUE_NAME_1 = "test_queue_topic_1";
@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "user.update");
    channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "user.delete");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_1, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


消费者2
private final static String QUEUE_NAME_2 = "test_queue_topic_2";
@Test
public void testRecv2() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "user.#");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_2, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


书生整理于网络


相关问答

更多
  • 在绑定和路由键中找不到您的最佳选择。 相反,我会看看备用交易所。 https://www.rabbitmq.com/ae.html 使用AE,您基本上可以为任何与发布的交换中的某个绑定不匹配的消息进行“全部捕获”交换。 在您的情况下,您的交换将只有all.specific绑定将消息发送到specific队列。 备用交换将直接将消息传递到generic队列。 这可以通过主题来完成,以重用相同的路由键。 或者它可以作为扇出交换完成,并且只有一个队列绑定它。 your best option for this w ...
  • 如果没有连接的消费者可以在发布消息时处理消息,则需要持久的队列来存储消息。 交换不存储消息,但队列可以。 令人困惑的部分是交易所可以被标记为“持久”,但是真正意义的是,如果您重新启动经纪人, 交易所本身将仍然存在,但并不意味着发送到该交易所的任何消息将自动持续存在。 鉴于此,有两个选择: 在您启动发布商之前自行创建队列,执行管理步骤 。 您可以使用Web UI或命令行工具来执行此操作。 确保将其创建为耐用队列,以便即使没有活动的消费者,它也会存储路由到它的消息。 假设您的消费者被编码为在启动时始终声明(并因 ...
  • 这不是什么topic ,所以如果你必须使用这种类型的交换,你不能从绑定中排除一个路由。 您可能会发现rtopic exchange有用: 我们的想法是能够在发布消息时指定路由模式。 使用默认主题交换模式仅在将队列绑定到交换时才被接受。 It is not what topic do, so if you have to use this type of exchange, you can't exclude one route from binding. You may find rtopic exchan ...
  • 现在可以'foo'从'bar'拉出来? 是 - “foo”创建与节点A的连接并从队列中消耗,就像任何其他消费者一样。 RabbitMQ是一种代理模型,通常有一个中央服务器或集群,所有消息生产者和消费者都使用它。 您不需要每个应用程序的rabbitmq实例。 Now can 'foo' pull from 'bar'? yes - "foo" creates a connection to Node A and consumes from a queue, just like any other consum ...
  • 如果要捕获与任何绑定不匹配的所有消息,可以使用备用Exchange完成。 为现有交换添加备用交换并从备用交换中收集所有消息: standard workflow --> [main exchange (topic)] | --> via binding *.foo --> [foo queue] | --> via binding *.bar --> [bar queue] ...
  • 我期望实施 其中三人得到两个消息。 那么简单地使用主题交换并让每个消费者用适当的路由密钥声明它自己的队列。 当你发布的时候,你正在发布一个话题交换(我们称之为E1),例如'request.user.add'和所有绑定到E1的队列都使用匹配的路由密钥(因为我们在这里讨论话题)会得到消息。 或者可以这样说:一条消息从一个队列中消耗一次,并且从一次交换中消耗多少次与队列绑定的消息(使用适当的路由密钥)。 @hirikarate后编辑添加解决方案的问题 那么,我不使用JavaScript,但我会尽力帮助:)与exc ...
  • 每个队列也绑定到默认交换(“”),路由密钥等于队列名称。 您可以简单地将过期的死信直接路由到原始队列。 Every queue is also bound to the default exchange ("") with the routing key equal to the queue name. You can simply route your expired dead letters directly to the original queue.
  • 这可以通过直接或主题交换来完成,并且在您的交换中需要4个绑定。 假设以名为“MainEx”的交换机为例,路由键将如下设置: | exchange | binding | queue | | -------- | ---------------- | ------ | | MainEx | MainRoute.Route1 | Queue1 | | MainEx | MainRoute.Route2 | Queue2 | | MainEx | MainRoute ...
  • 显然,仅使用AMQP无法执行此任务。 我不得不使用RabbitMQ的API。 向以下地址发出GET请求: http://rabbitmq_hostname:15672/api/exchanges/%2F/worker/bindings/source返回vhost /中交换“worker”的每个使用者。 (编码为%2F) 通过单独的队列获取消费者的数量很简单: jsonResponse.Where(o => o["destination_type"] == "queue").Count(); 如果我向主题交 ...
  • 在:topic-exchange上有一个internal属性,默认情况下为false 。 另一方面,有auto-declare尝试不要声明它。 只是因为无论如何那个交易所都在Broker上。 There is an internal attribute for you on the :topic-exchange which is false by default. On the other hand there is auto-declare to try do not declare it at all ...