ACTIVEMQ Topic消息 生产者 发布消息后 消费者收不到消息

2019-03-25 13:34|来源: 网路

 

生产者

public class Producer {
	 public static void main(String[] args) throws JMSException {  
	        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
	        Connection connection = factory.createConnection();  
	        connection.start();  
	          
	        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
	        ActiveMQTopic topic= new ActiveMQTopic("testTopic");  
	  
	        MessageProducer producer = session.createProducer(topic);  
	       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
	  
	
	        for(int i=0; i<10; i++){
	            TextMessage message = session.createTextMessage();  
	           message.setText("message_" + System.currentTimeMillis());  
	           producer.send(message);  
	           System.out.println("Sent message: " + message.getText());  
	        }
	          
	       
	  
//	      session.close();  
//	      connection.stop();  
//	      connection.close();  
	    }  
}

发布消息的结果
Sent message: message_1341915173083
Sent message: message_1341915173085
Sent message: message_1341915173085
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173086
Sent message: message_1341915173087
Sent message: message_1341915173087
Sent message: message_1341915173088
Sent message: message_1341915173088






消费者
public class Consumer {
	public static void main(String[] args) throws JMSException {  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
          
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        ActiveMQTopic topic= new ActiveMQTopic("testTopic");
       // javax.jms.Topic topic =  session.createTopic("myTopic.messages");  
  
        MessageConsumer consumer = session.createConsumer( topic);  
        consumer.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                try {  
                    System.out.println("Received message: " + tm.getText());  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        
        
        
        MessageConsumer comsumer2 = session.createConsumer(topic);
        comsumer2.setMessageListener(new MessageListener(){
        	 public void onMessage(Message message) {  
                 TextMessage tm = (TextMessage) message;  
                 try {  
                     System.out.println("Received message: " + tm.getText());  
                 } catch (JMSException e) {  
                     e.printStackTrace();  
                 }  
             }  
        });
//      session.close();  
//      connection.stop();  
//      connection.close();  
    }  
}
消费者运行程序后后获取不到生产者发布的消息,初识 ActiveMQ不太熟悉,  求解答
 

 

相关问答

更多
  • 明天有空试验一下先 找了个例子,试了一下,应该是topic方式的,有定义 运行正常
  • 虽然发布者/订阅者和生产者/消费者术语都与消息传递有关,但它们是不同的并且不能互换使用。 发布者/订阅者是将发布分发给多个接收者的消息传递模式。 而在生产者/消费者模式中,生产者是消息的发送者,消费者是消息的接收者。 生产者和消费者是发布者/订阅者和点对点消息模式的组成部分。 希望这可以帮助。 Although both Publish/Subscribe and Producer/Consumer terms are related to messaging, they are different and ...
  • 首先要强调的是:看看这个stomp插件: http://activemq.apache.org/message-redelivery-and-dlq-handling.html 我可以做的另一个解决方法是:在生产者方面:1。更改生产者以发送持久性消息 在您的消费者方面:使用计时器。 1.读取消息/帧,直到达到空或最大上限。 2.创建CURL请求和空的打包消息列表3.将服务器休眠5秒钟 你肯定需要进一步测试,但应该工作。 一旦进程唤醒,您应该能够读取排队的所有消息。 需要考虑的事项: - 持久性消息需要到期时 ...
  • 你不应该做任何事来维持秩序; 这是消息队列为您做的事情之一。 我认为如果你有一个消费者正在监听队列,并且它正在处理无序的消息,那么你要么发现了一个错误,要么消息没有按照你认为的顺序排队。 此外, ActiveMQ FAQ中的这个问题可能有所帮助。 编辑:从阅读关于duffymo的答案的评论,看起来你有点过度工程 。 通常,像ActiveMQ,MQ Series,joram等消息队列具有两个特征:它们以与它们排队相同的顺序传递消息,并且它们保证消息传递。 发送单独的ACK消息是多余的; 这有点像提交数据库事务 ...
  • 创建消费者时,您可以指定消息选择器以使用生产者的唯一ID排除消息。 生成器可以在创建消息时在消息属性中设置其ID。 When you create your consumer you can specify a message selector to exclude messages with your producer's unique ID. Your producer can set its ID in a message property when creating its messages.
  • 在创建主题消费者之后,需要发送消息。 如果没有消费者,那么主题就是火与忘,然后丢弃该消息。 任何联机的消费者只会接收在此之后发送的消息,除非它是Durable主题消费者或Queue消费者。 在持久消费者的情况下,您必须创建它的实例,以便在将这些消息发送到主题之前存在订阅记录。 所以我猜你的问题是你以前没有订阅过这个消费者,因此Broker没有为它存储任何消息。 I was so stupid about the phrase "using".Beacause I use "using" open conne ...
  • 是的,Apache Camel中的生产者和消费者模板都是线程安全的。 是的,请求/回复正常。 它是如何实现的是Camel路由引擎和正在使用的组件之间的混合。 例如,REST和JMS的工作方式略有不同。 其中JMS将使用单独的消息队列来获取回复,其具有用于关联回复消息的JMSCorrelationID。 由于HTTP的同步特性,REST通常是同步调用。 但它取决于底层组件,因为有些支持http客户端上的异步通信,如camel-jetty,camel-ahc等。 Yes both the producer an ...
  • 是的,您将故障转移:方案结合起来以提供客户端恢复,然后使用服务器端的代理网络将消息分发给群集中的其他使用者。 Yes, you combine the failover: scheme to provide client-side recovery and then use the network-of-broker on the server-side to distribute the messages to other consumers in the cluster.
  • 看一下Prefetch Policy 。 如果您将其设置为1,那么它可能会为您修复此问题。 ...&consumer.prefetchSize=1 Take a look at Prefetch Policy. If you set it to 1 then it may fix this one for you. ...&consumer.prefetchSize=1
  • 我确定,vm是VM内部的传输! 并且无法在其外部访问,因此解决方案是2个客户端中的一个需要使用vm传输,而另一个客户端需要使用vm传输启动tcp和ActiveMQ,或者将2个组件嵌入到同一个VM中。 看看我对同一个用例的另一个答案当两个应用程序都使用嵌入式activemq时,如何将Jms消息从一个spring-boot应用程序发送到另一个 i was sure, vm is the transport inside the VM! and not accessible outside it, so the ...