RCFileInputFormat: an RCFile input format implementation for MapReduce and its usage
2019-03-28 13:28 | Source: web
Hive already ships an RCFile input format for the old MapReduce API, under org.apache.hadoop.hive.ql.io. The code below is my own port to the new MapReduce API, written against that source. First, RCFileInputFormat.java:
import java.io.IOException;

import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * RCFileInputFormat for the new (org.apache.hadoop.mapreduce) API.
 *
 * @param <K> key type, a LongWritable row position
 * @param <V> value type, a BytesRefArrayWritable holding one row's columns
 */
public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>
        extends FileInputFormat<K, V> {

    public RCFileInputFormat() {
    }

    @SuppressWarnings("unchecked")
    @Override
    public RecordReader<K, V> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        // The raw-typed reader is assigned to RecordReader<K, V>; the
        // @SuppressWarnings above silences the resulting unchecked warning.
        return new RCFileRecordReader();
    }
}
Next, RCFileRecordReader.java:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFile.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * RCFileRecordReader for the new (org.apache.hadoop.mapreduce) API.
 *
 * @param <K>
 * @param <V>
 */
public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>
        extends RecordReader<LongWritable, BytesRefArrayWritable> {

    private Reader in;
    private long start;
    private long end;
    private boolean more = true;
    private LongWritable key = null;
    private BytesRefArrayWritable value = null;
    protected Configuration conf;

    /**
     * Return the progress within the input split.
     *
     * @return 0.0 to 1.0 of the input byte range
     */
    @Override
    public float getProgress() throws IOException {
        if (end == start) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (in.getPosition() - start)
                    / (float) (end - start));
        }
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesRefArrayWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        this.in = new RCFile.Reader(fs, path, conf);
        this.end = fileSplit.getStart() + fileSplit.getLength();
        if (fileSplit.getStart() > in.getPosition()) {
            // Seek to the first sync marker at or after the split start,
            // so each reader begins on a row-group boundary.
            in.sync(fileSplit.getStart());
        }
        this.start = in.getPosition();
        more = start < end;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!more) {
            return false;
        }
        if (key == null) {
            key = new LongWritable();
        }
        if (value == null) {
            value = new BytesRefArrayWritable();
        }
        more = in.next(key);
        if (!more) {
            return false;
        }
        // Stop once the last sync marker seen lies at or beyond the split end:
        // rows past that boundary belong to the next split's reader.
        long lastSeenSyncPos = in.lastSeenSyncPos();
        if (lastSeenSyncPos >= end) {
            more = false;
            return more;
        }
        in.getCurrentRow(value);
        return more;
    }
}
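Because the record reader above passes the job Configuration straight into RCFile.Reader, column-pruning hints set on that Configuration are honored: the reader only materializes the listed columns and skips the rest on disk. Below is a minimal sketch, assuming a Hive version that still exposes ColumnProjectionUtils.setReadColumnIDs (newer Hive releases replace it with appendReadColumns); the helper class and method name here are my own, not part of the article.

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

public class RCFileColumnPruning {
    // Hint to RCFile.Reader which columns to materialize for each row.
    // Call before job submission, e.g. pruneTo(job.getConfiguration(), 12, 32)
    // to match the two columns the mapper in the usage section reads.
    public static void pruneTo(Configuration conf, Integer... columnIds) {
        // setReadColumnIDs is the pre-Hive-0.13 API; it stores the list in
        // the Configuration that RCFile.Reader later consults.
        ColumnProjectionUtils.setReadColumnIDs(conf,
                new ArrayList<Integer>(Arrays.asList(columnIds)));
    }
}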
Usage:
job.setInputFormatClass(RCFileInputFormat.class);

public static class Map extends Mapper<LongWritable, BytesRefArrayWritable, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, BytesRefArrayWritable value, Context context)
            throws IOException, InterruptedException {
        // value holds one row; get(i) returns the i-th column's bytes.
        String top = new String(value.get(32).getBytesCopy());
        byte[] channel = value.get(12).getBytesCopy();
        // ... process the columns and write output here
    }
}
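For completeness, a minimal driver wiring the pieces together might look like the sketch below. This is my own scaffolding, not the article's: the class name RCFileReadJob and the input/output paths are placeholders, and the Map class above is assumed to be nested inside the driver class so Map.class resolves.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RCFileReadJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "rcfile-read");   // Job.getInstance(conf, ...) on newer Hadoop
        job.setJarByClass(RCFileReadJob.class);

        job.setInputFormatClass(RCFileInputFormat.class);
        job.setMapperClass(Map.class);            // the Map class from the snippet above
        job.setNumReduceTasks(0);                 // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("/input/rcfile"));      // placeholder path
        FileOutputFormat.setOutputPath(job, new Path("/output/rcfile"));   // placeholder path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}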