相关文章
更多最近更新
更多ACTIVEMQ Topic消息 生产者 发布消息后 消费者收不到消息
2019-03-25 13:34|来源: 网路
生产者
public class Producer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic= new ActiveMQTopic("testTopic");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for(int i=0; i<10; i++){
TextMessage message = session.createTextMessage();
message.setText("message_" + System.currentTimeMillis());
producer.send(message);
System.out.println("Sent message: " + message.getText());
}
// session.close();
// connection.stop();
// connection.close();
}
}
发布消息的结果
Sent message: message_1341915173083
Sent message: message_1341915173085
Sent message: message_1341915173085
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173087
Sent message: message_1341915173087
Sent message: message_1341915173088
Sent message: message_1341915173088
消费者
public class Consumer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic= new ActiveMQTopic("testTopic");
// javax.jms.Topic topic = session.createTopic("myTopic.messages");
MessageConsumer consumer = session.createConsumer( topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener(){
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// session.close();
// connection.stop();
// connection.close();
}
}
消费者运行程序后后获取不到生产者发布的消息,初识 ActiveMQ不太熟悉, 求解答
相关问答
更多-
明天有空试验一下先 找了个例子,试了一下,应该是topic方式的,有定义 运行正常
-
虽然发布者/订阅者和生产者/消费者术语都与消息传递有关,但它们是不同的并且不能互换使用。 发布者/订阅者是将发布分发给多个接收者的消息传递模式。 而在生产者/消费者模式中,生产者是消息的发送者,消费者是消息的接收者。 生产者和消费者是发布者/订阅者和点对点消息模式的组成部分。 希望这可以帮助。 Although both Publish/Subscribe and Producer/Consumer terms are related to messaging, they are different and ...
-
首先要强调的是:看看这个stomp插件: http://activemq.apache.org/message-redelivery-and-dlq-handling.html 我可以做的另一个解决方法是:在生产者方面:1。更改生产者以发送持久性消息 在您的消费者方面:使用计时器。 1.读取消息/帧,直到达到空或最大上限。 2.创建CURL请求和空的打包消息列表3.将服务器休眠5秒钟 你肯定需要进一步测试,但应该工作。 一旦进程唤醒,您应该能够读取排队的所有消息。 需要考虑的事项: - 持久性消息需要到期时 ...
-
消费ActiveMQ消息时如何保留消息顺序?(How do you preserve message order when consuming messages from ActiveMQ?)[2021-10-10]
你不应该做任何事来维持秩序; 这是消息队列为您做的事情之一。 我认为如果你有一个消费者正在监听队列,并且它正在处理无序的消息,那么你要么发现了一个错误,要么消息没有按照你认为的顺序排队。 此外, ActiveMQ FAQ中的这个问题可能有所帮助。 编辑:从阅读关于duffymo的答案的评论,看起来你有点过度工程 。 通常,像ActiveMQ,MQ Series,joram等消息队列具有两个特征:它们以与它们排队相同的顺序传递消息,并且它们保证消息传递。 发送单独的ACK消息是多余的; 这有点像提交数据库事务 ... -
创建消费者时,您可以指定消息选择器以使用生产者的唯一ID排除消息。 生成器可以在创建消息时在消息属性中设置其ID。 When you create your consumer you can specify a message selector to exclude messages with your producer's unique ID. Your producer can set its ID in a message property when creating its messages.
-
在创建主题消费者之后,需要发送消息。 如果没有消费者,那么主题就是火与忘,然后丢弃该消息。 任何联机的消费者只会接收在此之后发送的消息,除非它是Durable主题消费者或Queue消费者。 在持久消费者的情况下,您必须创建它的实例,以便在将这些消息发送到主题之前存在订阅记录。 所以我猜你的问题是你以前没有订阅过这个消费者,因此Broker没有为它存储任何消息。 I was so stupid about the phrase "using".Beacause I use "using" open conne ...
-
是的,Apache Camel中的生产者和消费者模板都是线程安全的。 是的,请求/回复正常。 它是如何实现的是Camel路由引擎和正在使用的组件之间的混合。 例如,REST和JMS的工作方式略有不同。 其中JMS将使用单独的消息队列来获取回复,其具有用于关联回复消息的JMSCorrelationID。 由于HTTP的同步特性,REST通常是同步调用。 但它取决于底层组件,因为有些支持http客户端上的异步通信,如camel-jetty,camel-ahc等。 Yes both the producer an ...
-
从生产者的角度来看,ActiveMQ聚集了主题并且失败了安全性(ActiveMQ clustered topics and fail safety from producer point of view)[2023-09-07]
是的,您将故障转移:方案结合起来以提供客户端恢复,然后使用服务器端的代理网络将消息分发给群集中的其他使用者。 Yes, you combine the failover: scheme to provide client-side recovery and then use the network-of-broker on the server-side to distribute the messages to other consumers in the cluster. -
看一下Prefetch Policy 。 如果您将其设置为1,那么它可能会为您修复此问题。 ...&consumer.prefetchSize=1 Take a look at Prefetch Policy. If you set it to 1 then it may fix this one for you. ...&consumer.prefetchSize=1
-
未使用ActiveMQ VM Transport从生产者接收消息(Not receiving message from producer using ActiveMQ VM Transport)[2022-02-14]
我确定,vm是VM内部的传输! 并且无法在其外部访问,因此解决方案是2个客户端中的一个需要使用vm传输,而另一个客户端需要使用vm传输启动tcp和ActiveMQ,或者将2个组件嵌入到同一个VM中。 看看我对同一个用例的另一个答案当两个应用程序都使用嵌入式activemq时,如何将Jms消息从一个spring-boot应用程序发送到另一个 i was sure, vm is the transport inside the VM! and not accessible outside it, so the ...