kafka高级教程
kafka简介与部署
java客户端调用kafka
知识点
相关文章
更多最近更新
更多使用spring-kafka操作kafka
2019-03-05 06:51|来源: 网路
添加依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
消息生产者
消息生产者spring配置
spring-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:property-placeholder location="classpath:kafka.properties" /> <!-- 定义producer的参数 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${bootstrap.servers}" /> <entry key="group.id" value="0" /> <entry key="retries" value="10" /> <entry key="batch.size" value="16384" /> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 创建kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <constructor-arg name="autoFlush" value="true" /> <property name="defaultTopic" value="test1" /> </bean> </beans>
消息生产者测试类
KafkaProducerTest.java
package com._656463.demo.kafka.springkafka; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath*:spring/spring-producer.xml") public class KafkaProducerTest { @Autowired private KafkaTemplate<Integer, String> kafkaTemplate; @Test public void testTemplateSend() { kafkaTemplate.send("test1", "www.656463.com"); } }
消息消费者
消息消费者spring配置
spring-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:property-placeholder location="classpath:kafka.properties" /> <!-- 定义consumer的参数 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${bootstrap.servers}" /> <entry key="group.id" value="0" /> <entry key="enable.auto.commit" value="true" /> <entry key="auto.commit.interval.ms" value="1000" /> <entry key="session.timeout.ms" value="15000" /> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <!-- 创建consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties" /> </constructor-arg> </bean> <!-- 实际执行消息消费的类 --> <bean id="messageListernerConsumerService" class="com._656463.demo.kafka.springkafka.KafkaConsumerListener" /> <!-- 消费者容器配置信息 --> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="test1" /> <property name="messageListener" ref="messageListernerConsumerService" /> </bean> <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory" /> <constructor-arg ref="containerProperties" /> </bean> </beans>
Kafka消费者监听器
KafkaConsumerListener.java
package com._656463.demo.kafka.springkafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.MessageListener; public class KafkaConsumerListener implements MessageListener<Integer, String> { @Override public void onMessage(ConsumerRecord<Integer, String> record) { System.out.println(record); } }
启动spring容器测试消息消费
KafkaConsumerTest.java
package com._656463.demo.kafka.springkafka; import org.springframework.beans.BeansException; import org.springframework.context.support.ClassPathXmlApplicationContext; public class KafkaConsumerTest { public static void main(String[] args) { try { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "classpath:spring/spring-consumer.xml"); context.start(); } catch (BeansException e) { e.printStackTrace(); } synchronized (KafkaConsumerTest.class) { while (true) { try { KafkaConsumerTest.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
书生参考网络整理
相关问答
更多-
spring-kafka 消费时出错怎么办[2022-02-24]
Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息 -
首先感谢您对这些项目的关注! Spring Integration Kafka 1.x完全适用于Apache Kafka <0.9。 对于0.8.x要严格。 Spring Kafka 1.0(目前在M1中)完全适用于Kafka-0.9。 我们希望我们能够与Kafka 0.10及其后的产品兼容。 考虑到这一点,Spring Integration Kafka 2.0基于Spring Kafka项目,与Apache Kafka <0.9不兼容。 Kafka Client 0.8(以及Spring Integra ...
-
这是一种方法 - 在您的配置中添加EnvironmentAware bean ... @SpringBootApplication public class So43191948Application implements EnvironmentAware { public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplica ...
-
您必须将容器工厂的containerProperties ackMode设置为MANUAL或MANUAL_IMMEDIATE以获取Acknowledgment对象。 对于其他ack模式,容器负责提交偏移量。 factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE) 或者如果使用Spring Boot,则设置....ackMode属性 You have to the set the container factory's co ...
-
Spring集成Kafka 2.0构建于Spring Kafka之上(Spring Integration Kafka 1.x直接使用0.8.xx scala客户端)。 Spring Integration Kafka的文档在Spring Kafka参考手册的第5章中 。 在某些时候,spring-integration-kafka可能会被纳入主要的Spring集成项目/文档中。 Spring Integration Kafka 2.0 is built on top of Spring Kafka (Sp ...
-
正确; 只有1人会得到它。 它只会停止本地容器 - Spring对你的其他实例一无所知。 由于您有ackOnError=false ,因此不会提交偏移量。 消费者不需要知道消息的发布频率。 您无法在运行时更改它们,但您可以使用属性占位符${...}或Spel Expressions #{...}在应用程序初始化期间进行设置。 Correct; only 1 will get it. It will only stop the local containers - Spring doesn't know an ...
-
这意味着支持的最高版本确实是0.10.xx的最新版本 但它仍然与Apache Kafka 0.9.0.1兼容。 你可以拥有的最好的东西是来自Spring Kafka的传递依赖:
org.apache.kafka kafka-clients 0.10.1.1 compile < ... -
我在POM中喜欢这样降级春季卡夫卡:
2.0.4.RELEASE -
考虑为每个所需的分区声明几个@KafkaListener方法。 为此,您应该使用topicPartitions属性而不是topics : /** * Used to add topic/partition information to a {@code KafkaListener}. * */ @Target({}) @Retention(RetentionPolicy.RUNTIME) public @interface TopicPartition { Consider to declare s ...
-
您可以将RebalanceListener添加到传递给构造函数的容器的ContainerProperties中。 You can add a RebalanceListener to the container's ContainerProperties that are passed into the constructor.