基于Flume的美团日志收集系统(二)改进和优化

2019-03-02 23:51|来源: 网路

问题导读:
1.Flume的存在些什么问题?
2.基于开源的Flume美团增加了哪些功能?
3.Flume系统如何调优?







在《 基于Flume的美团日志收集系统(一)架构和设计》中,我们详述了基于Flume的美团日志收集系统的架构设计,以及为什么做这样的设计。在本节中,我们将会讲述在实际部署和使用过程中遇到的问题,对Flume的功能改进和对系统做的优化。

1 Flume的问题总结

在Flume的使用过程中,遇到的主要问题如下:
a. Channel“水土不服”:使用固定大小的MemoryChannel在日志高峰时常报队列大小不够的异常;使用FileChannel又导致IO繁忙的问题;
b. HdfsSink的性能问题:使用HdfsSink向Hdfs写日志,在高峰时间速度较慢;
c. 系统的管理问题:配置升级,模块重启等;

2 Flume的功能改进和优化点

从上面的问题中可以看到,有一些需求是原生Flume无法满足的,因此,基于开源的Flume我们增加了许多功能,修改了一些Bug,并且进行一些调优。下面将对一些主要的方面做一些说明。

2.1 增加Zabbix monitor服务

一方面,Flume本身提供了http, ganglia的监控服务,而我们目前主要使用zabbix做监控。因此,我们为Flume添加了zabbix监控模块,和sa的监控服务无缝融合。
另一方面,净化Flume的metrics。只将我们需要的metrics发送给zabbix,避免 zabbix server造成压力。目前我们最为关心的是Flume能否及时把应用端发送过来的日志写到Hdfs上, 对应关注的metrics为:
  • Source : 接收的event数和处理的event数
  • Channel : Channel中拥堵的event数
  • Sink : 已经处理的event数

2.2 为HdfsSink增加自动创建index功能

首先,我们的HdfsSink写到hadoop的文件采用lzo压缩存储。 HdfsSink可以读取hadoop配置文件中提供的编码类列表,然后通过配置的方式获取使用何种压缩编码,我们目前使用lzo压缩数据。采用lzo压缩而非bz2压缩,是基于以下测试数据:
event大小(Byte) sink.batch-size hdfs.batchSize 压缩格式 总数据大小(G) 耗时(s) 平均events/s 压缩后大小(G)
544 300 10000 bz2 9.1 2448 6833 1.36
544 300 10000 lzo 9.1 612 27333 3.49
其次,我们的HdfsSink增加了创建lzo文件后自动创建index功能。Hadoop提供了对lzo创建索引,使得压缩文件是可切分的,这样Hadoop Job可以并行处理数据文件。HdfsSink本身lzo压缩,但写完lzo文件并不会建索引,我们在close文件之后添加了建索引功能。
  1. /**
  2.    * Rename bucketPath file from .tmp to permanent location.
  3.    */
  4.   private void renameBucket() throws IOException, InterruptedException {
  5.       if(bucketPath.equals(targetPath)) {
  6.               return;
  7.         }
  8.         final Path srcPath = new Path(bucketPath);
  9.         final Path dstPath = new Path(targetPath);
  10.         callWithTimeout(new CallRunner<Object>() {
  11.               @Override
  12.               public Object call() throws Exception {
  13.                 if(fileSystem.exists(srcPath)) { // could block
  14.                       LOG.info("Renaming " + srcPath + " to " + dstPath);
  15.                      fileSystem.rename(srcPath, dstPath); // could block
  16.                       //index the dstPath lzo file
  17.                       if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) {
  18.                               LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
  19.                               lzoIndexer.index(dstPath);
  20.                       }
  21.                 }
  22.                 return null;
  23.               }
  24.     });
  25. }
复制代码


2.3 增加HdfsSink的开关

我们在HdfsSink和DualChannel中增加开关,当开关打开的情况下,HdfsSink不再往Hdfs上写数据,并且数据只写向DualChannel中的FileChannel。以此策略来防止Hdfs的正常停机维护。

