2025-02-27 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
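After a job is submitted to YARN, its map tasks are scheduled and run. This article walks through the source code on the map input side.
The entry point of a map task is the run() method in MapTask.class.
1. First, look at the MapTask.run() method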
MapTask.class
    //---------------------------------MapTask.java
    public void run(JobConf job, TaskUmbilicalProtocol umbilical)
            throws IOException, ClassNotFoundException, InterruptedException {
        this.umbilical = umbilical;
        if (this.isMapTask()) {
            if (this.conf.getNumReduceTasks() == 0) {
                this.mapPhase = this.getProgress().addPhase("map", 1.0F);
            } else {
                this.mapPhase = this.getProgress().addPhase("map", 0.667F);
                this.sortPhase = this.getProgress().addPhase("sort", 0.333F);
            }
        }

        TaskReporter reporter = this.startReporter(umbilical);
        boolean useNewApi = job.getUseNewMapper();
        // Initialize the map task
        this.initialize(job, this.getJobID(), reporter, useNewApi);
        if (this.jobCleanup) {
            this.runJobCleanupTask(umbilical, reporter);
        } else if (this.jobSetup) {
            this.runJobSetupTask(umbilical, reporter);
        } else if (this.taskCleanup) {
            this.runTaskCleanupTask(umbilical, reporter);
        } else {
            // Start the map task, choosing between the new and the old API
            if (useNewApi) {
                this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
            } else {
                this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
            }
            this.done(umbilical, reporter);
        }
    }
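The phase registration at the top of run() can be modeled in isolation. The sketch below is plain Java with no Hadoop dependency. The class `MapPhaseWeights` and its methods are hypothetical names introduced here for illustration, and the weighted-sum progress formula is a simplifying assumption, not Hadoop's actual Progress implementation. It only shows how the weights 1.0, 0.667 and 0.333 split overall task progress depending on whether the job has reducers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the phase setup in MapTask.run(); not a Hadoop class.
final class MapPhaseWeights {

    // Mirrors the branch in run(): a map-only job has a single "map" phase,
    // otherwise progress is split between "map" and "sort".
    static Map<String, Float> phasesFor(int numReduceTasks) {
        Map<String, Float> phases = new LinkedHashMap<>();
        if (numReduceTasks == 0) {
            phases.put("map", 1.0F);
        } else {
            phases.put("map", 0.667F);
            phases.put("sort", 0.333F);
        }
        return phases;
    }

    // Assumption: overall progress is the weighted sum of per-phase progress,
    // where mapDone and sortDone are each in the range [0, 1].
    static float overallProgress(int numReduceTasks, float mapDone, float sortDone) {
        Map<String, Float> weights = phasesFor(numReduceTasks);
        return weights.getOrDefault("map", 0F) * mapDone
                + weights.getOrDefault("sort", 0F) * sortDone;
    }

    public static void main(String[] args) {
        System.out.println(phasesFor(0));
        System.out.println(phasesFor(2));
        System.out.println(overallProgress(2, 1.0F, 0.0F));
    }
}
```

Under this model, a task with reducers that has finished mapping but not yet sorted reports roughly two thirds of its work as done, while a map-only task reports its map progress directly.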
The two key methods above are this.initialize() and this.runNewMapper().
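The order in which run() picks what to execute after initialize() can be summarized as a small decision function. This is a sketch only: `MapTaskDispatch`, the `Action` enum and `choose` are hypothetical names, and the boolean parameters stand in for the jobCleanup, jobSetup and taskCleanup fields and the useNewApi flag seen in the listing.

```java
// Hypothetical model of the branch order in MapTask.run(); not a Hadoop class.
final class MapTaskDispatch {

    enum Action { JOB_CLEANUP, JOB_SETUP, TASK_CLEANUP, NEW_MAPPER, OLD_MAPPER }

    // The checks are evaluated in the same order as in run(): the auxiliary
    // task types win first, and only a regular map task reaches the
    // new-API versus old-API decision.
    static Action choose(boolean jobCleanup, boolean jobSetup,
                         boolean taskCleanup, boolean useNewApi) {
        if (jobCleanup) {
            return Action.JOB_CLEANUP;
        }
        if (jobSetup) {
            return Action.JOB_SETUP;
        }
        if (taskCleanup) {
            return Action.TASK_CLEANUP;
        }
        return useNewApi ? Action.NEW_MAPPER : Action.OLD_MAPPER;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, false, true));
        System.out.println(choose(false, true, false, true));
    }
}
```

Only the last branch corresponds to an actual map computation; in the listing it is also the only branch followed by this.done(umbilical, reporter).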
2. Next, look at this.initialize()
    //---------------------------------Task.java
    public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Create the job and task context objects
        this.jobContext = new JobContextImpl(job, id, reporter);
        this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
        // Move the task state from UNASSIGNED to RUNNING
        if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
            this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
        }

        if (useNewApi) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("using new api for output committer");
            }
            // Get the output format class configured for the job and instantiate it via reflection
            this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
            // Get the committer from the output format
            this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
        } else {
            this.committer = this.conf.getOutputCommitter();
        }

        // Get the output path for the task results from FileOutputFormat.
        /*
           It may seem odd that the mapper looks up the OutputFormat's output path here.
           A MapReduce job can consist of mappers only, with no reducers.
           In that case the mappers write the job output directly, so the task
           needs to know the output path, and this is where it is used.
         */
        Path outputPath = FileOutputFormat.getOutputPath(this.conf);
        if (outputPath != null) {
            if (this.committer instanceof FileOutputCommitter) {
                FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
            } else {
                FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
            }
        }

        this.committer.setupTask(this.taskContext);
        Class
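The work output path selection inside initialize() can be illustrated without Hadoop on the classpath. In the sketch below, `WorkOutputPathSketch`, `Committer`, `FileCommitter` and `OtherCommitter` are hypothetical stand-ins for the Hadoop types in the listing, paths are plain strings, and the "/_temporary/" layout is an assumption made for illustration rather than the exact directory FileOutputCommitter produces.

```java
// Hypothetical model of the work-output-path branch in Task.initialize().
final class WorkOutputPathSketch {

    interface Committer { }

    // Stands in for a file-based committer that gives each task attempt
    // its own scratch directory under the job output path.
    static final class FileCommitter implements Committer {
        private final String outputPath;

        FileCommitter(String outputPath) {
            this.outputPath = outputPath;
        }

        String taskAttemptPath(String attemptId) {
            // Assumed layout, for illustration only.
            return outputPath + "/_temporary/" + attemptId;
        }
    }

    // Stands in for any committer that is not file-based.
    static final class OtherCommitter implements Committer { }

    // Returns null when no output path is configured, the per-attempt
    // directory for a file-based committer, and the final output path otherwise.
    static String workOutputPath(String outputPath, Committer committer, String attemptId) {
        if (outputPath == null) {
            return null;
        }
        if (committer instanceof FileCommitter) {
            return ((FileCommitter) committer).taskAttemptPath(attemptId);
        }
        return outputPath;
    }

    public static void main(String[] args) {
        System.out.println(workOutputPath("/out", new FileCommitter("/out"), "attempt_0"));
        System.out.println(workOutputPath("/out", new OtherCommitter(), "attempt_0"));
    }
}
```

Writing to a per-attempt directory first means a failed or duplicate attempt does not touch the final output location, which is presumably why the file-based committer gets its own branch.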