首页 \ 问答 \ eclipse中的scala插件怎么安装

eclipse中的scala插件怎么安装

更新时间:2022-03-26 20:03

最满意答案

连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。
我现在使用 samsa 这个 highlevel 库
Producer示例
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')

** Consumer示例 **
from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:
print msg

Tip
consumer 必需在 producer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。
在 Kafka 中一个 consumer 需要指定 groupname , groue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。
kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。

其他回答

建议使用专业的程序卸载工具,专业的卸载软件在卸载后能扫描软件安装目录和注册表等,能最大程度清理程序带来的垃圾,从而实现最干净的卸载。如果你希望系统保持干净快速稳定工作,这是较好的选择。如腾讯电脑管家,不仅可以卸载程序,软件本身还带有管理系统随机启动程序工具,加快系统启动速度。

相关问答

更多
  • 连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。 我现在使用 samsa 这个 highlevel 库 Producer示例 from kazoo.client import KazooClientfrom samsa.cluster import C ...
  • 不过要注意一些注意事项,对于多个partition和多个consumer 1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 最好partiton数目是consumer数目的整 ...
  • 不过要注意一些注意事项,对于多个partition和多个consumer 1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 最好partiton数目是consumer数目的整 ...
  • 连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafka 的 brokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。 我现在使用 samsa 这个 highlevel 库 Producer示例 from kazoo.client import KazooClientfrom samsa.cluster import C ...
  • 显然,你的制片人不需要连接到broker3 :) 我会尝试向您解释当您向Kafka制作数据时会发生什么情况: 你启动一些经纪人,比方说3,然后用2个分区创建一些主题foo ,复制因子2.很简单的例子,但对某人来说可能是真实的情况。 您可以为这些代理创建一个包含metadata.broker.list (或新生产者中的bootstrap.servers )的生产者。 值得一提的是,您不必指定群集中的所有代理,事实上,您只能指定其中的一个,它仍然可以工作。 我也会解释一下。 您使用您的制作人发送消息给主题foo ...
  • 处理程序的实现非常简单。 实际上,设置环境需要比执行处理程序更多的时间。 处理程序构造函数接受可选参数key 。 如果提供了,则写入的消息将被发送到由此密钥指定的单个分区。 如果未提供,则消息将按循环方式在服务器之间分发。 我没有做太多的测试,但很简单,我不明白这里会出现什么问题。 希望它会有用。 from kafka.client import KafkaClient from kafka.producer import SimpleProducer,KeyedProducer import loggin ...
  • 根据Dominic的评论,将kafka python升级到1.3.3为我解决了这个问题。 As per Dominic's comment, upgrading kafka python to 1.3.3 fixed the issue for me.
  • 了解Kafka更多信息的最简单方法是使用http://landoop.com/docs/lenses/developers 您将需要运行1个docker - 在本地调出所有内容,然后开发Python应用程序,使用通过Kafka API连接到Kafka的相应Kafka库并向其生成消息 一旦你构建了你的应用程序 - 然后你可以打包它并对你的Hadoop的Kafka经纪人运行它 Easiest way to learn more about Kafka is to use http://landoop.com/d ...
  • 看起来它无法连接到Schema Registry。 您确定已启动并运行并为您的消费者连接URL和端口吗? 尝试将以下内容添加到控制台使用者 --property schema.registry.url =架构注册表的地址 Looks like it can't connect to the Schema Registry. Are you sure you have it up and running and have the URL and port connect for your consumer? ...
  • 所以出于某种原因,这个问题出现在node-kafka,kafka-node和kafkaesque问题上,我不知道如何解决。 no-kafka与外界的关系很好。 So for some reason, this problem is on node-kafka, kafka-node and kafkaesque problem which I have no idea how to solve. no-kafka works fine with the outside world.

相关文章

更多

最新问答

更多
  • 您如何使用git diff文件,并将其应用于同一存储库的副本的本地分支?(How do you take a git diff file, and apply it to a local branch that is a copy of the same repository?)
  • 将长浮点值剪切为2个小数点并复制到字符数组(Cut Long Float Value to 2 decimal points and copy to Character Array)
  • OctoberCMS侧边栏不呈现(OctoberCMS Sidebar not rendering)
  • 页面加载后对象是否有资格进行垃圾回收?(Are objects eligible for garbage collection after the page loads?)
  • codeigniter中的语言不能按预期工作(language in codeigniter doesn' t work as expected)
  • 在计算机拍照在哪里进入
  • 使用cin.get()从c ++中的输入流中丢弃不需要的字符(Using cin.get() to discard unwanted characters from the input stream in c++)
  • No for循环将在for循环中运行。(No for loop will run inside for loop. Testing for primes)
  • 单页应用程序:页面重新加载(Single Page Application: page reload)
  • 在循环中选择具有相似模式的列名称(Selecting Column Name With Similar Pattern in a Loop)
  • System.StackOverflow错误(System.StackOverflow error)
  • KnockoutJS未在嵌套模板上应用beforeRemove和afterAdd(KnockoutJS not applying beforeRemove and afterAdd on nested templates)
  • 散列包括方法和/或嵌套属性(Hash include methods and/or nested attributes)
  • android - 如何避免使用Samsung RFS文件系统延迟/冻结?(android - how to avoid lag/freezes with Samsung RFS filesystem?)
  • TensorFlow:基于索引列表创建新张量(TensorFlow: Create a new tensor based on list of indices)
  • 企业安全培训的各项内容
  • 错误:RPC失败;(error: RPC failed; curl transfer closed with outstanding read data remaining)
  • C#类名中允许哪些字符?(What characters are allowed in C# class name?)
  • NumPy:将int64值存储在np.array中并使用dtype float64并将其转换回整数是否安全?(NumPy: Is it safe to store an int64 value in an np.array with dtype float64 and later convert it back to integer?)
  • 注销后如何隐藏导航portlet?(How to hide navigation portlet after logout?)
  • 将多个行和可变行移动到列(moving multiple and variable rows to columns)
  • 提交表单时忽略基础href,而不使用Javascript(ignore base href when submitting form, without using Javascript)
  • 对setOnInfoWindowClickListener的意图(Intent on setOnInfoWindowClickListener)
  • Angular $资源不会改变方法(Angular $resource doesn't change method)
  • 在Angular 5中不是一个函数(is not a function in Angular 5)
  • 如何配置Composite C1以将.m和桌面作为同一站点提供服务(How to configure Composite C1 to serve .m and desktop as the same site)
  • 不适用:悬停在悬停时:在元素之前[复制](Don't apply :hover when hovering on :before element [duplicate])
  • 常见的python rpc和cli接口(Common python rpc and cli interface)
  • Mysql DB单个字段匹配多个其他字段(Mysql DB single field matching to multiple other fields)
  • 产品页面上的Magento Up出售对齐问题(Magento Up sell alignment issue on the products page)