使用oozie distcp-action将文件从hdfs目录复制到另一个目录(Copying files from a hdfs directory to another with oozie distcp-action)
我的行为
start_fair_usage
以状态okey结束,但test_copy
返回Main class [org.apache.oozie.action.hadoop.DistcpMain], main() threw exception, null
在
/user/comverse/data/${1}_B
我有很多不同的文件,其中一些我想复制到${NAME_NODE}/user/evkuzmin/output
。 为此,我尝试从包含路径数组的copy_files.sh
传递路径到我需要的文件。<action name="start_fair_usage"> <shell xmlns="uri:oozie:shell-action:0.1"> <job-tracker>${JOB_TRACKER}</job-tracker> <name-node>${NAME_NODE}</name-node> <exec>${copy_file}</exec> <argument>${today_without_dash}</argument> <argument>${mta}</argument> <!-- <file>${path}#${start_fair_usage}</file> --> <file>${path}${copy_file}#${copy_file}</file> <capture-output/> </shell> <ok to="test_copy"/> <error to="KILL"/> </action> <action name="test_copy"> <distcp xmlns="uri:oozie:distcp-action:0.2"> <job-tracker>${JOB_TRACKER}</job-tracker> <name-node>${NAME_NODE}</name-node> <arg>${wf:actionData('start_fair_usage')['paths']}</arg> <!-- <arg>${NAME_NODE}/user/evkuzmin/input/*</arg> --> <arg>${NAME_NODE}/user/evkuzmin/output</arg> </distcp> <ok to="END"/> <error to="KILL"/> </action>
start_fair_usage
启动start_fair_usage
echo ${1} echo ${2} dirs=( /user/comverse/data/${1}_B ) args=() for i in $(hadoop fs -ls "${dirs[@]}" | egrep ${2}.gz | awk -F " " '{print $8}') do args+=("$i") echo "copy file - "${i} done paths=${args} echo ${paths}
My actions
start_fair_usage
ends with status okey, buttest_copy
returnsMain class [org.apache.oozie.action.hadoop.DistcpMain], main() threw exception, null
In
/user/comverse/data/${1}_B
I have a lot of different files, some of which I want to copy to${NAME_NODE}/user/evkuzmin/output
. For that I try to passpaths
fromcopy_files.sh
which holds an array of paths to the files I need.<action name="start_fair_usage"> <shell xmlns="uri:oozie:shell-action:0.1"> <job-tracker>${JOB_TRACKER}</job-tracker> <name-node>${NAME_NODE}</name-node> <exec>${copy_file}</exec> <argument>${today_without_dash}</argument> <argument>${mta}</argument> <!-- <file>${path}#${start_fair_usage}</file> --> <file>${path}${copy_file}#${copy_file}</file> <capture-output/> </shell> <ok to="test_copy"/> <error to="KILL"/> </action> <action name="test_copy"> <distcp xmlns="uri:oozie:distcp-action:0.2"> <job-tracker>${JOB_TRACKER}</job-tracker> <name-node>${NAME_NODE}</name-node> <arg>${wf:actionData('start_fair_usage')['paths']}</arg> <!-- <arg>${NAME_NODE}/user/evkuzmin/input/*</arg> --> <arg>${NAME_NODE}/user/evkuzmin/output</arg> </distcp> <ok to="END"/> <error to="KILL"/> </action>
start_fair_usage
startscopy_file.sh
echo ${1} echo ${2} dirs=( /user/comverse/data/${1}_B ) args=() for i in $(hadoop fs -ls "${dirs[@]}" | egrep ${2}.gz | awk -F " " '{print $8}') do args+=("$i") echo "copy file - "${i} done paths=${args} echo ${paths}
原文:https://stackoverflow.com/questions/43257926
最满意答案
@AutoWired private CachingConnectionFactory connectionFactory;
调用
connectionFactory.createConnection().close()
来验证代理是否已启动。 如果不是,你会发现一个异常。一般来说,这不会打开一个新的连接,它只会检查共享(或缓存)的连接是否可用。
@AutoWired private CachingConnectionFactory connectionFactory;
Call
connectionFactory.createConnection().close()
to verify that the broker is up. If it's not you'll catch an exception.In general, this won't open a new connection, it will just check that the shared (or a cached) connection is available.
相关问答
更多-
RabbitMQ(或Spring云流)可以独占消息吗?(Can RabbitMQ (or spring cloud stream) consume messages exclusively?)[2023-02-27]
RabbitMQ(AMQP)不支持; 每个消费者都会收到prefetch消息。 它确实支持独占消费者,但这意味着consumer1将获得所有消息,而consumer2只会在consumer1消亡时获取消息。 但是,Spring Cloud Stream当前不提供用于设置该选项的属性。 RabbitMQ (AMQP) doesn't support that; each consumer gets prefetch messages. It does support exclusive consumers, ... -
编辑 您需要从pom中移除测试支持jar。 它的存在(在测试范围内)是触发器用测试活页夹代替真实活页夹的原因。 删除测试活页夹支持后,这对我来说工作正常... @RunWith(SpringRunner.class) @SpringBootTest public class So49816044ApplicationTests { @Autowired private Source source; @Autowired private AmqpAdmin admin; ...
-
在Spring云流中BROKER rabbitMq出现故障时如何处理?(How to handle when the BROKER rabbitMq is down in Spring cloud stream?)[2023-05-08]
@AutoWired private CachingConnectionFactory connectionFactory; 调用connectionFactory.createConnection().close()来验证代理是否已启动。 如果不是,你会发现一个异常。 一般来说,这不会打开一个新的连接,它只会检查共享(或缓存)的连接是否可用。 @AutoWired private CachingConnectionFactory connectionFactory; Call connectionFa ... -
STOMP目前不是受支持的活页夹协议。 STOMP is not currently a supported binder protocol.
-
Spring Cloud Stream + RabbitMQ Publisher确认(Spring Cloud Stream + RabbitMQ Publisher confirm)[2022-06-22]
没有; 它不被支持。 但是,您可以在生产者上设置transactional标志,如果发送失败,则会引发异常。 虽然发布商确认通常比交易“更快”,但只有当您发送一堆消息并稍后等待其确认时,情况才是如此。 由于我们在这里有一个小溪, 你会发送一条消息,然后等待确认。 这不会比仅仅使用交易快得多。 如果你想使用确认; 您将不得不直接使用RabbitTemplate或Spring Integration出站通道适配器,而不是活页夹。 No; it's not supported. You can, however, ... -
在Spring云流(RabbitMQ)中以编程方式声明绑定?(Programmatically declare bindings in Spring Cloud Stream (RabbitMQ)?)[2022-08-20]
您可以使用交换 - 交换绑定来满足此要求。 x -> z y -> z 然后从与#绑定的z上的单个队列中消耗。 您可以定义交换以在您的引导应用程序@Bean绑定交换为@Bean 。 You can use exchange-to-exchange binding to satisfy this requirement. x -> z y -> z Then consume from a single queue on z that is bound with #. You can define the ... -
Spring Cloud Stream允许您使用Spring Cloud Stream Binder实现(Kafka,RabbitMQ,JMS绑定器等)将应用程序连接(通过@EnableBinding )到外部消息传递系统,从而开发事件驱动的微服务应用程序。 显然,Spring Cloud Stream使用Spring AMQP进行RabbitMQ绑定器实现。 BinderAwareChannelResolver适用于动态绑定对生产者的支持,我认为在您的情况下,它是关于配置消费者与该交换的交换和绑定。 例如 ...
-
您可以通过将spring.cloud.stream.dynamicDestinations属性设置为目标名称列表(如果您事先知道名称)或将其保持为空来实现。 BinderAwareChannelResolver负责动态创建/绑定这些动态目标的出站通道。 有一个开箱即用的router 应用程序可以做类似的事情。 You can achieve that by setting spring.cloud.stream.dynamicDestinations property to a list of destin ...
-
Spring Boot - RabbitMQ - 处理经纪人失败的场景(Spring Boot - RabbitMQ - Handle Scenario where broker is down)[2023-11-23]
您可以将RetryTemplate添加到RabbitTemplate bean。 将RecoveryCallback实现添加到重试模板,该模板忽略(或记录)错误。 它会尝试连接,但不会失败。 如果在Spring AMQP之上使用Spring Integration层 ,则可以向出站通道适配器添加断路器请求处理程序建议 。 You can add a RetryTemplate to the RabbitTemplate bean. Add a RecoveryCallback implementation ... -
如何使用Spring Cloud Stream app starter TCP处理消息(How to handle message using Spring Cloud Stream app starter TCP)[2023-06-02]
请参阅聚合 。 您可以使用源和处理器创建聚合应用程序。 Spring Cloud Stream支持将多个应用程序聚合在一起,直接连接其输入和输出通道,并避免通过代理交换消息的额外成本。 从Spring Cloud Stream 1.0版开始,仅支持以下类型的应用程序的聚合: 来源,汇,处理器...... 它们可以通过创建一系列互连的应用程序聚合在一起,其中序列中元素的输出通道连接到下一个元素的输入通道(如果存在)。 序列可以从源或处理器开始,它可以包含任意数量的处理器,并且必须以处理器或接收器结束。 编辑 ...