JMS&ActiveMQ实战- 消息的接收与监听

2019-03-27 23:43|来源: 网络

消息的消费者接收消息可以采用两种方式:
1、consumer.receive() 或 consumer.receive(int timeout);
2、注册一个MessageListener。
采用第一种方式,消息的接收者会一直等待下去,直到有消息到达,或者超时。后一种方式会注册一个监听器,当有消息到达的时候,会回调它的 onMessage()方法。下面举例说明:

  1. MessageConsumer comsumer = session.createConsumer(queue);  

  2. comsumer.setMessageListener(new MessageListener(){  

  3.            @Override  

  4.            public void onMessage(Message m) {  

  5.                TextMessage textMsg = (TextMessage) m;  

  6.                try {  

  7.                    System.out.println(textMsg.getText());  

  8.                } catch (JMSException e) {  

  9.                    e.printStackTrace();  

  10.               }  

  11.          }  

  12.     }  

  13. );  


Queue实现的是点到点模型,在下面的例子中,启动2个消费者共同监听一个Queue,然后循环给这个Queue中发送多个消息,我们依然采用 ActiveMQ

  1. import javax.jms.Connection;  

  2. import javax.jms.DeliveryMode;  

  3. import javax.jms.JMSException;  

  4. import javax.jms.Message;  

  5. import javax.jms.MessageConsumer;  

  6. import javax.jms.MessageListener;  

  7. import javax.jms.MessageProducer;  

  8. import javax.jms.Queue;  

  9. import javax.jms.Session;  

  10. import javax.jms.TextMessage;  

  11.  

  12. import org.apache.activemq.ActiveMQConnectionFactory;  

  13. import org.apache.activemq.command.ActiveMQQueue;  

  14.  

  15.  

  16. public class QueueTest {  

  17.  

  18.    public static void main(String[] args) throws Exception {  

  19.        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  

  20.    

  21.        Connection connection = factory.createConnection();  

  22.        connection.start();  

  23.        

  24.        //创建一个Queue  

  25.        Queue queue = new ActiveMQQueue("testQueue");  

  26.        //创建一个Session  

  27.        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  

  28.        

  29.        //注册消费者1  

  30.        MessageConsumer comsumer1 = session.createConsumer(queue);  

  31.        comsumer1.setMessageListener(new MessageListener(){  

  32.            public void onMessage(Message m) {  

  33.                try {  

  34.                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());  

  35.                } catch (JMSException e) {  

  36.                    e.printStackTrace();  

  37.                }  

  38.            }  

  39.        });  

  40.        

  41.        //注册消费者2  

  42.        MessageConsumer comsumer2 = session.createConsumer(queue);  

  43.        comsumer2.setMessageListener(new MessageListener(){  

  44.            public void onMessage(Message m) {  

  45.                try {  

  46.                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());  

  47.                } catch (JMSException e) {  

  48.                    e.printStackTrace();  

  49.                }  

  50.            }  

  51.            

  52.        });  

  53.        

  54.        //创建一个生产者,然后发送多个消息。  

  55.        MessageProducer producer = session.createProducer(queue);  

  56.        for(int i=0; i<10; i++){  

  57.            producer.send(session.createTextMessage("Message:" + i));  

  58.        }  

  59.    }  

  60.  

  61. }  


运行这个例子会得到下面的输出结果:

  1. Consumer1 get Message:0  

  2. Consumer2 get Message:1  

  3. Consumer1 get Message:2  

  4. Consumer2 get Message:3  

  5. Consumer1 get Message:4  

  6. Consumer2 get Message:5  

  7. Consumer1 get Message:6  

  8. Consumer2 get Message:7  

  9. Consumer1 get Message:8  

  10. Consumer2 get Message:9  

可以看出每个消息直被消费了一次,但是如果有多个消费者同时监听一个Queue的话,无法确定一个消息最终会被哪一个消费者消费。


与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息。

  1. import javax.jms.Connection;  

  2. import javax.jms.JMSException;  

  3. import javax.jms.Message;  

  4. import javax.jms.MessageConsumer;  

  5. import javax.jms.MessageListener;  

  6. import javax.jms.MessageProducer;  

  7. import javax.jms.Session;  

  8. import javax.jms.TextMessage;  

  9. import javax.jms.Topic;  

  10. import org.apache.activemq.ActiveMQConnectionFactory;  

  11. import org.apache.activemq.command.ActiveMQTopic;  

  12.  

  13. public class TopicTest {  

  14.    public static void main(String[] args) throws Exception {  

  15.        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");  

  16.    

  17.        Connection connection = factory.createConnection();  

  18.        connection.start();  

  19.        

  20.        //创建一个Topic  

  21.        Topic topic= new ActiveMQTopic("testTopic");  

  22.        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  

  23.          

  24.        //注册消费者1  

  25.        MessageConsumer comsumer1 = session.createConsumer(topic);  

  26.        comsumer1.setMessageListener(new MessageListener(){  

  27.            public void onMessage(Message m) {  

  28.                try {  

  29.                    System.out.println("Consumer1 get " + ((TextMessage)m).getText());  

  30.                } catch (JMSException e) {  

  31.                    e.printStackTrace();  

  32.                }  

  33.            }  

  34.        });  

  35.        

  36.        //注册消费者2  

  37.        MessageConsumer comsumer2 = session.createConsumer(topic);  

  38.        comsumer2.setMessageListener(new MessageListener(){  

  39.            public void onMessage(Message m) {  

  40.                try {  

  41.                    System.out.println("Consumer2 get " + ((TextMessage)m).getText());  

  42.                } catch (JMSException e) {  

  43.                    e.printStackTrace();  

  44.                }  

  45.            }  

  46.            

  47.        });  

  48.        

  49.        //创建一个生产者,然后发送多个消息。  

  50.        MessageProducer producer = session.createProducer(topic);  

  51.        for(int i=0; i<10; i++){  

  52.            producer.send(session.createTextMessage("Message:" + i));  

  53.        }  

  54.    }  

  55. }  


