知识点
相关文章
更多最近更新
更多JMS&ActiveMQ实战- JMSCorrelationID与Selector
2019-03-28 00:24|来源: 网络
前面讲过JMSCorrelationID主要是用来关联多个Message,例如需要回复一个消息的时候,通常把回复的消息的 JMSCorrelationID设置为原来消息的ID。在下面这个例子中,创建了三个消息生产者A,B,C和三个消息消费者A,B,C。生产者A给消费者A发送一个消息,同时需要消费者A给它回复一个消息。B、C与A类似。
简图如下:
生产者A-----发送----〉消费者A-----回复------〉生产者A
生产者B-----发送----〉消费者B-----回复------〉生产者B
生产者C-----发送----〉消费者C-----回复------〉生产者C
需要注意的是,所有的发送和回复都使用同一个Queue,通过Selector区分。
import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class JMSCorrelationIDTest { private Queue queue; private Session session; public JMSCorrelationIDTest() throws JMSException{ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); Connection connection = factory.createConnection(); connection.start(); queue = new ActiveMQQueue("testQueue"); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); setupConsumer("ConsumerA"); setupConsumer("ConsumerB"); setupConsumer("ConsumerC"); setupProducer("ProducerA", "ConsumerA"); setupProducer("ProducerB", "ConsumerB"); setupProducer("ProducerC", "ConsumerC"); } private void setupConsumer(final String name) throws JMSException { //创建一个消费者,它只接受属于它自己的消息 MessageConsumer consumer = session.createConsumer(queue, "receiver='" + name + "'"); consumer.setMessageListener(new MessageListener(){ public void onMessage(Message m) { try { MessageProducer producer = session .createProducer(queue); System.out.println(name + " get:" + ((TextMessage)m).getText()); //回复一个消息 Message replyMessage = session .createTextMessage("Reply from " + name); //设置JMSCorrelationID为刚才收到的消息的ID replyMessage.setJMSCorrelationID( m.getJMSMessageID()); producer.send(replyMessage); } catch (JMSException e) { } } }); } private void setupProducer(final String name, String consumerName) throws JMSException { MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //创建一个消息,并设置一个属性receiver,为消费者的名字。 Message message = session.createTextMessage("Message from " + name); message.setStringProperty("receiver", consumerName); producer.send(message); //等待回复的消息 MessageConsumer replyConsumer = session.createConsumer(queue, "JMSCorrelationID='" + message.getJMSMessageID() + "'"); replyConsumer.setMessageListener(new MessageListener(){ public void onMessage(Message m) { try { System.out.println(name + " get reply:" + ((TextMessage)m).getText()); } catch (JMSException e) { } } }); } public static void main(String[] args) throws Exception { new JMSCorrelationIDTest (); } }
运行结果为:
ConsumerA get:Message from ProducerA
ProducerA get reply:Reply from ConsumerA
ConsumerB get:Message from ProducerB
ProducerB get reply:Reply from ConsumerB
ConsumerC get:Message from ProducerC
ProducerC get reply:Reply from ConsumerC
本文链接:JMS&ActiveMQ实战- JMSCorrelationID与Selector,领悟书生学习笔记,转自:http://www.360doc.com/content/09/0712/20/18042_4241252.shtml
相关问答
更多-
ActiveMQ与Spring实现JMS的问题?[2022-03-13]
Thread.sleep(5000); 延时加载 -
ActiveMQ与Spring实现JMS的问题?[2024-01-16]
Thread.sleep(5000);延时加载 -
activemq和jms是种什么关系[2022-07-23]
JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而activemq则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者。 jms 的一个标准或者说是一个协议. 通常用于企业级应用的消息传递. 主要有topic 消息(1 对多), queue 消息(1对1)。activemq 是一个jms 的实现, apache 出的. 另外还其它的实现 jboss 。 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对 ... -
使用ActiveMQ,您可以使用STOMP作为协议。 它比xmpp简单得多。 似乎没有一个本地stomp执行android(有几个java)。 但规范只有一个页面,所以这不应该成为一个问题。 另一种选择可能是RabbitMQ。 我记得有关Android系统的rabbitMQ库的消息。 With ActiveMQ you could use STOMP as a protocol. its much simpler than xmpp. there doesn't seem to be a native st ...
-
骆驼Spring JMS选择器(Camel Spring JMS Selector)[2022-07-14]
JMS选择器语法支持AND以及OR。 看起来你需要一个复合选择器。selector = paymentSystem ='pay'or paymentSystem ='foo' ref: JMS选择器语法 The JMS selector syntax supports AND as well as OR. Looks like you need a compound selector.. selector=paymentSystem='pay' or paymentSystem='foo' ref: JMS ... -
您可以使用TCP适配器/网关(可以选择使用NIO)以及自定义(反)序列化程序。 如果必须使用Grizzly,则可以编写服务器连接工厂实现。 对于出站适配器(或入站网关),端点注册为“TcpListener”(使用connectionId),SI消息包含用于确定哪个连接获得回复的IpHeaders.CONNECTION_ID标头。 连接关闭时,它将取消注册(从地图中删除)。 You could use the TCP adapters/gateways (which have an option to use ...
-
使用正确的后端uri创建一个地址端点,如下所示。
1.0 -
我将PDI集成到JAX-RS Web应用程序中没有问题。 从JMS调用作业应该很容易。 在这里您可以找到如何将PDI与java集成: http : //wiki.pentaho.com/display/EAI/Pentaho+Data+Integration+-+Java+API+Examples 对于webapps,你可以在网上找到大量的材料。 There is a JMS consumer comes handy and the corresponding settings (Similar to JM ...
-
Web套接字是一种在Web浏览器和Web服务器之间提供双向套接字样式接口的方式,服务器能够推送信息,而不是仅响应浏览器HTTP“拉”请求。 这听起来来自你的问题和澄清,这不是你所需要的。 然而,简单套接字是在应用程序之间提供同步通信的好方法。 如果接收到消息的应用程序可以同步处理它们 - 只要它们被发送 - 常规套接字可能是一个很好的解决方案。 消息队列用于异步通信 - 消息在发送后可能需要存储一段时间,然后接收者才能收到消息并对消息进行操作。 由于需要存储,消息队列需要单独的服务器来存储消息,或者在某些情 ...
-
MEL在选择器中工作正常,但其使用非常有限。 创建JMS选择器时,Mule没有可用的空中事件,因此没有任何事件绑定数据(包括会话)可用。 要选择一个非常特殊的消息,您需要使用由所需选择器构造的JMS 消息请求者 ,如: jms://REQUEST.QUEUE?selector=MULE_CORRELATION_ID%3D'#[sessionVars.myCorrelationId]' MEL is working fine in the selector but its usage is very lim ...