知识点
相关文章
更多最近更新
更多使用Shell客户端操作kafka
2019-03-04 07:58|来源: 网路
在master上创建一个test1主题
/opt/kafka/kafka_2.10-0.9.0.1/bin/kafka-topics.sh --create --topic test1 --replication-factor 3 --partitions 2 --zookeeper master:2181
replication-factor的几个不能超过Kafkar集群的broker数据
Error while executing topic command : replication factor: 3 larger than available brokers: 2 [2016-09-25 07:56:34,226] ERROR kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 2 |
在master上查看刚才创建的test1主题
bin/kafka-topics.sh --list --zookeeper master:2181
test1
在zkCli上查看主题
[zk: localhost:2181(CONNECTED) 16] ls /brokers/topics
[test1]
在slave1上发送消息至kafka,发送消息“hello kafka”
bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --sync --topic test1
hello kafka
在master上开启一个消费者,可以看到刚才发送的消息
[hadoop@master kafka_2.10-0.9.0.1]$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test1 --from-beginning
hello kafka
删除掉一个Topic
bin/kafka-topics.sh --delete --topic test1 --zookeeper master:2181
Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的
同时在zookeeper(假设你的chroot就是/)的/admin/delete_topics下创建一个临时节点,名字就是topic名称
[zk: localhost:2181(CONNECTED) 21] ls /admin/delete_topics [test1] [zk: localhost:2181(CONNECTED) 22] ls /brokers/topics [test1]
在server.properties中设置delete.topic.enable=true,重启kafka就删除了 |
相关问答
更多-
kafka的java客户端是jms么[2021-11-06]
用一个跟用户表关联的表记录用户选择的模块,系统启动的时候,去读这个表的记录。 第一次由于这个表里没记录,自然就不会有任何模块了。再用一个表记录上次访问的地址,也是跟用户关联的。 -
kafka的java客户端是jms么[2023-08-21]
用一个跟用户表关联的表记录用户选择的模块,系统启动的时候,去读这个表的记录。 第一次由于这个表里没记录,自然就不会有任何模块了。再用一个表记录上次访问的地址,也是跟用户关联的。 -
需要安装什么使用hbase shell客户端工具[2023-07-20]
进入hbase shell console $HBASE_HOME/bin/hbase shell 如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命令可查看当前用户 hbase(main)> whoami 表的管理 1)查看有哪些表 hbase(main)> list 2)创建表 # 语法:create , {NAME => , VERSIONS => } # 例如:创建表t1,有两个family ... -
您正在执行a_numThreads = 50然后Executors.newFixedThreadPool(a_numThreads); 是的,意味着你不能在任何时间点创建超过50个线程,至少不能与该执行者一起创建。 文档所说的是一个分区只能被分配给1个Stream,如果你不是创建50个流来创建51个流,后者将不会得到任何内容,如此处所述 The fact that you are doing a_numThreads = 50 and then Executors.newFixedThreadPool(a_ ...
-
你的微服务可以做到这一点: 订阅一个Kafka主题并使用此主题中的消息并处理该消息(为此,您需要在微服务中使用group.id创建消费者) 以一个或多个主题发送消息来联系其他微服务(为此,您需要在微服务中创建一个或多个生产者) 我建议您阅读一些关于使用Kafka或旧时尚JMS兼容系统(例如ActiveMQ或RabbitMQ)的微服务架构的文章。 你可以从这个开始: https : //medium.com/@ulymarins/an-introduction-to-apache-kafka-and-micr ...
-
Kubernetes上的Kafka - 客户端无法检索元数据(Kafka on Kubernetes - Clients are unable to retrieve metadata)[2021-12-06]
我预计经纪人间的沟通将发生在29092年。 是的,他们使用29092进行内部沟通。 外部客户端应该能够在端口9093上连接。对于整个Kubernetes服务,我有一个外部IP,这意味着这是唯一应该从Kafka代理公开的外部IP。 据我所知,Kubernetes负载均衡器会将任何请求发送给我的经纪人之一。 是的,Kubernetes会将来自该服务的所有流量路由到您的经纪人之一,这是一个问题。 在内部,您使用无头服务来发现您的Kafka代理的地址,因此它们可以通过DNS名称kafka-[_NUM_OF_THE_ ... -
Kafka消费者客户端创建单例实例与静态方法(Kafka consumer client creation singleton instance vs static method)[2023-06-11]
Kafka Consumer不是线程安全的,所以它不应该是Singleton。 但是对于Producer,因为它是线程安全的,建议使用单个实例。 Kafka Consumer is not thread safe so it shouldn't be a Singleton. But in case of Producer, as it is thread safe and is recommended to have single instance. -
Apache kafka有一个更好的新生产者客户端:
org.apache.kafka kafka-clients 0.8.2.0 -
对于简单的用例,我将使用Kafka发行版的标准Kafka客户端:Kafka Producer和Kafka Consumer。 编写简单的应用程序就足够了。 如果您想在Kafka之上构建复杂的流处理应用程序 - Kafka Streams库是最佳选择。 使用Kafka Streams,您将能够构建类似于Spark Streaming的非常复杂的应用程序(具有过滤器,地图,flatMap等功能)。 Kafka Connect是一种向/从Kafka导入/导出数据的方法。 例如,通过Kafka ElasticSea ...
-
Kafka ACL基于客户端证书DN或其部分的主题(Kafka ACL to topic based on client certificate DN, or its part)[2024-03-24]
是的,使用开箱即用的工具可以根据SSL用户名设置ACL,但默认的Principal构建器使用的不仅仅是CN值较短的用户名。 来自https://docs.confluent.io/current/kafka/authorization.html上的Confluent Online文档 默认情况下,SSL用户名将为表单 CN = writeuser,OU =未知,O =未知,L =未知,ST =未知,C =未知 可以通过在server.properties中设置自定义的PrincipalBuilder来改变它 ...