In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-15 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
小编给大家分享一下Hadoop如何实现job提交,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
从如下地方开始,就要进行job的提交了
boolean isSuccess = job.waitForCompletion(true);
之后,进入Job类的waitForCompletion方法。
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } //---- return isSuccessful(); }> > 这里输入引用文本
之后调用Job类的submit方法,
public void submit() throws IOException, InterruptedException, ClassNotFoundException { connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); }
connect方法负责初始化集群信息:
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }
集群信息cluster,包括什么,应该很清晰:
private ClientProtocolProvider clientProtocolProvider; private ClientProtocol client;
private UserGroupInformation ugi; private Configuration conf; private FileSystem fs = null;
private Path sysDir = null; private Path stagingAreaDir = null; private Path jobHistoryDir = null;
略微分析下, ClientProtocolProvider是客户端协议的生产者,对应的客户端是ClientProtocol。
ClientProtocolProvider规定了2个方法:
create
close 分别也用来创建和关闭客户端ClientProtocol。
而,ClientProtocolProvider的具体实现类有2个。
可以看到,有两个协议生产者,分别是yarn和local的。
那么,对应的客户端ClientProtocol,也会有两个。
ClientProtocol是个接口,里面规定了如下几个方法:
那么,不同的客户端yarn或者local,实现其中的方法即可。 因为,我们是本地Eclipse运行,直接看local即可,yarn的原理差不多,
OK,经过connect方法之后,cluster中这几个就有啦,即使没有的话,get的时候,也会初始化的。
之后, 使用集群的,FileSystem和client创建一个submiter。
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
然后,调用submitter 的submitJobInternal方法提交作业,OK,进入submitJobInternal方法。
JobSubmiter类的submitJobInternal方法大致过程如下:
checkSpecs(job); 检查作业输出路径。
//获得staging路径,注意:集群cluster中有这个路径的名称,只不过这里需要创建路径。 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//在staging路径下创建一个以jobid为标示的文件夹 JobID jobId = submitClient.getNewJobID(); Path submitJobDir = new Path(jobStagingArea, jobId.toString());
//job需要的一些文件和jar包之类的,都放到刚才的那个submitJobDir路径下 copyAndConfigureFiles(job, submitJobDir);
具体的东西包括:
String files = conf.get("tmpfiles"); String libjars = conf.get("tmpjars"); String archives = conf.get("tmparchives");
//写入job输入的分片信息 int maps = writeSplits(job, submitJobDir);
split信息包括两个部分。 首先调用Inputformat获得分片的个数,具体如何获得,后续讲。 将返回的分片数组逐个遍历并持久化到一个文件。
SplitMetaInfo[] info = writeNewSplits(conf, splits, out); 而writeNewSplits代码主要就是写分片信息到文件中。
之后,将split的分片信息持久化一个元数据文件。 writeJobSplitMetaInfo方法。
//将job的描述信息,写到一个job.xml放到相应的staging目录下的jobid目录。 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); writeConf(conf, submitJobFile);
FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); try { conf.writeXml(out); } finally { out.close(); }
//提交作业 status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
OK,提交作业部分的代码就到这,后续写写,app master运行的过程。
总结,提交作业的主要功能。
创建staging路径
在staging路径下面创建作业id的路径
把job相关的文件拷贝到路径下
将job的split信息序列化到文件中
将job的xml写到路径下
这些东西都放到hdfs,作为所有节点共享访问的地方。之后,app master会访问这个目录,copy job的配置文件到本地并创建job对象,并根据split的信息,创建对应的maptaskrunable。运行。
但是,总的job信息依然在hdfs上。
以上是"Hadoop如何实现job提交"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.