运行后得到下面的输出结果:

  1. Consumer1 get Message:0  

  2. Consumer2 get Message:0  

  3. Consumer1 get Message:1  

  4. Consumer2 get Message:1  

  5. Consumer1 get Message:2  

  6. Consumer2 get Message:2  

  7. Consumer1 get Message:3  

  8. Consumer2 get Message:3  

  9. Consumer1 get Message:4  

  10. Consumer2 get Message:4  

  11. Consumer1 get Message:5  

  12. Consumer2 get Message:5  

  13. Consumer1 get Message:6  

  14. Consumer2 get Message:6  

  15. Consumer1 get Message:7  

  16. Consumer2 get Message:7  

  17. Consumer1 get Message:8  

  18. Consumer2 get Message:8  

  19. Consumer1 get Message:9  

  20. Consumer2 get Message:9  

说明每一个消息都会被所有的消费者消费。


本文链接:JMS&ActiveMQ实战- 消息的接收与监听,领悟书生学习笔记,转自:http://calvinliu.iteye.com/blog/677493

相关问答

更多
  • Thread.sleep(5000); 延时加载
  • Thread.sleep(5000);延时加载
  • JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而activemq则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者。 jms 的一个标准或者说是一个协议. 通常用于企业级应用的消息传递. 主要有topic 消息(1 对多), queue 消息(1对1)。activemq 是一个jms 的实现, apache 出的. 另外还其它的实现 jboss 。 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对 ...
  • 这很容易。 使用WSDL文件1创建一个SOAP项目。 然后右键单击WSDL端点绑定。 并添加JMS端点。 给你的配置和其他选项,如会话,发送,接收。 我附上了一个示例图像以帮助您做到这一点。 希望这可以帮助。 快乐的编码。 1 http://www.webservicex.com/CurrencyConvertor.asmx?wsdl It's easy. Create a SOAP project using the WSDL file 1. Then right click on WSDL endpoi ...
  • 通过 ,Mule将继续重新尝试连接到ActiveMQ
  • 您可以使用TCP适配器/网关(可以选择使用NIO)以及自定义(反)序列化程序。 如果必须使用Grizzly,则可以编写服务器连接工厂实现。 对于出站适配器(或入站网关),端点注册为“TcpListener”(使用connectionId),SI消息包含用于确定哪个连接获得回复的IpHeaders.CONNECTION_ID标头。 连接关闭时,它将取消注册(从地图中删除)。 You could use the TCP adapters/gateways (which have an option to use ...
  • 您可以使用单个ActiveMQ实例。 只需确保没有目标名称冲突[除非他们将共享]并且您的实例使用tcp://或其他网络协议之一创建网络侦听器,即您不希望使用vm:// 。 You can use a single ActiveMQ instance. Just make sure you have no destination name collisions [unless they will share] and that your instance creates networked listeners ...
  • 使用正确的后端uri创建一个地址端点,如下所示。
    1.0 ...
  • 我将PDI集成到JAX-RS Web应用程序中没有问题。 从JMS调用作业应该很容易。 在这里您可以找到如何将PDI与java集成: http : //wiki.pentaho.com/display/EAI/Pentaho+Data+Integration+-+Java+API+Examples 对于webapps,你可以在网上找到大量的材料。 There is a JMS consumer comes handy and the corresponding settings (Similar to JM ...
  • Web套接字是一种在Web浏览器和Web服务器之间提供双向套接字样式接口的方式,服务器能够推送信息,而不是仅响应浏览器HTTP“拉”请求。 这听起来来自你的问题和澄清,这不是你所需要的。 然而,简单套接字是在应用程序之间提供同步通信的好方法。 如果接收到消息的应用程序可以同步处理它们 - 只要它们被发送 - 常规套接字可能是一个很好的解决方案。 消息队列用于异步通信 - 消息在发送后可能需要存储一段时间,然后接收者才能收到消息并对消息进行操作。 由于需要存储,消息队列需要单独的服务器来存储消息,或者在某些情 ...