知识点

相关文章

更多

最近更新

更多

使用Shell客户端操作kafka

2019-03-04 07:58|来源: 网路

master上创建一个test1主题

/opt/kafka/kafka_2.10-0.9.0.1/bin/kafka-topics.sh --create --topic test1 --replication-factor 3 --partitions 2 --zookeeper master:2181


replication-factor的几个不能超过Kafkar集群的broker数据

Error while executing topic command : replication factor: 3 larger than available brokers: 2

[2016-09-25 07:56:34,226] ERROR kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 2


master上查看刚才创建的test1主题

bin/kafka-topics.sh --list --zookeeper master:2181

test1


zkCli上查看主题

[zk: localhost:2181(CONNECTED) 16] ls /brokers/topics

[test1]


slave1上发送消息至kafka,发送消息hello kafka

bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --sync --topic test1

hello kafka


master上开启一个消费者,可以看到刚才发送的消息

[hadoop@master kafka_2.10-0.9.0.1]$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test1 --from-beginning

hello kafka


删除掉一个Topic

bin/kafka-topics.sh --delete --topic test1 --zookeeper master:2181

Topic test1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的


同时zookeeper(假设你的chroot就是/)的/admin/delete_topics下创建一个临时节点,名字就是topic名称

[zk: localhost:2181(CONNECTED) 21] ls /admin/delete_topics

[test1]

[zk: localhost:2181(CONNECTED) 22] ls /brokers/topics    

[test1]

在server.properties中设置delete.topic.enable=true,重启kafka就删除了


相关问答

更多
  • 用一个跟用户表关联的表记录用户选择的模块,系统启动的时候,去读这个表的记录。 第一次由于这个表里没记录,自然就不会有任何模块了。再用一个表记录上次访问的地址,也是跟用户关联的。
  • 用一个跟用户表关联的表记录用户选择的模块,系统启动的时候,去读这个表的记录。 第一次由于这个表里没记录,自然就不会有任何模块了。再用一个表记录上次访问的地址,也是跟用户关联的。
  • 进入hbase shell console $HBASE_HOME/bin/hbase shell 如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命令可查看当前用户 hbase(main)> whoami 表的管理 1)查看有哪些表 hbase(main)> list 2)创建表 # 语法:create , {NAME => , VERSIONS => } # 例如:创建表t1,有两个family ...
  • 您正在执行a_numThreads = 50然后Executors.newFixedThreadPool(a_numThreads); 是的,意味着你不能在任何时间点创建超过50个线程,至少不能与该执行者一起创建。 文档所说的是一个分区只能被分配给1个Stream,如果你不是创建50个流来创建51个流,后者将不会得到任何内容,如此处所述 The fact that you are doing a_numThreads = 50 and then Executors.newFixedThreadPool(a_ ...
  • 你的微服务可以做到这一点: 订阅一个Kafka主题并使用此主题中的消息并处理该消息(为此,您需要在微服务中使用group.id创建消费者) 以一个或多个主题发送消息来联系其他微服务(为此,您需要在微服务中创建一个或多个生产者) 我建议您阅读一些关于使用Kafka或旧时尚JMS兼容系统(例如ActiveMQ或RabbitMQ)的微服务架构的文章。 你可以从这个开始: https : //medium.com/@ulymarins/an-introduction-to-apache-kafka-and-micr ...
  • 我预计经纪人间的沟通将发生在29092年。 是的,他们使用29092进行内部沟通。 外部客户端应该能够在端口9093上连接。对于整个Kubernetes服务,我有一个外部IP,这意味着这是唯一应该从Kafka代理公开的外部IP。 据我所知,Kubernetes负载均衡器会将任何请求发送给我的经纪人之一。 是的,Kubernetes会将来自该服务的所有流量路由到您的经纪人之一,这是一个问题。 在内部,您使用无头服务来发现您的Kafka代理的地址,因此它们可以通过DNS名称kafka-[_NUM_OF_THE_ ...
  • Kafka Consumer不是线程安全的,所以它不应该是Singleton。 但是对于Producer,因为它是线程安全的,建议使用单个实例。 Kafka Consumer is not thread safe so it shouldn't be a Singleton. But in case of Producer, as it is thread safe and is recommended to have single instance.
  • Apache kafka有一个更好的新生产者客户端: org.apache.kafka kafka-clients 0.8.2.0 示例: https : //github.com/CameronGregory/kafka/blob/master/TestProducer.java 显然你的配置还可以。 ...
  • 对于简单的用例,我将使用Kafka发行版的标准Kafka客户端:Kafka Producer和Kafka Consumer。 编写简单的应用程序就足够了。 如果您想在Kafka之上构建复杂的流处理应用程序 - Kafka Streams库是最佳选择。 使用Kafka Streams,您将能够构建类似于Spark Streaming的非常复杂的应用程序(具有过滤器,地图,flatMap等功能)。 Kafka Connect是一种向/从Kafka导入/导出数据的方法。 例如,通过Kafka ElasticSea ...
  • 是的,使用开箱即用的工具可以根据SSL用户名设置ACL,但默认的Principal构建器使用的不仅仅是CN值较短的用户名。 来自https://docs.confluent.io/current/kafka/authorization.html上的Confluent Online文档 默认情况下,SSL用户名将为表单 CN = writeuser,OU =未知,O =未知,L =未知,ST =未知,C =未知 可以通过在server.properties中设置自定义的PrincipalBuilder来改变它 ...