Hadoop的I/O

2019-03-28 13:51|来源: 网络

1. 数据完整性:任何语言对IO的操作都要保持其数据的完整性。Hadoop当然希望数据在存储和处理中不会丢失或损坏。检查数据完整性的常用方法是校验和。

  • HDFS的数据完整性:客户端在写或者读取HDFS的文件时,都会对其进行校验和验证,当然我们可以通过在Open()方法读取之前,将false传给FileSystem中的setVerifyCheckSum()来禁用校验和。
  • 本地文件系统,hadoop的本地文件系统执行客户端校验,这意味着,在写一个filename文件时,文件系统的客户端以透明方式创建了一个隐藏的文件.filename.crc,块的大小做为元数据存于此,所以读取文件时会进行校验和验证。
  • ChecksumFileSystem:可以通过它对其数据验证。

2. 压缩:压缩后能够节省空间和减少网络中的传输。所以在hadoop中压缩是非常重要的。hadoop的压缩格式

压缩格式 算法 文件扩展名 多文件 可分割性
DEFLATEa DEFLATE .deflate no no
gzip(zip) DEFLATE .gz(.zip) no(yes) no(yes)
bzip2 bzip2 .bz2 no yes
LZO LZO .lzo no no
  • 编码/解码
Compression format          Hadoop CompressionCodec
DEFLATE                            org.apache.hadoop.io.compress.DefaultCodec
gzip                                   org.apache.hadoop.io.compress.GzipCodec
bzip2                                 org.apache.hadoop.io.compress.BZip2Codec
LZO                                   com.hadoop.compression.lzo.LzopCodec
可以用ComressionCodec轻松的压缩和解压缩。我们可以用 CompressionOutput 创建一个 CompressionOutputStream 未压缩的数据写到此)。相反,可以用compressionInputStream进行解压缩。
  1. /** 
  2.      * @param args 
  3.      */  
  4.     public static void main(String[] args) throws Exception  
  5.     {  
  6.         // TODO Auto-generated method stub   
  7.         String codecClassname = args[0];  
  8.         Class<?> codecClass = Class.forName(codecClassname);  
  9.         Configuration configuration = new Configuration();  
  10.         CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);  
  11.         CompressionOutputStream  outputStream = codec.createOutputStream(System.out);  
  12.         IOUtils.copyBytes(System.in, outputStream, 4096,false);  
  13.         outputStream.finish();  
  14.     }  
  • 压缩和分割:因为HDFS默认是以块的来存储数据的,所以在压缩时考虑是否支持分割时非常重要的。
  • 在MapReduce使用压缩:例如要压缩MapReduce作业的输出,需要将配置文件中mapred.output.compress的属性设置为true
  1. public static void main(String[] args) throws IOException {  
  2.     if (args.length != 2) {  
  3.       System.err.println("Usage: MaxTemperatureWithCompression <input path> " +  
  4.             "<output path>");  
  5.       System.exit(-1);  
  6.     }  
  7.       
  8.     JobConf conf = new JobConf(MaxTemperatureWithCompression.class);  
  9.     conf.setJobName("Max temperature with output compression");  
  10.   
  11.     FileInputFormat.addInputPath(conf, new Path(args[0]));  
  12.     FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  13.       
  14.     conf.setOutputKeyClass(Text.class);  
  15.     conf.setOutputValueClass(IntWritable.class);  
  16.       
  17.     /*[*/conf.setBoolean("mapred.output.compress"true);  
  18.     conf.setClass("mapred.output.compression.codec", GzipCodec.class,  
  19.         CompressionCodec.class);/*]*/  
  20.   
  21.     conf.setMapperClass(MaxTemperatureMapper.class);  
  22.     conf.setCombinerClass(MaxTemperatureReducer.class);  
  23.     conf.setReducerClass(MaxTemperatureReducer.class);  
  24.   
  25.     JobClient.runJob(conf);  
  26.   }  

3.序列化:将字节流和机构化对象的转化。hadoop是进程间通信(RPC调用),PRC序列号结构特点:紧凑,快速,可扩展,互操作,hadoop使用自己的序列化格式Writerable,

  • Writerable接口: 
  1. package org.apache.hadoop.io;  
  2. import java.io.DataOutput;  
  3. import java.io.DataInput;  
  4. import java.io.IOException;  
  5. public interface Writable {  
  6. void write(DataOutput out) throws IOException;// 将序列化流写入DataOutput   
  7. void readFields(DataInput in) throws IOException; //从DataInput流读取二进制   
  8. }  

 

  1. package WritablePackage;  
  2.   
  3. import java.io.ByteArrayInputStream;  
  4. import java.io.ByteArrayOutputStream;  
  5. import java.io.DataInputStream;  
  6. import java.io.DataOutputStream;  
  7. import java.io.IOException;  
  8.   
  9.   
  10. import org.apache.hadoop.io.Writable;  
  11. import org.apache.hadoop.util.StringUtils;  
  12. import org.hsqldb.lib.StringUtil;  
  13.   
  14. public class WritableTestBase  
  15. {  
  16.     public static byte[] serialize(Writable writable) throws IOException  
  17.     {  
  18.         ByteArrayOutputStream outputStream  = new ByteArrayOutputStream();  
  19.         DataOutputStream dataOutputStream = new DataOutputStream(outputStream);  
  20.         writable.write(dataOutputStream);  
  21.         dataOutputStream.close();  
  22.         return outputStream.toByteArray();  
  23.     }  
  24.       
  25.     public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException  
  26.     {  
  27.         ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);  
  28.         DataInputStream dataInputStream = new DataInputStream(inputStream);  
  29.         writable.readFields(dataInputStream);  
  30.         dataInputStream.close();  
  31.         return bytes;  
  32.     }  
  33.       
  34.     public static String serializeToString(Writable src) throws IOException  
  35.     {  
  36.         return StringUtils.byteToHexString(serialize(src));  
  37.     }  
  38.       
  39.     public static String writeTo(Writable src, Writable des) throws IOException  
  40.     {  
  41.         byte[] data = deserialize(des, serialize(src));  
  42.         return StringUtils.byteToHexString(data);  
  43.     }  
  44. }  
