Why is my thread not starting immediately?
See the program below. I start a new thread x running the function abc, then I do some longer task. Why does x only start after End Sub? Shouldn't it start right away, before the Sleep?

    Private Sub Button1_Click(ByVal sender As System.Object, ByVal e As System.EventArgs) Handles Button1.Click
        Dim x As New Threading.Thread(AddressOf abc)
        x.SetApartmentState(Threading.ApartmentState.MTA)
        x.Start()

        System.Threading.Thread.Sleep(5000)
    End Sub

    Sub abc()
        For i As Integer = 0 To 10 Step 1
            Me.lblStatus.Text = "Testing DB connection ( timeout in: " + i.ToString() + "s )"
            'Me.StatusStrip1.Invoke(
            MsgBox(i.ToString)
            System.Threading.Thread.Sleep(1000)
        Next
    End Sub

Edit:

The solution is this:

(A) Put both the connection attempts and the timeout countdown into separate threads.

(B) Update the UI like this:

    If Me.InvokeRequired Then
        Me.Invoke(pUpdateStatusMessage, "Successfully connected.")
    Else
        UpdateStatusMessage("Successfully connected.")
    End If

With this declared globally, so no argument passing is necessary:

    Delegate Sub t_pUpdateStatusText(ByVal strMessage As String)
    Public pUpdateStatusMessage As t_pUpdateStatusText = New t_pUpdateStatusText(AddressOf UpdateStatusMessage)

    Public Sub UpdateStatusMessage(ByVal strMessage As String)
        Me.lblStatus.Text = strMessage
        Me.StatusStrip1.Update()
    End Sub
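One plausible reading of the behaviour in the question is that the thread does start right away, but the click handler keeps the UI thread busy, so nothing the worker asks the UI to display can show up until the handler returns. The sketch below models that idea in plain Java rather than VB.NET. It is an illustration under assumptions, not WinForms code: a single-thread executor stands in for the UI thread, `ui.execute(...)` stands in for posting an update to it (closer to an asynchronous post than to the synchronous `Me.Invoke`), and every name in it (`UiThreadSketch`, `run`, the log strings) is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class UiThreadSketch {

    /** Runs the simulation and returns the events in the order they happened. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Hypothetical stand-in for the UI thread: one thread, FIFO task queue.
        ExecutorService ui = Executors.newSingleThreadExecutor();
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch workerDone = new CountDownLatch(1);

        // The "click handler": runs on the UI thread and blocks it for a while.
        ui.execute(() -> {
            Thread worker = new Thread(() -> {
                log.add("worker started");
                workerStarted.countDown();
                for (int i = 0; i < 3; i++) {
                    final int n = i;
                    // Marshal the label update to the UI thread instead of touching it here.
                    ui.execute(() -> log.add("label = " + n));
                }
                workerDone.countDown();
            });
            worker.start();
            try {
                // Stands in for the long task in the handler: the UI thread is
                // blocked until the worker is demonstrably running.
                workerStarted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("handler returned");
        });

        try {
            workerDone.await();          // all updates have been queued
            ui.shutdown();               // queued tasks still run after shutdown()
            ui.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log always lists "worker started" before "handler returned", and every "label = n" entry after it: the worker runs immediately, but its UI updates wait in the queue until the handler frees the UI thread. This is why moving the long task off the UI thread, as in solution (A), makes the updates appear promptly.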
Source: https://stackoverflow.com/questions/3856942
Best answer
I am not sure what your XXX-channel and adapter are supposed to do, but you need to add a RejectAndDontRequeueRecoverer to the retry advice factory bean (in the messageRecoverer property). The default recoverer just logs that retries are exhausted and discards the message.

EDIT

Here is a custom MessageRecoverer that publishes a failed message from queue A to a queue named A.dlq; the queue and binding are declared automatically, as needed.

    /*
     * Copyright 2014-2016 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.example;

    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.Map;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;

    public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

        public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

        public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

        public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

        public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

        private final Log logger = LogFactory.getLog(getClass());

        private final RabbitTemplate errorTemplate;

        private final RabbitAdmin admin;

        private final String deadLetterExchangeName = "DLX";

        private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);

        private boolean initialized;

        public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
            this.errorTemplate = errorTemplate;
            this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
        }

        @Override
        public void recover(Message message, Throwable cause) {
            if (!this.initialized) {
                initialize();
            }
            // Record the failure details and the original routing in the message headers.
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
            headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
            headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
            headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());

            // Declare the per-queue DLQ on first use, then republish to it.
            String dlqName = message.getMessageProperties().getConsumerQueue() + ".dlq";
            if (this.admin.getQueueProperties(dlqName) == null) {
                bindDlq(dlqName);
            }
            this.errorTemplate.send(this.deadLetterExchangeName, dlqName, message);
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Republishing failed message to " + dlqName);
            }
        }

        private void initialize() {
            this.admin.declareExchange(this.deadletterExchange);
            this.initialized = true;
        }

        private void bindDlq(String dlqName) {
            Queue dlq = new Queue(dlqName);
            this.admin.declareQueue(dlq);
            this.admin.declareBinding(BindingBuilder.bind(dlq).to(this.deadletterExchange).with(dlqName));
        }

        private String getStackTraceAsString(Throwable cause) {
            StringWriter stringWriter = new StringWriter();
            PrintWriter printWriter = new PrintWriter(stringWriter, true);
            cause.printStackTrace(printWriter);
            return stringWriter.getBuffer().toString();
        }

    }
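The retry-then-recover flow the answer describes can be illustrated without a broker. The following is a minimal plain-Java sketch under stated assumptions, not Spring AMQP code: `InMemoryBroker`, `deliver` and the class name are hypothetical stand-ins, and only the `.dlq` suffix convention is taken from the recoverer above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class RetryThenDeadLetterSketch {

    /** Hypothetical in-memory stand-in for a broker: queue name -> message bodies. */
    static final class InMemoryBroker {
        private final Map<String, List<String>> queues = new HashMap<>();

        void send(String queue, String body) {
            queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(body);
        }

        List<String> messages(String queue) {
            return queues.getOrDefault(queue, Collections.<String>emptyList());
        }
    }

    /**
     * Calls the listener up to maxAttempts times. If every attempt throws,
     * the message is "recovered" by republishing it to "<queue>.dlq".
     * Returns true if the listener eventually succeeded, false if dead-lettered.
     */
    static boolean deliver(InMemoryBroker broker, String queue, String body,
                           int maxAttempts, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(body);
                return true;
            } catch (RuntimeException e) {
                // Retry until attempts are exhausted.
            }
        }
        // Recoverer step: instead of silently dropping the message, park it in the DLQ.
        broker.send(queue + ".dlq", body);
        return false;
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        boolean ok = deliver(broker, "A", "payload", 3, m -> {
            throw new IllegalStateException("listener failed");
        });
        System.out.println("processed=" + ok + ", A.dlq=" + broker.messages("A.dlq"));
    }
}
```

Without the final `send`, a message whose retries are exhausted would simply be lost, which corresponds to the default behaviour the answer warns about.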
Related Q&A
Spring AMQP - Message re queuing using dead letter mechanism with TTL [2023-03-28]
If you reject the message so the broker routes it to a DLQ, you can examine the x-death header. In this scenario, I have a DLQ with a TTL of 5 seconds, and the consumer of messages from the main queue rejects it; the broker routes it to the DLQ, where it expires and is routed back to the main queue. The x-death header shows the number of re-routing operations: ...
-
Please refer to the Spring AMQP documentation on dead lettering. There is a RejectAndDontRequeueRecoverer, which you can inject into the RabbitTemplate of the AmqpItemReader. All those .receive() calls are then wrapped by the retry advice, and once retries are exhausted the message is rejected to the configured DLX. ...
-
The default replyTimeout is 5 seconds. If the consumer doesn't reply in 5 seconds, the outbound gateway's consumer is canceled, removing the temporary queue. You can increase the timeout by configuring the reply timeout (in milliseconds). ...
-
Which version are you using? I just tested channelTransacted=true with Spring Integration 5.0.2 and Spring AMQP 2.0.2, and with Spring Integration 4.3.14 and Spring AMQP 1.7.6; everything works as expected and the failed message goes to the DLQ. @SpringBootApplication public class So48914749Application { public static void main(String[] args) { ...
-
You must use AmqpHeaders.RECEIVED_DELAY instead. Since you use the correct mapped-request-headers="*", the default DefaultAmqpHeaderMapper maps it properly: Integer receivedDelay = amqpMessageProperties.getReceivedDelay(); if (receivedDelay != null) { headers.put(AmqpHeaders.RECEIVED_DELAY, receivedDelay) ...
-
I'm afraid it isn't possible if the dependency is on the code level. Spring Integration 3.0 is compatible with Spring Framework 4.x, but only as long as you do not use the Messaging support from 4.x. In that case the old org.springframework.integration.Message is not compatible with the new org.springframework.messaging.Message. See the Migration Guide for more on migrating. ...
-
Try adding an explicit x-dead-letter-routing-key - otherwise the same key as the original route is used - and there's no routing key needed for a fanout exchange.
-
After enabling WCF tracing (with switchValue="All"), I was able to see that I was getting the following exception: An error occurred while deserializing an MSMQ message's XML body. The message cannot be received. Ensure that the service contract is decorated with appropriate [ServiceKnownType] attributes or the TargetSerial ...
-
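So, dead lettering in RabbitMQ is not the same thing as a negative acknowledgment, but the two can be related. The scenario you describe (assuming I understand it correctly; your description is quite abstract) is a fairly common one and can be described by the following sequence of events:
1. The processor takes a message from the queue.
2. The processor attempts to process the message.
3. The processor fails to process the message.
4. ?
The question is what to do in step 4. In many applications, step 4 is to discard the message. In this case, however, RabbitMQ allows a negative acknowledgment, which tells the broker that the message could not be processed. The broker can in turn put the message back at the head of the queue. If that happens, nothing prevents the same processor from receiving the message again and trying to process it, so ...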