知识点
相关文章
更多最近更新
更多Hadoop的I/O
2019-03-28 13:51|来源: 网络
1. 数据完整性:任何语言对IO的操作都要保持其数据的完整性。Hadoop当然希望数据在存储和处理中不会丢失或损坏。检查数据完整性的常用方法是校验和。
- HDFS的数据完整性:客户端在写或者读取HDFS的文件时,都会对其进行校验和验证,当然我们可以通过在Open()方法读取之前,将false传给FileSystem中的setVerifyCheckSum()来禁用校验和。
- 本地文件系统,hadoop的本地文件系统执行客户端校验,这意味着,在写一个filename文件时,文件系统的客户端以透明方式创建了一个隐藏的文件.filename.crc,块的大小做为元数据存于此,所以读取文件时会进行校验和验证。
- ChecksumFileSystem:可以通过它对其数据验证。
2. 压缩:压缩后能够节省空间和减少网络中的传输。所以在hadoop中压缩是非常重要的。hadoop的压缩格式
压缩格式 | 算法 | 文件扩展名 | 多文件 | 可分割性 |
DEFLATEa | DEFLATE | .deflate | no | no |
gzip(zip) | DEFLATE | .gz(.zip) | no(yes) | no(yes) |
bzip2 | bzip2 | .bz2 | no | yes |
LZO | LZO | .lzo | no | no |
- 编码/解码
Compression format Hadoop CompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
可以用ComressionCodec轻松的压缩和解压缩。我们可以用
CompressionOutput
创建一个
CompressionOutputStream
(未压缩的数据写到此)。相反,可以用compressionInputStream进行解压缩。
- /**
- * @param args
- */
- public static void main(String[] args) throws Exception
- {
- // TODO Auto-generated method stub
- String codecClassname = args[0];
- Class<?> codecClass = Class.forName(codecClassname);
- Configuration configuration = new Configuration();
- CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- CompressionOutputStream outputStream = codec.createOutputStream(System.out);
- IOUtils.copyBytes(System.in, outputStream, 4096,false);
- outputStream.finish();
- }
-
压缩和分割:因为HDFS默认是以块的来存储数据的,所以在压缩时考虑是否支持分割时非常重要的。
-
在MapReduce使用压缩:例如要压缩MapReduce作业的输出,需要将配置文件中mapred.output.compress的属性设置为true
- public static void main(String[] args) throws IOException {
- if (args.length != 2) {
- System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
- "<output path>");
- System.exit(-1);
- }
- JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
- conf.setJobName("Max temperature with output compression");
- FileInputFormat.addInputPath(conf, new Path(args[0]));
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(IntWritable.class);
- /*[*/conf.setBoolean("mapred.output.compress", true);
- conf.setClass("mapred.output.compression.codec", GzipCodec.class,
- CompressionCodec.class);/*]*/
- conf.setMapperClass(MaxTemperatureMapper.class);
- conf.setCombinerClass(MaxTemperatureReducer.class);
- conf.setReducerClass(MaxTemperatureReducer.class);
- JobClient.runJob(conf);
- }
3.序列化:将字节流和机构化对象的转化。hadoop是进程间通信(RPC调用),PRC序列号结构特点:紧凑,快速,可扩展,互操作,hadoop使用自己的序列化格式Writerable,
-
Writerable接口:
- package org.apache.hadoop.io;
- import java.io.DataOutput;
- import java.io.DataInput;
- import java.io.IOException;
- public interface Writable {
- void write(DataOutput out) throws IOException;// 将序列化流写入DataOutput
- void readFields(DataInput in) throws IOException; //从DataInput流读取二进制
- }
- package WritablePackage;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.util.StringUtils;
- import org.hsqldb.lib.StringUtil;
- public class WritableTestBase
- {
- public static byte[] serialize(Writable writable) throws IOException
- {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
- writable.write(dataOutputStream);
- dataOutputStream.close();
- return outputStream.toByteArray();
- }
- public static byte[] deserialize(Writable writable,byte[] bytes) throws IOException
- {
- ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
- DataInputStream dataInputStream = new DataInputStream(inputStream);
- writable.readFields(dataInputStream);
- dataInputStream.close();
- return bytes;
- }
- public static String serializeToString(Writable src) throws IOException
- {
- return StringUtils.byteToHexString(serialize(src));
- }
- public static String writeTo(Writable src, Writable des) throws IOException
- {
- byte[] data = deserialize(des, serialize(src));
- return StringUtils.byteToHexString(data);
- }
- }
更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
相关问答
更多-
MPI I / O是如何实现的?(How is MPI I/O Implemented?)[2021-11-28]
I / O的机制取决于实现。 此外,没有一种I / O风格。 某些I / O由远程队列缓存,并在运行结束时由mpirun进程收集。 根据需要将某些I / O写入本地临时空间。 某些I / O被写入NAS / SAN样式的高性能共享文件系统。 一些MPI使用第三方库来支持并行文件系统的I / O,这些细节可能是专有的。 有些文件系统是本地光盘,有些则是光纤上的SAN或InfinBand。 您是如何计划实际测量I / O所花费的时间的? 您是否计划使用pMPI接口拦截所有对库的调用? The mechanism ... -
Hadoop用于对大量数据执行计算。 您的工作可能受IO(您称之为I / O密集型),CPU和网络资源的限制。 在Hadoop使用的经典案例中,您正在对大量输入数据执行本地计算,同时返回相对较小的结果集,这使得您的任务比CPU和网络密集型更加IO密集,但它在很大程度上取决于作业本身。 这里有些例子: IO密集的工作 。 您在地图上阅读了大量数据,但地图任务的结果并不是那么大。 一个示例是计算输入文本中的行数,计算RCfile中某些列的总和,将Hive查询的结果通过具有相对较小基数的列的组获取。 这意味着你的 ...
-
文件i / o问题(file i/o problem)[2023-10-12]
其他答案解释了您的代码有什么问题。 这是一种更加pythonic的方式来做你想做的事情: with open('seq.txt', mode='r', encoding='utf-8') as infile: with open('seqpar.txt', mode='w', encoding='utf-8') as outfile: for line in infile: outfile.write(line.strip("\n")) # O ... -
您无法同时读取和写入文件,这将抛出IOException 。 在尝试使用其他内容访问文件之前,您应该关闭任何有权访问该文件的内容。 在尝试使用BufferedReader访问文件之前,在BufferedWriter上调用close()方法应该可以解决问题。 编辑:此外,正如其他人所提到的,您可以使用e.printStackTrace()来查看程序中发生异常的位置,这在调试时非常有用。 编辑:正如zapl澄清的那样,某些文件系统就是这种情况,包括Windows,但不是全部。 我假设您使用的文件系统限制了它,因 ...
-
一些i / o问题(Some i/o problems)[2023-03-04]
你在每次迭代时重置dirs和spks,所以基本上每次循环运行时它都会启动一个新的列表。 在循环外获取dirs和spks声明应该可以解决问题。 You reset dirs and spks at every iteration, so basically it start a fresh new list every time your loop runs. Getting the dirs and spks declaration outside the loop should do the trick. ... -
I / O重定向(I/O redirection)[2022-07-09]
因为您正在使用MS-DOS ... MS MS WinDOS,并且ASCII号码为26 / ^ Z是文本结束文件标记。 该功能的存在使得环境与20世纪70年代早期的CP / M操作系统兼容,以防您需要使用源自该文件的一些文件。 正如你所注意到的那样,只有type才能起作用,但more会显示更多......(没有双关语)。 不开玩笑。 Because you're using MS-DOS... er MS WinDOS, and there ASCII number 26/^Z is the end-of- ... -
Hadoop不会改变任何东西的渐近复杂性。 它只是关于减少大O忽略的常数因素。 将分布式计算的结果放在一起总会有一些开销。 但是,如果遇到三个问题,使用组合器会将最终排序减少到O(1)。 我不知道当只有一个键时,每个地图主机上发生的局部排序的复杂性是什么,以便为组合器分组。 在这种情况下,它可能比O(n lg n)更好。 Hadoop does not change the asymptotic complexity of anything. It is merely about reducing the ...
-
如果您使用的是具有YARN框架的Hadoop 2.2.0,则其中没有jobtracker 。 它的功能被拆分并由ResourceManager和ApplicationMaster取代。 这是运行YARN时预期的jps打印输出 $jps 18509 Jps 17107 NameNode 17170 DataNode 17252 ResourceManager 17309 NodeManager 17626 JobHistoryServer If you are using Hadoop 2.2.0 whic ...
-
大多数VM(全部?)不公开该数据。 您需要使用像sigar这样的库(source在github上 ),它可以收集这些值,然后通过JMX公开sigar的返回值。 我们在http://rhq-project.org/上成功使用了sigar Most VMs (all?) do not expose that data. You would need to use a library like sigar (source is at github), that can gather those values an ...
-
如果您的问题是关于向现有H2O群集(在Hadoop或任何地方)添加新节点,答案是否定的。 您必须首先列出您想要创建H2O节点的所有IP地址,并形成群集。 H2O不支持动态分配资源,因此您无法将新节点添加到现有的H2O群集中。 If your question is about adding a new node to existing H2O cluster (on Hadoop or anywhere), the answer is no. You must list all the IP address ...