[Pinned] _00022 Flume-1.5.0 + Kafka_2.9.2-0.8.1.1 + Storm-0.9.2 Distributed Environment Integration

2019-03-02 23:56 | Source: the web

Post author: 妳那伊抹微笑
Blog address: http://blog.csdn.net/u012185296
Post title: _00022 Flume-1.5.0+Kafka_2.9.2-0.8.1.1+Storm-0.9.2 Distributed Environment Integration
Signature: The greatest distance in the world is neither the end of the sky nor the corner of the sea, but that I stand right in front of you and you cannot feel my presence
Technical focus: Flume + Kafka + Storm + Redis/HBase + Hadoop + Hive + Mahout + Spark ... cloud computing technologies
Reposting notice: Reposting is allowed, but you must credit the original source and author with a hyperlink and keep this copyright notice. Thanks for your cooperation!
QQ group: 214293307 云计算之嫣然伊笑 (looking forward to learning and improving together with you)


# Flume-1.5.0 + Kafka_2.9.2-0.8.1.1 + Storm-0.9.2 Distributed Environment Integration

# Preface

All the project code and jars used in this integration have been uploaded to the shared files of QQ group 214293307; download them there if you want to study them.

The Eclipse project code for this Flume + Kafka + Storm integration can be downloaded from http://download.csdn.net/detail/u012185296/7633405

# For Flume, see the earlier post _00016 on Flume's architecture and a getting-started example (uploading data to HDFS)

# For Kafka, see the earlier post _00017 on Kafka's architecture and getting-started examples (a basic example plus the Java API)

# For Storm, see the earlier post _00019 on Storm's architecture and a getting-started example (the simple Java example from the official site)

Please work through the posts above before attempting this integration; you know why, no explanation needed ... (What? You can't even walk yet and you already want to run? That earns you a slap ...)

# Integration Scenario

Flume monitors a specified directory; when a new log file appears there, the file's data is sent to Kafka, and finally Storm pulls the data out of Kafka and prints it.

# Flume + Kafka Integration

# Flume's fks001.conf configuration file

Monitor the directory /usr/local/yting/flume/tdata/tdir1, then use the custom sink (com.yting.cloud.flume.sink.KafkaSink) to push the data into Kafka.

 

[root@rs229 fks]# pwd

/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf/ytconf/fks

[root@rs229 fks]# vi fks001.conf

# fks : yting yousmile flume kafka storm integration

fks.sources=source1

fks.sinks=sink1

fks.channels=channel1

 

# configure source1

fks.sources.source1.type=spooldir

fks.sources.source1.spoolDir=/usr/local/yting/flume/tdata/tdir1

fks.sources.source1.fileHeader = false

 

# configure sink1

# custom sink that forwards the data Flume collects into Kafka
fks.sinks.sink1.type=com.yting.cloud.flume.sink.KafkaSink

 

# configure channel1

fks.channels.channel1.type=file

fks.channels.channel1.checkpointDir=/usr/local/yting/flume/checkpointdir/tcpdir/example_fks_001

fks.channels.channel1.dataDirs=/usr/local/yting/flume/datadirs/tddirs/example_fks_001

 

# bind source and sink

fks.sources.source1.channels=channel1

fks.sinks.sink1.channel=channel1
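
The source of the custom sink com.yting.cloud.flume.sink.KafkaSink is not listed in this post (it ships with the project download above). For orientation only, here is a minimal sketch of what such a sink could look like against the Flume 1.5.0 sink API and the Kafka 0.8 producer API; the class body, property names, and broker list below are my assumptions, not the author's actual code:

package com.yting.cloud.flume.sink; // hypothetical reconstruction, not the original source

import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaSink extends AbstractSink implements Configurable {
    private Producer<String, String> producer;
    private String topic;

    @Override
    public void configure(Context context) {
        // topic and broker list are assumed defaults; the real sink may hard-code them
        topic = context.getString("topic", "flume-kafka-storm-001");
        Properties props = new Properties();
        props.put("metadata.broker.list", context.getString("brokerList", "rs229:9092"));
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event event = channel.take();
            if (event == null) { // channel empty, back off a little
                tx.commit();
                return Status.BACKOFF;
            }
            // forward the event body to Kafka as a string message
            producer.send(new KeyedMessage<String, String>(topic, new String(event.getBody(), "UTF-8")));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}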

# Kafka's server.properties configuration file

[root@rs229 ytconf]# pwd

/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config/ytconf

[root@rs229 ytconf]# vi server.properties

 

# A comma separated list of directories under which to store log files

# log.dirs=/tmp/kafka-logs

log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs

# root directory for all kafka znodes.

zookeeper.connect=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka

# Copy the Kafka jars into Flume's lib directory: the custom sink (com.yting.cloud.flume.sink.KafkaSink) depends on them, and skipping this step will produce exceptions!

[root@rs229 lib]# pwd

/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/lib

[root@rs229 lib]# cp /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/libs/* .

# Use Eclipse to package the custom sink (com.yting.cloud.flume.sink.KafkaSink) into a jar and drop it into the $FLUME_HOME/lib directory

What? You can't do this part? Then you'd better come learn pig farming with me instead ...

# Starting Kafka

[root@rs229 kafka_2.9.2-0.8.1.1]# pwd

/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1

[root@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/ytconf/server.properties &

[1] 24672

[root@rs229 kafka_2.9.2-0.8.1.1]# [2014-07-14 11:48:24,533] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.dirs is overridden to /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,572] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,573] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property zookeeper.connect is overridden to rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,574] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-07-14 11:48:24,590] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-07-14 11:48:24,592] INFO [Kafka Server 0], Connecting to zookeeper on rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.server.KafkaServer)

[2014-07-14 11:48:24,603] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-07-14 11:48:24,610] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)

[2014-07-14 11:48:24,610] INFO Client environment:java.class.path=:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)

[2014-07-14 11:48:24,610] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,610] INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,611] INFO Initiating client connection, connectString=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
[2014-07-14 11:48:24,625] INFO Opening socket connection to server rs198/116.255.234.198:2181 (org.apache.zookeeper.ClientCnxn)
[2014-07-14 11:48:24,631] INFO Socket connection established to rs198/116.255.234.198:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-07-14 11:48:24,642] INFO Session establishment complete on server rs198/116.255.234.198:2181, sessionid = 0xc6472c07f50b0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-07-14 11:48:24,645] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-07-14 11:48:24,892] INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs' (kafka.log.LogManager)
[2014-07-14 11:48:24,894] INFO Loading log 'flume-kafka-storm-001-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,945] INFO Completed load of log flume-kafka-storm-001-0 with log end offset 18 (kafka.log.Log)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

[2014-07-14 11:48:24,966] INFO Loading log 'flume-kafka-storm-001-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,969] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 7 (kafka.log.Log)
[2014-07-14 11:48:24,970] INFO Loading log 'test001-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,973] INFO Completed load of log test001-1 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,974] INFO Loading log 'test003-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,976] INFO Completed load of log test003-1 with log end offset 47 (kafka.log.Log)
[2014-07-14 11:48:24,977] INFO Loading log 'test004-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,980] INFO Completed load of log test004-0 with log end offset 51 (kafka.log.Log)
[2014-07-14 11:48:24,981] INFO Loading log 'test004-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,984] INFO Completed load of log test004-1 with log end offset 49 (kafka.log.Log)
[2014-07-14 11:48:24,985] INFO Loading log 'test002-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,987] INFO Completed load of log test002-0 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,987] INFO Loading log 'test001-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,991] INFO Completed load of log test001-0 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,991] INFO Loading log 'test002-1' (kafka.log.LogManager)
[2014-07-14 11:48:24,993] INFO Completed load of log test002-1 with log end offset 0 (kafka.log.Log)
[2014-07-14 11:48:24,994] INFO Loading log 'test003-0' (kafka.log.LogManager)
[2014-07-14 11:48:24,997] INFO Completed load of log test003-0 with log end offset 53 (kafka.log.Log)
[2014-07-14 11:48:24,999] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-07-14 11:48:25,003] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-07-14 11:48:25,031] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-07-14 11:48:25,032] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-07-14 11:48:25,143] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-07-14 11:48:25,163] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-07-14 11:48:25,639] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-07-14 11:48:25,645] INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
[2014-07-14 11:48:25,660] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-07-14 11:48:25,942] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test001,0],[test003,1],[test003,0],[flume-kafka-storm-001,1],[flume-kafka-storm-001,0],[test004,1],[test004,0],[test001,1],[test002,0],[test002,1] (kafka.server.ReplicaFetcherManager)
[2014-07-14 11:48:26,045] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test001,0],[test003,1],[test003,0],[flume-kafka-storm-001,1],[flume-kafka-storm-001,0],[test004,1],[test004,0],[test001,1],[test002,0],[test002,1] (kafka.server.ReplicaFetcherManager)
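
At this point the broker is up. Before involving Flume, you can sanity-check that it accepts writes with a throwaway producer against the Kafka 0.8 Java API; the class and package names here are made up for illustration:

package com.yting.cloud.kafa.producer; // hypothetical package, mirroring the consumer's

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MySimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "rs229:9092"); // one reachable broker is enough for bootstrap
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // write a single test message to the integration topic
        producer.send(new KeyedMessage<String, String>("flume-kafka-storm-001", "hello from MySimpleProducer"));
        producer.close();
    }
}

If the message shows up in a consumer (such as MySimpleConsumer below), the broker side is healthy.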

# Starting Flume

[root@rs229 apache-flume-1.5.0-bin]# bin/flume-ng agent -n fks -c conf/ -f conf/ytconf/fks/fks001.conf -Dflume.root.logger=INFO,console &
2014-07-14 11:50:13,882 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /usr/local/yting/flume/tdata/tdir1
2014-07-14 11:50:13,912 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2014-07-14 11:50:13,916 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started
2014-07-14 11:50:13,916 (pool-4-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)] Spooling Directory Source runner has shutdown.
2014-07-14 11:50:14,417 (pool-4-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)] Spooling Directory Source runner has shutdown.

With that, Flume is up; whenever a new log file appears in the monitored directory, its contents will be forwarded to Kafka.

# Create a file in Flume's monitored directory and try it out

[root@rs229 ytconf]# cd /usr/local/yting/flume/tdata/tdir1/
[root@rs229 tdir1]# ll
total 0
[root@rs229 tdir1]# vi yousmile.log
The you smile until forever .....................
[root@rs229 tdir1]# ll
total 1
-rw-r--r-- 1 root root 50 Jul 14 13:57 yousmile.log.COMPLETED (the renamed suffix shows Flume has already processed the file)
[root@rs229 tdir1]#

# Connect to the servers from Eclipse to check whether the custom Flume sink delivered the data to Kafka

# MySimpleConsumer.java (run it in Eclipse to see the result)

package com.yting.cloud.kafa.consumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * The SimpleConsumer example from the official Kafka documentation. I run it
 * locally in Eclipse against the remote servers, so a few lines were changed.
 *
 * @Author 王扬庭
 * @Time 2014-07-14
 */
public class MySimpleConsumer {
    public static void main(String args[]) {
        MySimpleConsumer example = new MySimpleConsumer();
        // long maxReads = Long.parseLong(args[0]);
        // String topic = args[1];
        // int partition = Integer.parseInt(args[2]);
        // seeds.add(args[3]);
        // int port = Integer.parseInt(args[4]);
        long maxReads = 100;
//        String topic = "yting_page_visits";
//        String topic = "test003";
        String topic = "flume-kafka-storm-001";
//        int partition = 0;
        int partition = 1; // the line "The you smile until forever ....." from the log file landed in partition 1 (the topic has 2 partitions by default)
        List<String> seeds = new ArrayList<String>();
        seeds.add("rs229");
        seeds.add("rs227");
        seeds.add("rs226");
        seeds.add("rs198");
        seeds.add("rs197");
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public MySimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}


(The figure here showed the output of running this in Eclipse.)

# Flume + Kafka integration succeeded; now on to the Kafka + Storm integration

# Kafka + Storm Integration

# Starting Kafka (it was already started during the Flume + Kafka integration above, so there is nothing to do here)

# Starting Storm

# Starting the Storm nimbus

[root@rs229 apache-storm-0.9.2-incubating]# bin/storm nimbus &
[1] 26815
[root@rs229 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/lib/zkclient-0.3.jar:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.nimbus

# Starting the Storm UI

[root@rs229 apache-storm-0.9.2-incubating]# bin/storm ui &
[2] 26912
[root@rs229 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx768m -Dlogfile.name=ui.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.ui.core

# Starting the Storm supervisors

# Starting the supervisor on rs226

[root@rs226 apache-storm-0.9.2-incubating]# bin/storm supervisor &
[1] 15273
[root@rs226 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor

 

# Starting the supervisor on rs198

[root@rs198 apache-storm-0.9.2-incubating]# bin/storm supervisor &
[1] 15274
[root@RS198 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor

# Starting the supervisor on rs197

[root@RS197 apache-storm-0.9.2-incubating]# bin/storm supervisor &
[1] 25262
[root@RS197 apache-storm-0.9.2-incubating]# Running: /root/jdk1.6.0_26/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor

# Starting the supervisor on rs196

[root@RS196 apache-storm-0.9.2-incubating]# bin/storm supervisor &
[1] 17330
[root@RS196 apache-storm-0.9.2-incubating]# Running: /root/jdk1.6.0_26/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor

# The three classes that need to be packaged into a jar and placed under the $STORM_HOME/lib directory

# MyHighLevelConsumer.java

package com.yting.cloud.kafa.consumer;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Kafka high-level consumer (used together with the custom KafkaSink)
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 * @Problem you should run this consumer class before the producer
 */
public class MyHighLevelConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public MyHighLevelConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }

    class ConsumerTest implements Runnable {
        private KafkaStream<byte[], byte[]> m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

    public static void main(String[] args) {
//        String zooKeeper = args[0];
//        String groupId = args[1];
//        String topic = args[2];
//        int threads = Integer.parseInt(args[3]);

//        String zooKeeper = "116.255.224.229:2182";
        String zooKeeper = "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka";
        String groupId = "1";
        String topic = "flume-kafka-storm-001";
        int threads = 1;

        MyHighLevelConsumer example = new MyHighLevelConsumer(zooKeeper, groupId, topic);
        example.run(threads);

//        try {
//            Thread.sleep(1000);
//        } catch (InterruptedException ie) {
//        }
//        example.shutdown();
    }
}

# HighLevelKafkaSpout.java

package com.yting.cloud.storm.spout;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.yting.cloud.kafa.consumer.MyHighLevelConsumer;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

/**
 * Storm spout
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class HighLevelKafkaSpout implements IRichSpout {
    private static final Log log = LogFactory.getLog(HighLevelKafkaSpout.class);
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
    private int a_numThreads = 1;

    public HighLevelKafkaSpout() {
    }

    public HighLevelKafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void ack(Object msgId) {
        log.info("--------->ack");
    }

    @Override
    public void activate() {
        log.info("--------->activate start--------->");
        MyHighLevelConsumer.main(null);
        // the consumer logic here could be refactored out, and code such as
        // collector.emit(new Values("need to emit")) has not been written yet
        // either; this is just to sketch the idea
        log.info("--------->activate end--------->");
    }

    @Override
    public void close() {
        log.info("--------->close");
    }

    @Override
    public void deactivate() {
        log.info("--------->deactivate");
    }

    @Override
    public void fail(Object msgId) {
        log.info("--------->fail");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("highLevelKafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        log.info("--------->getComponentConfiguration");
        return null;
    }
}

# KafkaTopology.java

package com.yting.cloud.storm.topology;

import java.util.HashMap;
import java.util.Map;

import com.yting.cloud.storm.spout.HighLevelKafkaSpout;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * Storm topology
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class KafkaTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("1", new HighLevelKafkaSpout(""), 1);

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

        Utils.sleep(1000 * 60 * 5); // local cluster test ...
        cluster.shutdown();
    }
}
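
The topology above runs in an in-process LocalCluster, which is fine for a quick test. To submit the same topology to the cluster started earlier, a variant along the following lines should work on Storm 0.9.2; the class name KafkaClusterTopology is mine, not from the original project:

package com.yting.cloud.storm.topology;

import java.util.HashMap;
import java.util.Map;

import com.yting.cloud.storm.spout.HighLevelKafkaSpout;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class KafkaClusterTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("1", new HighLevelKafkaSpout(""), 1);

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);

        // submits to the cluster that `storm jar` points at, instead of an in-process LocalCluster
        StormSubmitter.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
    }
}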

# Running the Storm program with the storm jar command

[root@rs229 yjar]# ll
total 72
-rw-r--r-- 1 root root 19826 Jul 14 10:27 fks-storm-0013.jar
-rw-r--r-- 1 root root 19833 Jul 14 10:31 fks-storm-high-001.jar
-rw-r--r-- 1 root root 15149 Jul  7 17:43 storm-wordcount-official-cluster.jar
-rw-r--r-- 1 root root 15192 Jul  7 17:47 storm-wordcount-official-local.jar
[root@rs229 yjar]# pwd
/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/yjar
[root@rs229 yjar]# storm jar fks-storm-high-001.jar com.yting.cloud.storm.topology.KafkaTopology

There is far too much log output, so here is just a sample!

14650 [main] INFO com.yting.cloud.storm.spout.HighLevelKafkaSpout - --------->close
14650 [main] INFO backtype.storm.daemon.executor - Shut down executor 1:[1 1]
14650 [main] INFO backtype.storm.daemon.worker - Shut down executors
14650 [main] INFO backtype.storm.daemon.worker - Shutting down transfer thread
14651 [Thread-14-disruptor-worker-transfer-queue] INFO backtype.storm.util - Async loop interrupted!
14651 [main] INFO backtype.storm.daemon.worker - Shut down transfer thread
14652 [main] INFO backtype.storm.daemon.worker - Shutting down default resources
14653 [main] INFO backtype.storm.daemon.worker - Shut down default resources
14661 [main] INFO backtype.storm.daemon.worker - Disconnecting from storm cluster state context
14664 [main] INFO backtype.storm.daemon.worker - Shut down worker my-flume-kafka-storm-topology-integration-1-1405320692 e0d44e3c-5b2a-4263-8dab-4aacf4215d2d 1024
14667 [main] INFO backtype.storm.daemon.supervisor - Shut down e0d44e3c-5b2a-4263-8dab-4aacf4215d2d:14d31572-9e60-4a16-9638-56c22530826d
14667 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor e0d44e3c-5b2a-4263-8dab-4aacf4215d2d
14668 [Thread-3] INFO backtype.storm.event - Event manager interrupted
14668 [Thread-4] INFO backtype.storm.event - Event manager interrupted
14671 [main] INFO backtype.storm.daemon.supervisor - Shutting down supervisor cd464efd-fa69-4566-8cba-7e10d51dae6c
14671 [Thread-5] INFO backtype.storm.event - Event manager interrupted
14672 [Thread-6] INFO backtype.storm.event - Event manager interrupted
14674 [main] INFO backtype.storm.testing - Shutting down in process zookeeper
14675 [main] INFO backtype.storm.testing - Done shutting down in process zookeeper
14675 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/c88e689e-c97e-4822-886d-ddcc1e7b9e9d
14677 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/88f3f052-cfaf-4d53-91ad-489965612e94
14678 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/17f2f7cf-7844-4077-8ad9-b3503fa21fb6
14681 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/548e5d3e-b05d-42a7-bd71-ac4ece8e93c4

# Create a new test file in Flume's monitored directory and check whether the Storm program prints its contents

# Create the file fks-yousmile-20140714.log

[root@rs229 tdir1]# vi fks-yousmile-20140714.log
So i miss the change to see you again .
[root@rs229 tdir1]# ll
-rw-r--r-- 1 root root 40 Jul 14 14:59 fks-yousmile-20140714.log.COMPLETED

# Storm console output

14675 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/c88e689e-c97e-4822-886d-ddcc1e7b9e9d
14677 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/88f3f052-cfaf-4d53-91ad-489965612e94
14678 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/17f2f7cf-7844-4077-8ad9-b3503fa21fb6
14681 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/548e5d3e-b05d-42a7-bd71-ac4ece8e93c4
Thread 0: So i miss the change to see you again . (the Storm program has printed the contents of the file we just created)

# Flume + Kafka + Storm integration complete

# The Storm side above is written too simply; the Kafka high-level consumer can be pulled into the Storm spout itself for easier control, for example with the following code:

package com.yting.cloud.storm.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Storm spout
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class KafkaSpout implements IRichSpout {
    private static final Log log = LogFactory.getLog(KafkaSpout.class);
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
    private int a_numThreads = 1;

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void ack(Object msgId) {
        log.info("--------->ack");
    }

    @Override
    public void activate() {
        log.info("--------->activate");
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> streams = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = streams.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            log.info("(--------->Storm kafka consumer)------->" + value);
            collector.emit(new Values(value), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka");
//        props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181");
//        props.put("zookeeper.connect", "rs229");
        props.put("group.id", "2");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    @Override
    public void close() {
        log.info("--------->close");
    }

    @Override
    public void deactivate() {
        log.info("--------->deactivate");
    }

    @Override
    public void fail(Object msgId) {
        log.info("--------->fail");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("kafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        log.info("--------->getComponentConfiguration");
        return null;
    }
}
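
Since this spout now anchors each tuple with a message id (collector.emit(new Values(value), value)), a downstream bolt that acks what it receives completes the picture and makes the spout's ack() logging fire. A minimal sketch; PrintBolt and its package are illustrative names, not part of the original project:

package com.yting.cloud.storm.bolt; // hypothetical package

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    private static final Log log = LogFactory.getLog(PrintBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // print the message emitted by KafkaSpout, then ack it so the spout's ack() fires
        log.info("(--------->Storm print bolt)------->" + input.getString(0));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to declare
    }
}

It would be wired into the topology with builder.setBolt("2", new PrintBolt(), 1).shuffleGrouping("1");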

I'm feeling a bit lazy, so I won't polish this any further; I'll do it properly when I get to building the data architecture!

# Closing Remarks

All the project code and jars used in this integration have been uploaded to the shared files of QQ group 214293307; download them there if you want to study them.

The Eclipse project code for this Flume + Kafka + Storm integration can be downloaded from http://download.csdn.net/detail/u012185296/7633405

Study hard and improve every day. This should have been finished long ago, but it was delayed for quite a while for various reasons; plans really cannot keep up with change!

Good good study, day day up high

The you smile until forever .....................


# Time  2014-07-14
Reposted from: http://blog.csdn.net/u012185296/article/details/37762793
