In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
1. Source code analysis. 1. Entry for submitting job
The job is submitted and run through job.waitForCompletion (true). Let's start with this method to analyze the source code.
/ /-job.javapublic boolean waitForCompletion (boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {/ / if the status of job is not running, submit the task if (this.state = = Job.JobState.DEFINE) {this.submit ();} if (verbose) {/ / monitor and print the running information this.monitorAndPrintJob () } else {int completionPollIntervalMillis = getCompletionPollInterval (this.cluster.getConf ()); while (! this.isComplete ()) {try {Thread.sleep ((long) completionPollIntervalMillis);} catch (InterruptedException var4) {}} return this.isSuccessful () 2. This.submit () submits job//-job.javapublic void submit () throws IOException, InterruptedException, ClassNotFoundException {/ / make sure that the job status is not running this.ensureState (Job.JobState.DEFINE); / / use the new api this.setUseNewAPI (); / / mainly initialize the client in the cluster object to communicate with the cluster connection. It is divided into yarn client and local client this.connect (); / / obtain the job submitter through the cluster object, and take the file system storing job information and client as parameters final JobSubmitter submitter = this.getJobSubmitter (this.cluster.getFileSystem (), this.cluster.getClient ()) / / submit job and run this.status = (JobStatus) this.ugi.doAs (new PrivilegedExceptionAction () {public JobStatus run () throws IOException, InterruptedException, ClassNotFoundException {/ / here is the submission job, run, return status return submitter.submitJobInternal (Job.this, Job.this.cluster);}}); this.state = Job.JobState.RUNNING LOG.info ("The url to track the job:" + this.getTrackingURL ();}
There are three important process methods involved here:
This.connect () mainly initializes the client that submits the job
This.getJobSubmitter () encapsulates a lot of api for job
Submitter.submitJobInternal (Job.this, Job.this.cluster) submit job and run
Let's take a detailed look at what these three methods have done.
3. This.connect () initializes client//-job.javaprivate synchronized void connect () throws IOException, InterruptedException, ClassNotFoundException {/ / create a cluster connection object for connecting to the cluster, and provides a lot of api if (this.cluster = = null) {this.cluster = (Cluster) this.ugi.doAs (new PrivilegedExceptionAction () {public Cluster run () throws IOException, InterruptedException) ClassNotFoundException {return new Cluster (Job.this.getConfiguration ()) });}}
The most important thing about this code is to create a Cluster object. Let's take a look at the constructor of this class.
/ /-- Cluster.javapublic Cluster (Configuration conf) throws IOException {this ((InetSocketAddress) null, conf);} public Cluster (InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {this.fs = null; this.sysDir = null; / / job working directory this.stagingAreaDir = null; this.jobHistoryDir = null; / / client and server communication protocol provider this.providerList = null / / Save the configuration conf of job this.conf = conf; / / get the current user this.ugi = UserGroupInformation.getCurrentUser (); / / A pair of job submitters client initializes this.initialize (jobTrackAddr, conf);} / / here is the method to initialize client, mainly to get this.clientprivate void initialize (InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {this.initProviderList (); Iterator i$ = this.providerList.iterator () While (i$.hasNext ()) {/ * provider there are also YarnClientProtocolProvider and LocalClientProtocolProvider, that is, local and yarn provider * / ClientProtocolProvider provider = (ClientProtocolProvider) i$.next (); LOG.debug ("Trying ClientProtocolProvider:" + provider.getClass (). GetName ()); ClientProtocol clientProtocol = null Try {/ * determines whether jobTrackAddr is empty, that is, running job in a remote cluster or local mode. For remote clusters, create a yarn submitter,: YARNRunner. If you create a local through YarnClientProtocolProvider, create a local local submitter: LocalRunner. The creation through LocalClientProtocolProvider is mainly based on whether the value of mapreduce.framework.name in conf is local or yarn to create the corresponding runner * / if (jobTrackAddr = = null) {clientProtocol = provider.create (conf) } else {clientProtocol = provider.create (jobTrackAddr, conf);} if (clientProtocol! = null) {this.clientProtocolProvider = provider; / / you can see here that client is the this.client = clientProtocol created by provider above LOG.debug ("Picked" + provider.getClass (). GetName () + "as the ClientProtocolProvider"); / / exit break;} LOG.debug ("Cannot pick" + provider.getClass (). GetName () + "as the ClientProtocolProvider-returned null protocol") as long as client and provider are successfully created. } catch (Exception var7) {LOG.info ("Failed to use" + provider.getClass (). GetName () + "due to error:", var7);} if (null = = this.clientProtocolProvider | | null = = this.client) {throw new IOException ("Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses. ");}
You can see that the Cluster object mainly initializes two objects, clientProtocolProvider and client.
That is, provider and client,client are created through provider.create.
Let's take a look at ClientProtocolProvider and ClientProtocol. Both classes are abstract, so let's see which implementation subclasses they should have.
ClientProtocolProvider: YarnClientProtocolProvider LocalClientProtocolProviderClientProtocol: YARNRunner LocalJobRunner
You can take a look at YarnClientProtocolProvider and LocalClientProtocolProvider's create method
Public class LocalClientProtocolProvider extends ClientProtocolProvider {. Public ClientProtocol create (Configuration conf) throws IOException {String framework = conf.get ("mapreduce.framework.name", "local"); if (! "local" .equals (framework)) {return null;} else {conf.setInt ("mapreduce.job.maps", 1); / / create LocalJobRunner return new LocalJobRunner (conf) }}.} / = public class YarnClientProtocolProvider extends ClientProtocolProvider {.. Public ClientProtocol create (Configuration conf) throws IOException {/ / create YARNRunner return "yarn" .equals (conf.get ("mapreduce.framework.name")? New YARNRunner (conf): null;}..}
In general, provider is divided into YarnClientProtocolProvider and LocalClientProtocolProvider, which are used to create YARNRunner and LocalJobRunner in client, respectively. Indicates that job operates in both local and yarn modes.
At this point, this.client and this.provider, the two objects in the Cluster object, are initialized.
4. This.getJobSubmitter () encapsulates submitter//-job.javapublic JobSubmitter getJobSubmitter (FileSystem fs, ClientProtocol submitClient) throws IOException {return new JobSubmitter (fs, submitClient);}
Create a JobSubmitter object and look at the construction method
/ /-JobSubmitter.javaJobSubmitter (FileSystem submitFs, ClientProtocol submitClient) throws IOException {this.submitClient = submitClient; this.jtFs = submitFs;}
It seems nothing special, just save the file system fs and the initialized client in the cluster above. But there are actually a lot of methods in this class that will be called later. Talk about it later.
5. Submitter.submitJobInternal () submits job
This method is the core of the whole job submission process, and you should pay attention to
/ /-JobSubmitter.javaJobStatus submitJobInternal (Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {/ / check whether the configured output already exists and throw an exception this.checkSpecs (job); Configuration conf = job.getConfiguration (); addMRFrameworkToDistributedCache (conf); / / get all job working directories Path jobStagingArea = JobSubmissionFiles.getStagingDir (cluster, conf) / / get the ip address object InetAddress ip = InetAddress.getLocalHost (); / / set the hostname and ip if (ip! = null) {this.submitHostAddress = ip.getHostAddress (); this.submitHostName = ip.getHostName (); conf.set ("mapreduce.job.submithostname", this.submitHostName); conf.set ("mapreduce.job.submithostaddress", this.submitHostAddress) for submitting the job } / / apply to the cluster to run job through client, and get the corresponding jobid. This submitclient is the previous cluster initialization completed JobID jobId = this.submitClient.getNewJobID (); job.setJobID (jobId); / / create a directory object to store job-related resource data. Store job configuration files, slice information files, program jar packages, etc. Path submitJobDir = new Path (jobStagingArea, jobId.toString ()); JobStatus status = null; JobStatus var24; try {conf.set ("mapreduce.job.user.name", UserGroupInformation.getCurrentUser (). GetShortUserName ()); conf.set ("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer") Conf.set ("mapreduce.job.dir", submitJobDir.toString ()); LOG.debug ("Configuring job" + jobId + "with" + submitJobDir + "as the submit dir"); / / obtain authorization TokenCache.obtainTokensForNamenodes (job.getCredentials (), new Path [] {submitJobDir}, conf) to access a specific directory in namenode; this.populateTokenCache (conf, job.getCredentials ()) / / verify token related if (TokenCache.getShuffleSecretKey (job.getCredentials ()) = = null) {KeyGenerator keyGen; try {keyGen = KeyGenerator.getInstance ("HmacSHA1"); keyGen.init (64);} catch (NoSuchAlgorithmException var19) {throw new IOException ("Error generating shuffle secret key", var19) } SecretKey shuffleKey = keyGen.generateKey (); TokenCache.setShuffleSecretKey (shuffleKey.getEncoded (), job.getCredentials ());} if (CryptoUtils.isEncryptedSpillEnabled (conf)) {conf.setInt ("mapreduce.am.max-attempts", 1); LOG.warn ("Max job attempts set to 1 since encrypted intermediatedata spill is enabled") } / / copy the temporary file of job and the running jar package to this.copyAndConfigureFiles (job, submitJobDir) under submitJobDir; / / get the path to store the job configuration information file, generally named as: submitJobDir/job.xml Path submitJobFile = JobSubmissionFiles.getJobConfPath (submitJobDir); LOG.debug ("Creating splits at" + this.jtFs.makeQualified (submitJobDir)) / / store the slice information under submitJobDir and return the number of slices. InputFormat.getSplits () is called to get the planned slice information / / the slice information is written to submitJobDir/job.split, and the meta-information of the slice information entry is written to submitJobDir/job.splitmetainfo int maps = this.writeSplits (job, submitJobDir); conf.setInt ("mapreduce.job.maps", maps); LOG.info ("number of splits:" + maps) / / Transmission queue name String queue = conf.get ("mapreduce.job.queuename", "default"); / / submitClient is actually cluster's client AccessControlList acl = this.submitClient.getQueueAdmins (queue); conf.set (QueueManager.toFullPropertyName (queue, QueueACL.ADMINISTER_JOBS.getAclName ()), acl.getAclString ()); TokenCache.cleanUpTokenReferral (conf) If (conf.getBoolean ("mapreduce.job.token.tracking.ids.enabled", false)) {ArrayList trackingIds = new ArrayList (); Iterator i$ = job.getCredentials (). GetAllTokens (). Iterator (); while (i$.hasNext ()) {Token input = (InputFormat) ReflectionUtils.newInstance (job.getInputFormatClass (), conf) / / obtain planning slice information through getSplits () generation of inputformat List splits = input.getSplits (job); T [] array = (InputSplit []) ((InputSplit []) splits.toArray (new InputSplit [splits.size ()])); Arrays.sort (array, new JobSubmitter.SplitComparator ()); / / create raw data files of slice files and metadata files JobSplitWriter.createSplitFiles (jobSubmitDir, conf, jobSubmitDir.getFileSystem (conf), array) Return array.length;}
Get the inputformat object, get the planning slice information through inputformat's getSplits (), and then JobSplitWriter.createSplitFiles () to create the slice information file. Here's the last method.
/ /-JobSplitWriter.createSplitFilespublic static void createSplitFiles (Path jobSubmitDir, Configuration conf, FileSystem fs, T [] splits) throws IOException, InterruptedException {/ / create slice output stream, and the file is named jobSubmitDir/job.split FSDataOutputStream out = createFile (fs, JobSubmissionFiles.getJobSplitFile (jobSubmitDir), conf) / serialize each slice meta-information in the array and write the slice information to jobSubmitDir/job.split / / return the meta-information of each slice entry, such as the starting position and length of each slice information in job.split [] info = writeNewSplits (conf, splits, out); out.close () / / write the meta information of the slice information file to the file jobSubmitDir/job.splitmetainfo writeJobSplitMetaInfo (fs, JobSubmissionFiles.getJobSplitMetaFile (jobSubmitDir), new FsPermission (JobSubmissionFiles.JOB_FILE_PERMISSION), 1, info);}
Two main files are generated here.
JobSubmitDir/job.split: slice information file, recording the information of each slice, such as path, block location, offset, etc.
JobSubmitDir/job.splitmetainfo: the index position of each information entry in the slice information file, such as the starting position and length of each slice information in job.split.
Let's take a look at the generation of these two files
The first is jobSubmitDir/job.split.
Private static SplitMetaInfo [] writeNewSplits (Configuration conf, T [] array, FSDataOutputStream out) throws IOException, InterruptedException {SplitMetaInfo [] info = new SplitMetaInfo [array.length]; if (array.length! = 0) {SerializationFactory factory = new SerializationFactory (conf); int I = 0; int maxBlockLocations = conf.getInt ("mapreduce.job.max.split.locations", 10); long offset = out.getPos (); InputSplit [] arr$ = array The int len$ = array.length; / / loop writes each slice information in the slice information to the file and generates the meta-information for (int i$ = 0; i$) of each slice information.
< len$; ++i$) { T split = arr$[i$]; long prevCount = out.getPos(); Text.writeString(out, split.getClass().getName()); Serializer serializer = factory.getSerializer(split.getClass()); serializer.open(out); //将切片信息对象序列化存储到文件中 serializer.serialize(split); long currCount = out.getPos(); String[] locations = split.getLocations(); if (locations.length >MaxBlockLocations) {LOG.warn ("Max block location exceeded for split:" + split + "splitsize:" + locations.length + "maxsize:" + maxBlockLocations); locations = (String []) Arrays.copyOf (locations, maxBlockLocations);} / / generate the meta-information of each slice info [iTunes +] = new SplitMetaInfo (locations, offset, split.getLength ()) Offset + = currCount-prevCount;}} return info;}
The main thing is to serialize the slice information entry object in split to the file, and generate the information to be written in jobSubmitDir/job.splitmetainfo, that is, the index information of the slice file.
Then take a look at writeJobSplitMetaInfo ()
Private static void writeJobSplitMetaInfo (FileSystem fs, Path filename, FsPermission p, int splitMetaInfoVersion, SplitMetaInfo [] allSplitMetaInfo) throws IOException {/ / writes the meta-information of the slice information entry and creates an output stream FSDataOutputStream out = FileSystem.create (fs, filename, p); out.write (JobSplit.META_SPLIT_FILE_HEADER); WritableUtils.writeVInt (out, splitMetaInfoVersion); WritableUtils.writeVInt (out, allSplitMetaInfo.length); SplitMetaInfo [] arr$ = allSplitMetaInfo; int len$ = allSplitMetaInfo.length / / write for (int i$ = 0; i$ < len$; + + i$) {SplitMetaInfo splitMetaInfo = arr$ [i$]; splitMetaInfo.write (out);} out.close ();}
It's actually obvious here that the index information of the slice file is written to jobSubmitDir/job.splitmetainfo.
(4) status = this.submitClient.submitJob (jobId, submitJobDir.toString (), job.getCredentials ())
Formally submit job to get the submission status of job
Public JobStatus submitJob (JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {this.addHistoryToken (ts); / / here, the job configuration and the hdfs directory path of the job resource are passed in ApplicationSubmissionContext appContext = this.createApplicationSubmissionContext (this.conf, jobSubmitDir, ts); try {/ / submit job, the returned appid ApplicationId applicationId = this.resMgrDelegate.submitApplication (appContext); / / create appMaster ApplicationReport appMaster = this.resMgrDelegate.getApplicationReport (applicationId) based on appid String diagnostics = appMaster = = null? "application report is null": appMaster.getDiagnostics (); if (appMaster! = null & & appMaster.getYarnApplicationState ()! = YarnApplicationState.FAILED & & appMaster.getYarnApplicationState ()! = YarnApplicationState.KILLED) {return this.clientCache.getClient (jobId) .getJobStatus (jobId);} else {throw new IOException ("Failed to run job:" + diagnostics);}} catch (YarnException var8) {throw new IOException (var8) }}
The main thing here is to submit the job and create the appMaster. Finally, get the job status.
II. Summary
A job submission process is as follows:
1. Establish a connection with the MapReduce cluster this.connect ()
The most important of these is to create client, which can be done in two ways: YARNRunner and LocalJobRunner. It is later used to communicate with server, submit job, and so on.
2. Formally submit job, submitter.submitJobInternal (Job.this, cluster)
(1) Path jobStagingArea = JobSubmissionFiles.getStagingDir (cluster, conf)
Get the general working directory of job
(2) JobID jobId = this.submitClient.getNewJobID ()
Job.setJobID (jobId)
Request the jobid from the cluster by processing the client and keep it in the configuration information of the job.
(3) Path submitJobDir = new Path (jobStagingArea, jobId.toString ())
Get the working directory of the current job and the jobid naming
(4) this.copyAndConfigureFiles (job, submitJobDir)
Copy the temporary file of job and run the jar package under submitJobDir
(5) Path submitJobFile = JobSubmissionFiles.getJobConfPath (submitJobDir)
Gets the path to the job configuration information file. Named: submitJobDir/job.xml
(6) int maps = this.writeSplits (job, submitJobDir)
Stores the slice information under submitJobDir and returns the number of slices. InputFormat.getSplits () is called to get the planned slice information. The slice information is written to submitJobDir/job.split, and the meta-information of the slice information entry is written to submitJobDir/job.splitmetainfo.
(7) this.writeConf (conf, submitJobFile)
Write job configuration information to submitJobDir/job.xml
(8) status = this.submitClient.submitJob (jobId, submitJobDir.toString (), job.getCredentials ())
Formally submit job to get the submission status of job
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.