JMS&ActiveMQ实战- JMSCorrelationID与Selector

2019-03-28 00:24|来源: 网络

前面讲过JMSCorrelationID主要是用来关联多个Message,例如需要回复一个消息的时候,通常把回复的消息的 JMSCorrelationID设置为原来消息的ID。在下面这个例子中,创建了三个消息生产者A,B,C和三个消息消费者A,B,C。生产者A给消费者A发送一个消息,同时需要消费者A给它回复一个消息。B、C与A类似。


简图如下:

   生产者A-----发送----〉消费者A-----回复------〉生产者A

   生产者B-----发送----〉消费者B-----回复------〉生产者B

   生产者C-----发送----〉消费者C-----回复------〉生产者C

 

需要注意的是,所有的发送和回复都使用同一个Queue,通过Selector区分。

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
 
public class JMSCorrelationIDTest {
    
    private Queue queue;
    private Session session;
    
    public JMSCorrelationIDTest() throws JMSException{
        ActiveMQConnectionFactory factory = new
                ActiveMQConnectionFactory("vm://localhost");
        Connection connection = factory.createConnection();
        connection.start();
    
        queue = new ActiveMQQueue("testQueue");
        session = connection.createSession(false, 
                Session.AUTO_ACKNOWLEDGE);
        
        setupConsumer("ConsumerA");
        setupConsumer("ConsumerB");
        setupConsumer("ConsumerC");
        
        setupProducer("ProducerA", "ConsumerA");
        setupProducer("ProducerB", "ConsumerB");
        setupProducer("ProducerC", "ConsumerC");
    }
    
    private void setupConsumer(final String name) 
            throws JMSException {
        //创建一个消费者,它只接受属于它自己的消息
        MessageConsumer consumer = session.createConsumer(queue, 
                "receiver='" + name + "'");
        consumer.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    MessageProducer producer = session
                            .createProducer(queue);
                    System.out.println(name + " get:" + 
                            ((TextMessage)m).getText());
                    //回复一个消息
                    Message replyMessage = session
                            .createTextMessage("Reply from " + name);
                    //设置JMSCorrelationID为刚才收到的消息的ID
                    replyMessage.setJMSCorrelationID(
                            m.getJMSMessageID());
                    producer.send(replyMessage);
                } catch (JMSException e) { }
            }
        });
    }
 
    private void setupProducer(final String name, String consumerName)
            throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //创建一个消息,并设置一个属性receiver,为消费者的名字。
        Message message = session.createTextMessage("Message from " + 
                name);
        message.setStringProperty("receiver", consumerName);
        producer.send(message);
        
        //等待回复的消息
        MessageConsumer replyConsumer = session.createConsumer(queue,
              "JMSCorrelationID='" + message.getJMSMessageID() + "'");
        replyConsumer.setMessageListener(new MessageListener(){
            public void onMessage(Message m) {
                try {
                    System.out.println(name + " get reply:" + 
                        ((TextMessage)m).getText());
                } catch (JMSException e) { }
            }
        });
    }
    
    public static void main(String[] args) throws Exception {
        new JMSCorrelationIDTest ();
    }
}


运行结果为:

ConsumerA get:Message from ProducerA

ProducerA get reply:Reply from ConsumerA

ConsumerB get:Message from ProducerB

ProducerB get reply:Reply from ConsumerB

ConsumerC get:Message from ProducerC

ProducerC get reply:Reply from ConsumerC


本文链接:JMS&ActiveMQ实战- JMSCorrelationID与Selector,领悟书生学习笔记,转自:http://www.360doc.com/content/09/0712/20/18042_4241252.shtml

相关问答

更多
  • Thread.sleep(5000); 延时加载
  • Thread.sleep(5000);延时加载
  • JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而activemq则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者。 jms 的一个标准或者说是一个协议. 通常用于企业级应用的消息传递. 主要有topic 消息(1 对多), queue 消息(1对1)。activemq 是一个jms 的实现, apache 出的. 另外还其它的实现 jboss 。 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对 ...
  • 使用ActiveMQ,您可以使用STOMP作为协议。 它比xmpp简单得多。 似乎没有一个本地stomp执行android(有几个java)。 但规范只有一个页面,所以这不应该成为一个问题。 另一种选择可能是RabbitMQ。 我记得有关Android系统的rabbitMQ库的消息。 With ActiveMQ you could use STOMP as a protocol. its much simpler than xmpp. there doesn't seem to be a native st ...
  • JMS选择器语法支持AND以及OR。 看起来你需要一个复合选择器。selector = paymentSystem ='pay'or paymentSystem ='foo' ref: JMS选择器语法 The JMS selector syntax supports AND as well as OR. Looks like you need a compound selector.. selector=paymentSystem='pay' or paymentSystem='foo' ref: JMS ...
  • 您可以使用TCP适配器/网关(可以选择使用NIO)以及自定义(反)序列化程序。 如果必须使用Grizzly,则可以编写服务器连接工厂实现。 对于出站适配器(或入站网关),端点注册为“TcpListener”(使用connectionId),SI消息包含用于确定哪个连接获得回复的IpHeaders.CONNECTION_ID标头。 连接关闭时,它将取消注册(从地图中删除)。 You could use the TCP adapters/gateways (which have an option to use ...
  • 使用正确的后端uri创建一个地址端点,如下所示。
    1.0 ...
  • 我将PDI集成到JAX-RS Web应用程序中没有问题。 从JMS调用作业应该很容易。 在这里您可以找到如何将PDI与java集成: http : //wiki.pentaho.com/display/EAI/Pentaho+Data+Integration+-+Java+API+Examples 对于webapps,你可以在网上找到大量的材料。 There is a JMS consumer comes handy and the corresponding settings (Similar to JM ...
  • Web套接字是一种在Web浏览器和Web服务器之间提供双向套接字样式接口的方式,服务器能够推送信息,而不是仅响应浏览器HTTP“拉”请求。 这听起来来自你的问题和澄清,这不是你所需要的。 然而,简单套接字是在应用程序之间提供同步通信的好方法。 如果接收到消息的应用程序可以同步处理它们 - 只要它们被发送 - 常规套接字可能是一个很好的解决方案。 消息队列用于异步通信 - 消息在发送后可能需要存储一段时间,然后接收者才能收到消息并对消息进行操作。 由于需要存储,消息队列需要单独的服务器来存储消息,或者在某些情 ...
  • MEL在选择器中工作正常,但其使用非常有限。 创建JMS选择器时,Mule没有可用的空中事件,因此没有任何事件绑定数据(包括会话)可用。 要选择一个非常特殊的消息,您需要使用由所需选择器构造的JMS 消息请求者 ,如: jms://REQUEST.QUEUE?selector=MULE_CORRELATION_ID%3D'#[sessionVars.myCorrelationId]' MEL is working fine in the selector but its usage is very lim ...