开源项目

知识点

相关文章

更多

最近更新

更多

RabbitMQ 路由模式(Routing)-使用 direct Exchange

2019-03-06 22:33|来源: 网路


任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1、处理路由键
2、需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
3、一般情况可以使用rabbitMQ自带的Exchange:" "(该Exchange的名字为空字符串,下文称其为default Exchange)。
4、这种模式下不需要将Exchange进行任何绑定(binding)操作
5、消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
6、如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);



生产者

1、创建一个 direct模式的exchange(交换机)
channel.exchange_declare(exchange='direct_logs',type='direct')
2、向交换机中发送消息
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
private final static String EXCHANGE_NAME = "test_exchange_direct";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明exchange
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    // 消息内容
    String message = "这是消息A";
    channel.basicPublish(EXCHANGE_NAME, "A", null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
}


消息者
1、声明队列
channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
2、创建消息与交换机的绑定
channel.queue_bind(exchange=exchange_name,queue=queue_name)
channel.queue_bind(exchange=exchange_name,queue=queue_name, routing_key='black')
3、生产者发出消息后,匹配的路由key会接收到消息


消费者1

private final static String QUEUE_NAME_1 = "test_queue_direct_1";
@Test
public void testRecv1() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "A");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_1, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


消息者2
private final static String QUEUE_NAME_2 = "test_queue_direct_2";
@Test
public void testRecv2() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
    // 绑定队列到交换机
    channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "B");
    channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "A");
    // 同一时刻服务器只会发一条消息给消费者
    channel.basicQos(1);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列,手动返回完成
    channel.basicConsume(QUEUE_NAME_2, false, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
        Thread.sleep(10);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
}


书生整理于网络


相关问答

更多
  • 在绑定和路由键中找不到您的最佳选择。 相反,我会看看备用交易所。 https://www.rabbitmq.com/ae.html 使用AE,您基本上可以为任何与发布的交换中的某个绑定不匹配的消息进行“全部捕获”交换。 在您的情况下,您的交换将只有all.specific绑定将消息发送到specific队列。 备用交换将直接将消息传递到generic队列。 这可以通过主题来完成,以重用相同的路由键。 或者它可以作为扇出交换完成,并且只有一个队列绑定它。 your best option for this w ...
  • 你rabbitmq的参数似乎不够,用户名,密码和端口尚未配置。 您可以配置两个输出,一个是rabbitmq,另一个是文件,用于放置日志的创建,日志存储是可以的。 注意logstash的版本(log stash,rabbitmq插件),它在我的试用之前给了我很多麻烦(登录到另一个redis服务器等)。 你可以调试rabbitmq的日志。 ps -ef|grep erl你可以在参数中找到日志文件的路径。 确保启用了rabbitmq的web管理器插件,并正确配置了防火墙,然后打开rabbitmq的web管理器,i ...
  • 有没有办法如何进行排除模式匹配 不。 路由键只匹配,不排除 Is there any way how to make exclude pattern matching nope. routing keys only match, not exclude
  • 如果要捕获与任何绑定不匹配的所有消息,可以使用备用Exchange完成。 为现有交换添加备用交换并从备用交换中收集所有消息: standard workflow --> [main exchange (topic)] | --> via binding *.foo --> [foo queue] | --> via binding *.bar --> [bar queue] ...
  • 通常,发件人不需要知道交换的类型,但发送到扇出意味着路由密钥被忽略,因此可以是任何值。 您可以按如下方式申报交易所: @Bean public FanoutExchange exchange() { return new FanoutExchange("logs"); } 如果有@RabbitAdmin bean,交换将根据bean类型自动声明。 Spring Boot自动配置管理员; 如果你不使用引导,你需要声明你自己的。 In general, the sender doesn't need ...
  • 不,这是不可配置的,但是也存在一个默认的扇出交换。 No, this isn't configurable, but there is a default fanout exchange that is also created.
  • 无论使用路由密钥还是绑定,扇出交换都将始终向每个绑定队列传递消息。 在扇出交换中完全忽略路由密钥。 根据您的需求描述,您所寻找的不是粉丝交换。 如果要根据所使用的路由密钥将消息传递到特定队列,则需要使用直接交换(用于路由密钥的简单匹配)或主题交换(用于复杂的路由密钥模式匹配) a fanout exchange will always deliver a message to every bound queue, no matter the routing key used or binding. the ...
  • 这可以通过直接或主题交换来完成,并且在您的交换中需要4个绑定。 假设以名为“MainEx”的交换机为例,路由键将如下设置: | exchange | binding | queue | | -------- | ---------------- | ------ | | MainEx | MainRoute.Route1 | Queue1 | | MainEx | MainRoute.Route2 | Queue2 | | MainEx | MainRoute ...
  • 您可以通过使用单个QueueA以及订阅此队列的多个使用者来实现您的目标: Direct exchange | |-- ["TypeA"]--> QueueA | |-- Consumer A1 | `-- Consumer A2 | `-- ["TypeB"]--> QueueB 在这种情况下, QueueA排队的消息将仅传递给一个消费者。 然而,获取消息的消费者是未定义的:它们以循环方式被选中。 You can achieve what you ...
  • 总是。 实际上,即使队列严格地是消费者方实体,它们也应该在生成交换时被生产者声明并绑定到直接交换。 Always. In fact, even though queues are strictly a consumer-side entity, they should be declared & bound to the direct exchange by the producer(s) at the time they create the exchange.