开源项目

知识点

相关文章

更多

最近更新

更多

RabbitMQ 简单队列

2019-03-05 22:34|来源: 网路

生产者将消息发送到队列,消费者从队列中获取消息。


P:消息的生产者
C:消息的消费者
红色:队列
首先引用rabbitmq的客户端程序所依赖的jar包:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.2</version>
</dependency>


获取rabbitmq连接
public static Connection getConnection() throws Exception {
    //定义连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //设置服务地址
    factory.setHost("localhost");
    //端口
    factory.setPort(5672);
    //设置账号信息,用户名、密码、vhost
    factory.setVirtualHost("/testvhost");
    factory.setUsername("test");
    factory.setPassword("test");
    // 通过工程获取连接
    Connection connection = factory.newConnection();
    return connection;
}



生产者发送消息

private final static String QUEUE_NAME = "test_queue";
@Test
public void testSend() throws Exception {
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    // 从连接中创建通道
    Channel channel = connection.createChannel();
    // 声明(创建)队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 消息内容
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    //关闭通道和连接
    channel.close();
    connection.close();
}


执行流程
1、获取一个mq连接
Connection connection = ConnectionUtil.getConnection();
2、根据连接创建通道
Channel channel = connection.createChannel();
3、声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
4、消息内容
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
点击上面的队列名称,查询具体的队列中的信息


消费者从队列中获取消息

@Test
public void testRecv() throws Exception{
    // 获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // 监听队列
    channel.basicConsume(QUEUE_NAME, true, consumer);
    // 获取消息
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
    }
}


书生整理于网络


相关问答

更多
  • 消息队列的作用:数据分发,缓存数据,一份数据拷贝出N份供别人使用。 适用场景: 常用于一个生产者多个消费者的场景。 也可以是多个消费者一个生产者。 意义:说白了就是产生数据的和消费数据的解耦合。就像是个大鱼塘,放鱼的不用管抓鱼的。抓鱼的也不用管放鱼的。 你的问题: 这种场景通常因业务而产生 这说明你看的知识课本,或者简单的例子。 如果上面的说明还是没有说清楚,那么只能说明你查阅的资料还不够,或者没有专门查他的应用场景。
  • 首先,Amazon SQS是一个伪队列,这意味着每个消息(如果它到达队列)的传递都是有保证的,但不是通常在队列中发生的FIFO方式。 如果消息的顺序对您很重要,并且您希望队列以FIFO方式工作,则Amazon SQS文档会指出在您的应用程序逻辑中处理此问题,因为来自Amazon SQS的消息将无序地传达给您。 与此相比,据我所知,你可以在RabbitMQ中实现工作队列。 如果这样可以避免在应用程序级别实现队列消息排序,那么这将是更可取的选择。 以下是一些可帮助您决定选择哪一个的因素: 队列消息序列如上所述。 ...
  • 你是对的,你根本不需要芹菜。 当你设计分布式系统时,有很多选择,没有正确的方法去做适合所有情况的事情。 许多人发现,让消息消费者池等待消息出现在他们的队列中,做一些工作并在工作完成时发送消息更加灵活。 Celery是一个将一大堆东西封装在一个包中的框架,但如果你不需要整个包,那么最好是设置RabbitMQ并且在没有所有复杂的情况下实现你所需要的。 另外,除了Celery实现的任务队列场景外,RabbitMQ还可以用于更多场景。 但是,如果你选择芹菜,那么再考虑RabbitMQ。 Celery的消息排队模型非 ...
  • 如果您不关心其他队列,则可以通过命令行通过以下命令运行以下命令来删除所有队列: 警告:这也将删除您的兔子服务器上配置的任何用户和vhost rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmq文档说reset命令: 从其所属的任何集群中删除节点,从管理数据库中删除所有数据,例如已配置的用户和vhosts,并删除所有持久消息。 所以,请小心使用它。 If you do not care about the other qu ...
  • 如果要支持ACK功能,则必须将接收器队列声明为AutoDelete = false 。 这是C#中的一个示例(可能与Java有很小的差异) private bool PushDataToQueue(byte[] data, string queueName, ref string error) { try { if (_connection == null || !_connection.IsOpen) _connection = _factory.C ...
  • 您可以使用路由密钥,通过创建交换机和绑定到交换机的名为Device60的队列。 channel.queueBind("Device60","myexchange","device.60") 此时,您可以使用路由键发布消息。 例如: channel.basicPublish("myexchange","device.60",mymessage); 现在, Device60将接收消息并存储它们,直到消费者在线。 对于其他设备,它是相同的: channel.queueBind("Device50","mye ...
  • 您可以使用Apache Camel轻松桥接代理。 Camel具有RabbitMQ和ActiveMQ的组件,允许您定义桥接要与之共享消息的目标的路由。 Camel网站有很多文档可以帮助您入门。 You can easily bridge brokers using Apache Camel. Camel has components for both RabbitMQ and ActiveMQ that would allow you to define a route that bridges the de ...
  • 要删除所有任务,您应该为正在使用的客户端库中实现的每个队列queue.purge方法调用。 要从队列异步获取消息,您应该使用basic.consume方法。 To delete all tasks you should call for each queue queue.purge method implemented in client library you are using. To asynchronously get messages from queue you should use basic ...
  • 你可以重新声明队列,而不是被动。 channel.queueDeclare 如果队列已经存在,它将不执行任何操作,或者它将创建队列。 但至于检查它是否存在,而不重新创建它,queueDeclarePassive是你唯一真正的选择。 you could redeclare the queue, not passively. channel.queueDeclare this will either do nothing if the queue already exists, or it will creat ...
  • 因此,这似乎有点困难,答案与我没有在问题中列出的部分路线有关,因为我认为这并不相关。 路线始于RabbitMq终点(不包括上面)。 因此,交换机在到达时设置了一些兔头标头:rabbitmq.ROUTING_KEY rabbitmq.EXCHANGE_NAME rabbitmq.DELIVERY_TAG 这些头文件在Route的整个生命周期中都会使用,并且当我试图在不同的Rabbit端点发布时,这些头文件会覆盖这些值。 我已经解决的方法是引入一个将头部去掉的bean。 在我看来,这不是理想行为...... p ...