2.4 增加DualChannel

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。
其具体的逻辑如下:
  1. /***
  2. * putToMemChannel indicate put event to memChannel or fileChannel
  3. * takeFromMemChannel indicate take event from memChannel or fileChannel
  4. * */
  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
  7. void doPut(Event event) {
  8.         if (switchon && putToMemChannel.get()) {
  9.               //往memChannel中写数据
  10.               memTransaction.put(event);
  11.               if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
  12.                 putToMemChannel.set(false);
  13.               }
  14.         } else {
  15.               //往fileChannel中写数据
  16.               fileTransaction.put(event);
  17.         }
  18.   }
  19. Event doTake() {
  20.     Event event = null;
  21.     if ( takeFromMemChannel.get() ) {
  22.         //从memChannel中取数据
  23.         event = memTransaction.take();
  24.         if (event == null) {
  25.             takeFromMemChannel.set(false);
  26.         } 
  27.     } else {
  28.         //从fileChannel中取数据
  29.         event = fileTransaction.take();
  30.         if (event == null) {
  31.             takeFromMemChannel.set(true);
  32.             putToMemChannel.set(true);
  33.         } 
  34.     }
  35.     return event;
  36. }
复制代码



2.5 增加NullChannel

Flume提供了NullSink,可以把不需要的日志通过NullSink直接丢弃,不进行存储。然而,Source需要先将events存放到Channel中,NullSink再将events取出扔掉。为了提升性能,我们把这一步移到了Channel里面做,所以开发了NullChannel。

2.6 增加KafkaSink

为支持向Storm提供实时数据流,我们增加了KafkaSink用来向Kafka写实时数据流。其基本的逻辑如下:
  1. public class KafkaSink extends AbstractSink implements Configurable {
  2.         private String zkConnect;
  3.         private Integer zkTimeout;
  4.         private Integer batchSize;
  5.         private Integer queueSize;
  6.         private String serializerClass;
  7.         private String producerType;
  8.         private String topicPrefix;
  9.         private Producer<String, String> producer;
  10.         public void configure(Context context) {
  11.             //读取配置,并检查配置
  12.         }
  13.         @Override
  14.         public synchronized void start() {
  15.             //初始化producer
  16.         }
  17.         @Override
  18.         public synchronized void stop() {
  19.             //关闭producer
  20.         }
  21.         @Override
  22.         public Status process() throws EventDeliveryException {
  23.             Status status = Status.READY;
  24.             Channel channel = getChannel();
  25.             Transaction tx = channel.getTransaction();
  26.             try {
  27.                     tx.begin();
  28.                     //将日志按category分队列存放
  29.                     Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>();
  30.                     //从channel中取batchSize大小的日志,从header中获取category,生成topic,并存放于上述的Map中;
  31.                     //将Map中的数据通过producer发送给kafka 
  32.                    tx.commit();
  33.             } catch (Exception e) {
  34.                     tx.rollback();
  35.                     throw new EventDeliveryException(e);
  36.             } finally {
  37.                 tx.close();
  38.             }
  39.             return status;
  40.         }
  41. }
复制代码



2.7 修复和scribe的兼容问题

Scribed在通过ScribeSource发送数据包给Flume时,大于4096字节的包,会先发送一个Dummy包检查服务器的反应,而Flume的ScribeSource对于logentry.size()=0的包返回TRY_LATER,此时Scribed就认为出错,断开连接。这样循环反复尝试,无法真正发送数据。现在在ScribeSource的Thrift接口中,对size为0的情况返回OK,保证后续正常发送数据。

3. Flume系统调优经验总结3.1 基础参数调优经验

  • HdfsSink中默认的serializer会每写一行在行尾添加一个换行符,我们日志本身带有换行符,这样会导致每条日志后面多一个空行,修改配置不要自动添加换行符;
  1. lc.sinks.sink_hdfs.serializer.appendNewline = false
复制代码



  • 调大MemoryChannel的capacity,尽量利用MemoryChannel快速的处理能力;
  • 调大HdfsSink的batchSize,增加吞吐量,减少hdfs的flush次数;
  • 适当调大HdfsSink的callTimeout,避免不必要的超时错误;

3.2 HdfsSink获取Filename的优化

HdfsSink的path参数指明了日志被写到Hdfs的位置,该参数中可以引用格式化的参数,将日志写到一个动态的目录中。这方便了日志的管理。例如我们可以将日志写到category分类的目录,并且按天和按小时存放:
  1. lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H
复制代码
HdfsS ink中处理每条event时,都要根据配置获取此event应该写入的Hdfs path和filename,默认的获取方法是通过正则表达式替换配置中的变量,获取真实的path和filename。因为此过程是每条event都要做的操作,耗时很长。通过我们的测试,20万条日志,这个操作要耗时6-8s左右。
由于我们目前的path和filename有固定的模式,可以通过字符串拼接获得。而后者比正则匹配快几十倍。拼接定符串的方式,20万条日志的操作只需要几百毫秒。

