知识点
相关文章
更多最近更新
更多JMS&ActiveMQ实战- 消息的接收与监听
2019-03-27 23:43|来源: 网络
消息的消费者接收消息可以采用两种方式:
1、consumer.receive() 或 consumer.receive(int timeout);
2、注册一个MessageListener。
采用第一种方式,消息的接收者会一直等待下去,直到有消息到达,或者超时。后一种方式会注册一个监听器,当有消息到达的时候,会回调它的 onMessage()方法。下面举例说明:
MessageConsumer comsumer = session.createConsumer(queue);
comsumer.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message m) {
TextMessage textMsg = (TextMessage) m;
try {
System.out.println(textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
);
Queue实现的是点到点模型,在下面的例子中,启动2个消费者共同监听一个Queue,然后循环给这个Queue中发送多个消息,我们依然采用 ActiveMQ
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
public class QueueTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
//创建一个Queue
Queue queue = new ActiveMQQueue("testQueue");
//创建一个Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//注册消费者1
MessageConsumer comsumer1 = session.createConsumer(queue);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//注册消费者2
MessageConsumer comsumer2 = session.createConsumer(queue);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//创建一个生产者,然后发送多个消息。
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
}
}
运行这个例子会得到下面的输出结果:
Consumer1 get Message:0
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:9
可以看出每个消息直被消费了一次,但是如果有多个消费者同时监听一个Queue的话,无法确定一个消息最终会被哪一个消费者消费。
与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
//创建一个Topic
Topic topic= new ActiveMQTopic("testTopic");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//注册消费者1
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer1 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//注册消费者2
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message m) {
try {
System.out.println("Consumer2 get " + ((TextMessage)m).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//创建一个生产者,然后发送多个消息。
MessageProducer producer = session.createProducer(topic);
for(int i=0; i<10; i++){
producer.send(session.createTextMessage("Message:" + i));
}
}
}
运行后得到下面的输出结果:
Consumer1 get Message:0
Consumer2 get Message:0
Consumer1 get Message:1
Consumer2 get Message:1
Consumer1 get Message:2
Consumer2 get Message:2
Consumer1 get Message:3
Consumer2 get Message:3
Consumer1 get Message:4
Consumer2 get Message:4
Consumer1 get Message:5
Consumer2 get Message:5
Consumer1 get Message:6
Consumer2 get Message:6
Consumer1 get Message:7
Consumer2 get Message:7
Consumer1 get Message:8
Consumer2 get Message:8
Consumer1 get Message:9
Consumer2 get Message:9
说明每一个消息都会被所有的消费者消费。
本文链接:JMS&ActiveMQ实战- 消息的接收与监听,领悟书生学习笔记,转自:http://calvinliu.iteye.com/blog/677493
相关问答
更多-
ActiveMQ与Spring实现JMS的问题?[2022-03-13]
Thread.sleep(5000); 延时加载 -
ActiveMQ与Spring实现JMS的问题?[2024-01-16]
Thread.sleep(5000);延时加载 -
activemq和jms是种什么关系[2022-07-23]
JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而activemq则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者。 jms 的一个标准或者说是一个协议. 通常用于企业级应用的消息传递. 主要有topic 消息(1 对多), queue 消息(1对1)。activemq 是一个jms 的实现, apache 出的. 另外还其它的实现 jboss 。 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对 ... -
这很容易。 使用WSDL文件1创建一个SOAP项目。 然后右键单击WSDL端点绑定。 并添加JMS端点。 给你的配置和其他选项,如会话,发送,接收。 我附上了一个示例图像以帮助您做到这一点。 希望这可以帮助。 快乐的编码。 1 http://www.webservicex.com/CurrencyConvertor.asmx?wsdl It's easy. Create a SOAP project using the WSDL file 1. Then right click on WSDL endpoi ...
-
通过
,Mule将继续重新尝试连接到ActiveMQ 您可以使用TCP适配器/网关(可以选择使用NIO)以及自定义(反)序列化程序。 如果必须使用Grizzly,则可以编写服务器连接工厂实现。 对于出站适配器(或入站网关),端点注册为“TcpListener”(使用connectionId),SI消息包含用于确定哪个连接获得回复的IpHeaders.CONNECTION_ID标头。 连接关闭时,它将取消注册(从地图中删除)。 You could use the TCP adapters/gateways (which have an option to use ...您可以使用单个ActiveMQ实例。 只需确保没有目标名称冲突[除非他们将共享]并且您的实例使用tcp://或其他网络协议之一创建网络侦听器,即您不希望使用vm:// 。 You can use a single ActiveMQ instance. Just make sure you have no destination name collisions [unless they will share] and that your instance creates networked listeners ...使用正确的后端uri创建一个地址端点,如下所示。1.0 我将PDI集成到JAX-RS Web应用程序中没有问题。 从JMS调用作业应该很容易。 在这里您可以找到如何将PDI与java集成: http : //wiki.pentaho.com/display/EAI/Pentaho+Data+Integration+-+Java+API+Examples 对于webapps,你可以在网上找到大量的材料。 There is a JMS consumer comes handy and the corresponding settings (Similar to JM ...Web套接字是一种在Web浏览器和Web服务器之间提供双向套接字样式接口的方式,服务器能够推送信息,而不是仅响应浏览器HTTP“拉”请求。 这听起来来自你的问题和澄清,这不是你所需要的。 然而,简单套接字是在应用程序之间提供同步通信的好方法。 如果接收到消息的应用程序可以同步处理它们 - 只要它们被发送 - 常规套接字可能是一个很好的解决方案。 消息队列用于异步通信 - 消息在发送后可能需要存储一段时间,然后接收者才能收到消息并对消息进行操作。 由于需要存储,消息队列需要单独的服务器来存储消息,或者在某些情 ...