本篇内容主要讲解“MapReduce的output输出过程是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“MapReduce的output输出过程是什么”吧!

1、首先看 ReduceTask.run() 这个执行入口
//--------------------------ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    if (this.isMapOrReduce()) {
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }
    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewReducer();
    //reducetask初始化工作
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class combinerClass = this.conf.getCombinerClass();
        CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null;
        Class extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        //开始运行reducetask
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }
        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }
}
和MapTask类似,主要有 this.initialize() 以及 this.runNewReducer() 这两个方法,分别完成初始化以及开始运行task的操作。
2、this.initialize()
//----------------------------------------ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {
    //创建上下文对象
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    //修改reducetask的状态为运行中
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }
    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }
        //反射获取outputformat类对象。getOutputFormatClass这个方法在JobContextImpl中。
        //默认是TextOutputFormat.class
        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }
    //获取输出路径
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }
    this.committer.setupTask(this.taskContext);
    Class extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }
}
主要就是初始化上下文对象,获取outputformat对象。
3、this.runNewReducer()
//-----------------------------------------------ReduceTask.java privatevoid runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException { //匿名内部类,用于构建key,value的迭代器 rIter = new RawKeyValueIterator() { public void close() throws IOException { rIter.close(); } public DataInputBuffer getKey() throws IOException { return rIter.getKey(); } public Progress getProgress() { return rIter.getProgress(); } public DataInputBuffer getValue() throws IOException { return rIter.getValue(); } public boolean next() throws IOException { boolean ret = rIter.next(); reporter.setProgress(rIter.getProgress().getProgress()); return ret; } }; TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); //反射获取Reducer对象 org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job); //获取RecordWriter对象,用于将结果写入到文件中 org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext); job.setBoolean("mapred.skip.on", this.isSkipping()); job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); //创建reduceContext对象,用于reduce任务 org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass); //开始运行reduce try { reducer.run(reducerContext); } finally { //关闭输出流 trackedRW.close(reducerContext); } } 
可以看到,主要做了以下工作:
1)获取reducer对象,用于运行run() ,也就是运行reduce方法
2)创建 RecordWriter对象
3)创建reduceContext
4)开始运行reducer中的run
4、ReduceTask.NewTrackingRecordWriter()
//--------------------------------------NewTrackingRecordWriter.java static class NewTrackingRecordWriterextends org.apache.hadoop.mapreduce.RecordWriter { private final org.apache.hadoop.mapreduce.RecordWriter real; private final org.apache.hadoop.mapreduce.Counter outputRecordCounter; private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter; private final List fsStats; NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException { this.outputRecordCounter = reduce.reduceOutputCounter; this.fileOutputByteCounter = reduce.fileOutputByteCounter; List matchedStats = null; if (reduce.outputFormat instanceof FileOutputFormat) { matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration()); } this.fsStats = matchedStats; long bytesOutPrev = this.getOutputBytes(this.fsStats); //通过outputFormat创建RecordWriter对象 this.real = reduce.outputFormat.getRecordWriter(taskContext); long bytesOutCurr = this.getOutputBytes(this.fsStats); this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); } ..................... } 
重点的就是通过outputFormat.getRecordWriter来创建 RecordWriter 对象。
上面也说到,outputFormat默认就是 TextOutputFormat,所以下面看看
TextOutputFormat.getRecordWriter()
5、TextOutputFormat.getRecordWriter()
public class TextOutputFormatextends FileOutputFormat { public TextOutputFormat() { } //可以看到,返回的是静态内部类TextOutputFormat.LineRecordWriter public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); //key和value的分隔符,默认是 \t String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); //分为压缩和非压缩输出 if (!isCompressed) { //获取输出路径 Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); //创建输出流 FSDataOutputStream fileOut = fs.create(file, progress); return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator); } else { Class extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job); Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); //返回LineRecordWriter对象 return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } } //这里就是 LineRecordWriter 类 protected static class LineRecordWriter implements RecordWriter { private static final byte[] NEWLINE; protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text)o; this.out.write(to.getBytes(), 0, to.getLength()); } else { this.out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } //将KV输出 public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || 
key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (!nullKey || !nullValue) { //先写key if (!nullKey) { this.writeObject(key); } //接着写入key和value之间的分隔符 if (!nullKey && !nullValue) { this.out.write(this.keyValueSeparator); } //最后写入value if (!nullValue) { this.writeObject(value); } //接着写入新的一行 this.out.write(NEWLINE); } } public synchronized void close(Reporter reporter) throws IOException { this.out.close(); } static { NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); } } } 
可以看到,最终返回的RecordWriter对象是 LineRecordWriter 类型的。
接着回到3中,看 reduceContext这个对象的类
6、reduceContext = ReduceTask.createReduceContext()
/**
 * Builds the context object passed to {@code Reducer.run}.
 *
 * <p>A {@code ReduceContextImpl} is created from the arguments and then wrapped by
 * {@code WrappedReducer.getReducerContext}; the wrapper is what gets returned.
 *
 * @param reducer           the reducer the context is created for (not used in the body)
 * @param job               configuration
 * @param taskId            task attempt id
 * @param rIter             iterator over the reduce input key/value pairs
 * @param inputKeyCounter   counter passed to the reduce context
 * @param inputValueCounter counter passed to the reduce context
 * @param output            record writer that receives the reducer's output
 * @param committer         output committer
 * @param reporter          status reporter
 * @param comparator        comparator passed to the reduce context
 * @param keyClass          reduce input key class
 * @param valueClass        reduce input value class
 * @return the wrapped reducer context
 * @throws IOException          propagated from {@code ReduceContextImpl} construction
 * @throws InterruptedException propagated from {@code ReduceContextImpl} construction
 */
protected static <INKEY, INVALUE, OUTKEY, OUTVALUE> Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context createReduceContext(Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter<OUTKEY, OUTVALUE> output, OutputCommitter committer, StatusReporter reporter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException {
    ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
    Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context reducerContext = (new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>()).getReducerContext(reduceContext);
    return reducerContext;
}
可以看到,reducerContext 是由 ReduceContextImpl 对象(reduceContext)经 WrappedReducer.getReducerContext() 包装后得到的 Reducer.Context 对象。
下面看看ReduceContextImpl 这个类的构造方法
//---------------------------------ReduceContextImpl.java public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriteroutput, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws InterruptedException, IOException { //父类是 TaskInputOutputContextImpl,把outputformat对象传递进去了 super(conf, taskid, output, committer, reporter); this.input = input; this.inputKeyCounter = inputKeyCounter; this.inputValueCounter = inputValueCounter; this.comparator = comparator; this.serializationFactory = new SerializationFactory(conf); this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass); this.keyDeserializer.open(this.buffer); this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass); this.valueDeserializer.open(this.buffer); this.hasMore = input.next(); this.keyClass = keyClass; this.valueClass = valueClass; this.conf = conf; this.taskid = taskid; } 
这里面,它继续调用了父类的构造方法,把 RecordWriter 对象(output)传递进去了。
继续看看父类 TaskInputOutputContextImpl
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriteroutput, OutputCommitter committer, StatusReporter reporter) { //可以看到这里的output就是recordWriter对象 super(conf, taskid, reporter); this.output = output; this.committer = committer; } //这里的逻辑其实就是先读取KV到 this.key和this.value中,如果没有KV就返回false,否则返回true public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; //调用recordWriter的write方法,将KV输出,默认是LineRecordWriter这个类 public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { this.output.write(key, value); 
可以看到,这里有3个抽象方法(在子类ReduceContextImpl中实现了逻辑,和RecordWriter无关),以及write这个具体方法。分别用于获取KV以及将结果KV写入。write这个写入方法,就是调用的 recordWriter的write方法,也就是5中创建的LineRecordWriter对象中的write方法,将KV输出。
7、reducer.run()
/**
 * Drives the reducer: calls {@code setup} once, then {@code reduce} once per key
 * for as long as the context has another key, and finally {@code cleanup}, which
 * runs even when {@code reduce} throws.
 *
 * @param context supplies the keys and values and receives the output
 * @throws IOException          propagated from setup, reduce or cleanup
 * @throws InterruptedException propagated from setup, reduce or cleanup
 */
public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while (context.nextKey()) {
            this.reduce(context.getCurrentKey(), context.getValues(), context);
            // When the values iterator is a ValueIterator, reset its backup store after each key.
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if (iter instanceof ValueIterator) {
                ((ValueIterator<VALUEIN>)iter).resetBackupStore();
            }
        }
    } finally {
        this.cleanup(context);
    }
}
可以看到,这里就是调用6中创建的 reduceContext中的方法来获取KV。而且在reduce方法中,我们会通过 context.write(key,value)来将结果KV输出。调用的其实就是 LineRecordWriter对象中的write方法。
到此,相信大家对“MapReduce的output输出过程是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
名称栏目:MapReduce的output输出过程是什么-创新互联
URL地址:http://www.jxjierui.cn/article/eejsd.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 