Spring-kafka高容量加工(Spring-kafka high volume processing)
使用spring-kafka 1.0.5,我正在使用10个分区的并发度为10的繁忙主题。
我当前的代码根据分区ID向队列添加消息,这些消息都保存在HashMap中。
@KafkaListener(topics = "${kafka.topic}") public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { //Pseudo code add to Hashmap<Integer, Queue<ConsumerRecord>> based on partition. }
不幸的是,这种设计需要花费两倍的处理时间来进行简单的消费。
我的要求是单独处理分区,但如何避免使用基于@KafkaListener的对分区的引用的hashmap。
有没有更有效的方法来解决这个问题? 理想情况下,来自侦听器注释的每个线程都将管理自己的列表。 有没有办法在没有交叉引用的情况下执行此操作,例如基于分区ID的上述hashmap?
Using spring-kafka 1.0.5, I am consuming from a busy topic with 10 partitions with a concurrency of 10.
My current code adds a message to a queue based on the partition ID which are both persisted in a HashMap.
@KafkaListener(topics = "${kafka.topic}") public void onMessage(ConsumerRecord consumerRecord, Acknowledgment acknowledgment) { //Pseudo code add to Hashmap<Integer, Queue<ConsumerRecord>> based on partition. }
Unfortunately, that design is taking twice the processing time a simple consumption would take.
My requirement is to process partitions separately but how can avoid having a hashmap with a reference to a partition based on the @KafkaListener.
Is there a more efficient way of going about this? Ideally, each thread from the listener annotation would manage its own list. Is there a way to do that without having a cross reference such as the hashmap mentioned above based on the partition ID?
原文:https://stackoverflow.com/questions/41128122
最满意答案
batch:input
需要为batch:input
生成Collection,Iterable等batch:process-records
工作,因为它分别对每个记录起作用。 您只是传递整个文件流。同样在上面的用例中,根本不需要批处理模块,除非您计划添加每个记录处理。
有关所有步骤如何工作的更多信息: https : //docs.mulesoft.com/mule-user-guide/v/3.7/batch-processing
batch:input
needs to produce a Collection, Iterable etc forbatch:process-records
to work as it works on each record individually. You are just passing the entire file stream.Also in your use-case above there is no need for the batch module at all, unless you plan on adding per record processing.
More information on how all the steps works here:https://docs.mulesoft.com/mule-user-guide/v/3.7/batch-processing
相关问答
更多-
我只是看着代码,问题似乎是AutomaticJobRegistrar使用上下文刷新事件来加载作业; 它应该真正实施SmartLifecycle并开始“早期”阶段。 Spring集成组件实现了SmartLifecycle和入站端点(如兔端点)在后期开始。 我建议你打开JIRA反对批处理 - 在AutomaticJobRegistrar代码中有一个TODO: // TODO: With Spring 3 a SmartLifecycle is started automatically 作为解决方法,您可以在 ...
-
有不同的方法来实现你正在寻找的东西。 我的解决方案 @ECHO OFF START "Proxy" sahi.bat START /WAIT testrunner.bat tests.suite http://website.fr/ firefox && taskkill /FI "WINDOWTITLE eq
: Proxy" START "Proxy" sahi.bat将使用您的代理启动一个新的控制台。 窗口标题将是用户名 :代理 。 如果您不确定窗口标题,只需尝试使用START“ ... -
以下是针对有效Google查询扩展的代码(对于所有可能的查询都不完整): @echo off setlocal EnableDelayedExpansion set "SearchUrl=https://www.google.co.uk/search?q=" set /p "Input=Enter search query: " set "SearchUrl=%SearchUrl%!Input: =+!" set "SearchUrl=!SearchUrl:"=%%22!" start "Google Se ...
-
这可能有点矫枉过正,但如果第二个批处理文件应该始终以管理员身份运行,则可以在其开头粘贴以下代码: @echo off & setlocal EnableDelayedExpansion IF "%PROCESSOR_ARCHITECTURE%" EQU "amd64" ( >NUL 2>&1 "%SYSTEMROOT%\SysWOW64\cacls.exe" "%SYSTEMROOT%\SysWOW64\config\system" ) ELSE ( >NUL 2>&1 "%SYSTEMRO ...
-
简单的答案。 在运行命令的窗口上使用“Rcmd”而不是“R CMD”。 有一个单独的exe文件来运行命令。 查看R安装的bin文件夹。 Simple answer. On windows when running command use "Rcmd" not "R CMD". There is a separate exe for running the commands. Look in the bin folder of your R installation.
-
这取决于你的读者。 ItemReader实现负责通过ItemStream接口回调在ExecutionContext保持自己的状态。 如果ItemReader通过ItemReader保持了它的状态(在这种情况下我假设的行号),并且如果ItemReader通过ItemSteam#open回调方法恢复该状态,那么我希望它在正确的情况下重新启动行。 所有Spring提供的ItemReader实现都在有意义的地方使用了这个功能。 由于我对你的配置中myLookupItemReader内容没有任何了解,我无法判断你的是 ...
-
骡子批次无法启动(Mule batch will not start)[2022-03-18]
batch:input需要为batch:input生成Collection,Iterable等batch:process-records工作,因为它分别对每个记录起作用。 您只是传递整个文件流。 同样在上面的用例中,根本不需要批处理模块,除非您计划添加每个记录处理。 有关所有步骤如何工作的更多信息: https : //docs.mulesoft.com/mule-user-guide/v/3.7/batch-processing batch:input needs to produce a Collect ... -
经过一番调查后,我发现你可以为JobLauncher配置一个TaskExecuter。 然后,您可以使用SimpleAsyncTaskExector并将其配置为deamon并设置线程优先级。 @Bean public JobLauncher jobLauncher(final JobRepository jobRepository, final TaskExecutor taskExecutor) { final SimpleJobLauncher jobLauncher = new Simple ...
-
我想你正试图这样做: @echo off start /wait custom.bin echo custom.bin has finished running pause 这将启动custom.bin,并等待它关闭,然后继续运行批处理文件的其余部分 I think you're trying to do something like this: @echo off start /wait custom.bin echo custom.bin has finished running pause Thi ...
-
使用start命令启动具有多个参数的另一个批处理文件?(Start another batch file with multiple parameters with the start command?)[2022-06-05]
如何读取批处理文件中的参数: 来电批次 start "" "%dirofbatch%data1.exe" "%downloc%" "%dirofbatch%" "%lver%" "%lget%" 叫批处理 set "parm1=%~1" set "parm2=%~2" set "parm3=%~3" set "parm4=%~4" echo %parm1% %parm2% %parm3% %parm4% How to read parameters in a batch file: caller bat ...