更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关问答

更多
  • I / O的机制取决于实现。 此外,没有一种I / O风格。 某些I / O由远程队列缓存,并在运行结束时由mpirun进程收集。 根据需要将某些I / O写入本地临时空间。 某些I / O被写入NAS / SAN样式的高性能共享文件系统。 一些MPI使用第三方库来支持并行文件系统的I / O,这些细节可能是专有的。 有些文件系统是本地光盘,有些则是光纤上的SAN或InfinBand。 您是如何计划实际测量I / O所花费的时间的? 您是否计划使用pMPI接口拦截所有对库的调用? The mechanism ...
  • Hadoop用于对大量数据执行计算。 您的工作可能受IO(您称之为I / O密集型),CPU和网络资源的限制。 在Hadoop使用的经典案例中,您正在对大量输入数据执行本地计算,同时返回相对较小的结果集,这使得您的任务比CPU和网络密集型更加IO密集,但它在很大程度上取决于作业本身。 这里有些例子: IO密集的工作 。 您在地图上阅读了大量数据,但地图任务的结果并不是那么大。 一个示例是计算输入文本中的行数,计算RCfile中某些列的总和,将Hive查询的结果通过具有相对较小基数的列的组获取。 这意味着你的 ...
  • 其他答案解释了您的代码有什么问题。 这是一种更加pythonic的方式来做你想做的事情: with open('seq.txt', mode='r', encoding='utf-8') as infile: with open('seqpar.txt', mode='w', encoding='utf-8') as outfile: for line in infile: outfile.write(line.strip("\n")) # O ...
  • 您无法同时读取和写入文件,这将抛出IOException 。 在尝试使用其他内容访问文件之前,您应该关闭任何有权访问该文件的内容。 在尝试使用BufferedReader访问文件之前,在BufferedWriter上调用close()方法应该可以解决问题。 编辑:此外,正如其他人所提到的,您可以使用e.printStackTrace()来查看程序中发生异常的位置,这在调试时非常有用。 编辑:正如zapl澄清的那样,某些文件系统就是这种情况,包括Windows,但不是全部。 我假设您使用的文件系统限制了它,因 ...
  • 你在每次迭代时重置dirs和spks,所以基本上每次循环运行时它都会启动一个新的列表。 在循环外获取dirs和spks声明应该可以解决问题。 You reset dirs and spks at every iteration, so basically it start a fresh new list every time your loop runs. Getting the dirs and spks declaration outside the loop should do the trick. ...
  • 因为您正在使用MS-DOS ... MS MS WinDOS,并且ASCII号码为26 / ^ Z是文本结束文件标记。 该功能的存在使得环境与20世纪70年代早期的CP / M操作系统兼容,以防您需要使用源自该文件的一些文件。 正如你所注意到的那样,只有type才能起作用,但more会显示更多......(没有双关语)。 不开玩笑。 Because you're using MS-DOS... er MS WinDOS, and there ASCII number 26/^Z is the end-of- ...
  • Hadoop不会改变任何东西的渐近复杂性。 它只是关于减少大O忽略的常数因素。 将分布式计算的结果放在一起总会有一些开销。 但是,如果遇到三个问题,使用组合器会将最终排序减少到O(1)。 我不知道当只有一个键时,每个地图主机上发生的局部排序的复杂性是什么,以便为组合器分组。 在这种情况下,它可能比O(n lg n)更好。 Hadoop does not change the asymptotic complexity of anything. It is merely about reducing the ...
  • 如果您使用的是具有YARN框架的Hadoop 2.2.0,则其中没有jobtracker 。 它的功能被拆分并由ResourceManager和ApplicationMaster取代。 这是运行YARN时预期的jps打印输出 $jps 18509 Jps 17107 NameNode 17170 DataNode 17252 ResourceManager 17309 NodeManager 17626 JobHistoryServer If you are using Hadoop 2.2.0 whic ...
  • 大多数VM(全部?)不公开该数据。 您需要使用像sigar这样的库(source在github上 ),它可以收集这些值,然后通过JMX公开sigar的返回值。 我们在http://rhq-project.org/上成功使用了sigar Most VMs (all?) do not expose that data. You would need to use a library like sigar (source is at github), that can gather those values an ...
  • 如果您的问题是关于向现有H2O群集(在Hadoop或任何地方)添加新节点,答案是否定的。 您必须首先列出您想要创建H2O节点的所有IP地址,并形成群集。 H2O不支持动态分配资源,因此您无法将新节点添加到现有的H2O群集中。 If your question is about adding a new node to existing H2O cluster (on Hadoop or anywhere), the answer is no. You must list all the IP address ...