3.3 HdfsSink的b/m/s优化

在我们初始的设计中,所有的日志都通过一个Channel和一个HdfsSink写到Hdfs上。我们来看一看这样做有什么问题。
首先,我们来看一下HdfsSink在发送数据的逻辑:
  1. //从Channel中取batchSize大小的events
  2. for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  3.     //对每条日志根据category append到相应的bucketWriter上;
  4.     bucketWriter.append(event);
  5. for (BucketWriter bucketWriter : writers) {
  6.     //然后对每一个bucketWriter调用相应的flush方法将数据flush到Hdfs上
  7.     bucketWriter.flush();
复制代码


假设我们的系统中有100个category,batchSize大小设置为20万。则每20万条数据,就需要对100个文件进行append或者flush操作。
其次,对于我们的日志来说,基本符合80/20原则。即20%的category产生了系统80%的日志量。这样对大部分日志来说,每20万条可能只包含几条日志,也需要往Hdfs上flush一次。
上述的情况会导致HdfsSink写Hdfs的效率极差。下图是单Channel的情况下每小时的发送量和写hdfs的时间趋势图。
 
鉴于这种实际应用场景,我们把日志进行了大小归类,分为big, middle和small三类,这样可以有效的避免小日志跟着大日志一起频繁的flush,提升效果明显。下图是分队列后big队列的每小时的发送量和写hdfs的时间趋势图。

转自:http://www.cnblogs.com/gaojunfeng/p/3948536

相关问答

更多
  • 采集层 主要可以使用Flume, Kafka两种技术。 Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API. Kafka:Kafka是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka; ...
  • 、Flume介绍 Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 flume可以搜集数据,合并数据。合并到什么地方,可以是hdfs。也就是flume可以与hadoop相结合 复制代码 设计目标: (1) 可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将e ...
  • REHL6的硬件相关的日志在/var/log/messages下面 可以用使用 dmesg 命令进行打印。
  • 我假设你正在使用一个cloudera沙箱,并且正在讨论将一个文件放在你正在计划启动的flume agent的本地沙箱上。 水剂包含: 源通道接收器 这些应该坐在当地的水槽代理商。 可用的flume源列表位于用户指南中: https : //flume.apache.org/FlumeUserGuide.html 。 如果您只想使用tail或cat命令从文件中流数据,则可以使用Exec源。 您也可以使用后台打印目录源将监视新文件的指定目录,并在新文件出现时解析事件。 仔细阅读用户指南。 包含你需要的一切。 I ...
  • 经典的Syslog源基本上设计为连接到一个syslog主机,即您必须为10个syslog服务器设置10个源。 所有这些源都可以在一个代理中运行,并使用一个通道将其事件假脱机到一个接收器 - 如果数据量很大,这个设置很快就会遇到性能问题。 您必须配置路由器以连接到该syslog主机/端口配置。 较大的设置是为每个syslog服务器安装一个代理,并使用Avro sink / Avro Source将事件假定为一个或两个代理,然后再将事件排序并将它们写入您想要的位置。 您还可以使用较新的Multiport Sys ...
  • 您可以使用flume exec source,收集日志并使用hdfs sink来存储日志。 配置可以这样: a1.sources = r1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = dstat -ta --top-cputime a1.sources.r1.channels = c1 http://flume.apache.org/FlumeUserGuide.html#hdfs-sink http://flume ...
  • 那么我该如何收集这些日志呢? 通常,除非您正在为设备制造商工作,否则不会收集这些日志。 首先,从未正式支持在运行时访问LogCat; 因此,你必须采取笨重的“fork logcat ”方法。 除此之外,你需要READ_LOGS权限才能获得比你更多的东西。 该权限拥有signature|privileged|development ,这意味着普通应用程序无法拥有该权限。 这是出于隐私原因。 READ_LOGS使您可以访问所有的LogCat,以及许多应用程序(和一些系统进程)记录可能敏感的信息。 So how ...
  • 在/etc/flume/conf.empty下,将log4j.properties从它复制到/ etc / flume / conf。 然后,flume及其扩展将记录您在log4j.properties中配置的方式。 under /etc/flume/conf.empty, copy the log4j.properties from it to /etc/flume/conf. Then flume and its extensions will log the way you configured in ...