Kafka发送消费的路由

2019-03-08 07:26|来源: 网路

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。


在发送一条消息时,可以指定这条消息的keyProducer根据这个keyPartition机制来判断应该将这条消息发送到哪个ParitionParitition机制可以通过指定Producerparitition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)


import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class JasonPartitioner<T> implements Partitioner {
 
    public JasonPartitioner(VerifiableProperties verifiableProperties) {}
     
    @Override
    public int partition(Object key, int numPartitions) {
        try {
            int partitionNum = Integer.parseInt((String) key);
            return Math.abs(Integer.parseInt((String) key) % numPartitions);
        } catch (Exception e) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }
}


如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0123)至topic3(包含4Partition)。


public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
      List messageList = new ArrayList<KeyedMessage<String, String>>();
      for(int j = 0; j < 4; j++){
          messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
      }
      producer.send(messageList);
}
producer.close();
}


key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。



转自:http://www.jasongj.com/2015/03/10/KafkaColumn1/


相关问答

更多
  • 我认为你的zookeeper.session.timeout.ms被设置为低于5分钟。 从zookeeper设置中调整超时时间。 看它是否仍然失败。 如果是,那么你应该调整kafka配置的超时。 group.max.session.timeout.ms , group.max.session.timeout.ms , heartbeat.interval.ms应相应调整。 您的客户端休眠5分钟,在此期间,超出了其中一个超时值,并且Group Coordinator尝试重新平衡消费者,认为这些消费者已经失败。 ...
  • 像forever一样的目的是让你的节点应用程序始终运行(如果它会崩溃)。 这与这两个软件包做或不做的事情非常不同。 它们在节点应用程序中运行。 如果您希望它们始终运行,那么您需要将它们用于始终运行的节点应用程序。 你可以写一个坚如磐石的节点应用程序,它不会崩溃,所以它可以持续运行,或者你可以尝试这样做,也可以像永远一样运行,这样如果你的应用程序死亡,永远会自动重新启动它。 这些包是否使node.js始终订阅kafka? 还是我需要一些像永远或pm2的包来实现这个目标? forever和pm2号对kafka做 ...
  • 我想我明白了。 我让生产者只设置一个卡夫卡实例,所以如果它是脱机的,消息被缓存在某处 。 在它恢复在线之后,消息得到提交并正常交付。 I think I figured it out. I had producers set only to one kafka instance, so if it was offline, messages were cached somewhere. After it came back online, messages got commited and delivere ...
  • 你不能直接AFAIK这样做。 Kafka不提供将其数据镜像到不同数据存储的工具。 你还可以通过其他方式将消息从Kafka中提取出来并将其内容写入SQL。 一种常见的模式是使用Storm将数据从Kafka中提取出来,然后将其写入数据存储区,在本例中为MySQL。 You can't do this directly AFAIK. Kafka does not provide facilities to mirror its data to different datastores. You'll someth ...
  • 它的实现方式意味着一个分区只有一个消费者线程。 所以我想要10个消费者线程然后我需要10个主题分区。 我假设你想拥有比分区更多的消费者线程? Kafka不支持此功能。 在Kafka中,每个消费者都会监视其进度(即,它读取分区的内容 - 称为偏移 )本身。 如果你想要更多的消费者线程而不是分区,那些线程会以某种方式相互交谈以“分离”数据。 此模式是非标准的,不受支持。 它也不会很好地扩展,因为如果添加更多的消费者,同步开销会很高。 每个分区只有一个消费者线程使Kafka可以扩展。 最佳做法是对主题进行过度分区 ...
  • 这是否意味着每个组中的每个消费者都会读取所有分区中的所有记录?!! 否。该声明假定每个组只有一个消费者(如“ 所有消费者实例具有不同的消费者群体”所示)。 所以你的整体理解是正确的。 如果您有多个使用者组,则会向每个组发送一次消息。 does that mean that each consumer in each group will read all the records in all partitions ?!! No. The statement assumes that each group h ...
  • 我们肯定需要改进我们的文档,但这里有一些指针。 如果您希望使用者处理停止时生成的消息,则需要指定使用者组名称,例如spring.cloud.stream.bindings..group=foo 。 当指定了一个使用者组时,如果具有相同使用者组的客户端已经运行(即我们记录了该使用者消耗的偏移量)或者b) spring.cloud.stream.binder.kafka.start-offset指定的值,则应用程序启动a)最新的未使用消息spring.cloud.stream.bind ...
  • 您忘记在使用者参数中添加使用者组。 在您的情况下,两个使用者都属于一个使用者组(默认情况下称为kafka-node-group )。 拥有一个消费者群体意味着每个消费者群体将传递一次消息。 由于您的主题有0个分区,因此只有第一个消费者才会处理该消息。 如果要向两个消费者发送相同的消息,则必须有两个消费者组(每个组一个消费者)。 你可以这样做: let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {gro ...
  • Kafka有两个配置参数 - 一个是在回答消费者请求之前设置接收的最小数据量,另一个是设置在回答请求之前等待此数据到达的最大时间量。 您可以尝试添加以下选项: --consumer-property fetch.max.wait.ms=0 --consumer-property fetch.min.bytes=0 欲了解更多信息: 请查看此问题以获取详细信息。 检查kafka消费者配置 。 There are two configuration parameters for Kafka - one is ...
  • 您所要做的就是通过为它们分配相同的group.id来将消费者放在同一个使用者组中。 只有同一组中的一个消费者实例才会收到消息。 请参阅https://kafka.apache.org/documentation#intro_consumers上的文档 All you have to do is to put the consumers in the same consumer group by assigning the same group.id to them. Only one instance of ...