Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How does Hadoo submit jobs to the cluster?

2025-04-09 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly introduces "how Hadoo submits the homework to the cluster". In the daily operation, I believe that many people have doubts about how the Hadoo submitted the homework to the cluster. The editor consulted all kinds of materials and sorted out the simple and useful operation methods. I hope it will be helpful to answer the doubt of "how Hadoo submits the homework to the cluster". Next, please follow the editor to study!

One: the flow chart of the MapReduce job submission process

From the figure, we can see that there are three main parts, namely: 1) JobClient: job client. 2) JobTracker: the tracker of the job. 3) TaskTracker: tracker for the task.

MapReduce submits the job to JobClient, then JobClient interacts with JobTracker, and JobTracker monitors and assigns TaskTracker to complete the processing of specific jobs.

The following analysis is the source code of Hadoop2.6.4. Please note: the source code is slightly different from the previous Hadoop version, so some concepts are still a little different from the above figure.

2: how to submit MapReduce Job 2.1 to complete the actual submission of the job, that is: * * job.waitForCompletion (true) * *

Trace the waitForCompletion and notice the submit () in it, as follows:

/ * Submit the job to the cluster and wait for it to finish. * / public boolean waitForCompletion (boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {if (state = = JobState.DEFINE) {submit ();} if (verbose) {monitorAndPrintJob ();} else {/ / get the completion poll interval from the client. Int completionPollIntervalMillis = Job.getCompletionPollInterval (cluster.getConf ()); while (! isComplete ()) {try {Thread.sleep (completionPollIntervalMillis);} catch (InterruptedException ie) {}} return isSuccessful ();}

Parameter verbose, if you want to print the current task execution progress on the console, set it to true

**

2.2 submit ()

* * in the submit method, the Job is submitted to the corresponding Cluster, and then returned immediately without waiting for the completion of the Job execution.

At the same time, the status of the Job instance is set to JobState.RUNNING, which indicates that Job is in progress.

Then while Job is running, you can call getJobState () to get the running status of Job

/ * Submit the job to the cluster and return immediately. * / public void submit () throws IOException, InterruptedException, ClassNotFoundException {ensureState (JobState.DEFINE); setUseNewAPI (); connect (); final JobSubmitter submitter = getJobSubmitter (cluster.getFileSystem (), cluster.getClient ()); status = ugi.doAs (new PrivilegedExceptionAction () {public JobStatus run () throws IOException, InterruptedException, ClassNotFoundException {return submitter.submitJobInternal (Job.this, cluster);}}); state = JobState.RUNNING LOG.info ("The url to track the job:" + getTrackingURL ();}

Before the task is submitted, the cluster (Cluster) is linked through the connect () method:

Private synchronized void connect () throws IOException, InterruptedException, ClassNotFoundException {if (cluster = = null) {cluster = ugi.doAs (new PrivilegedExceptionAction () {public Cluster run () throws IOException, InterruptedException, ClassNotFoundException {return new Cluster (getConfiguration ()) });}}

This is a thread protection method. In this method, a Cluster object is initialized based on the configuration information, which represents the cluster.

Public Cluster (Configuration conf) throws IOException {this (null, conf);} public Cluster (InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser (); initialize (jobTrackAddr, conf) } private void initialize (InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {synchronized (frameworkLoader) {for (ClientProtocolProvider provider: frameworkLoader) {LOG.debug ("Trying ClientProtocolProvider:" + provider.getClass (). GetName ()); ClientProtocol clientProtocol = null; try {if (jobTrackAddr = = null) {clientProtocol = provider.create (conf) } else {clientProtocol = provider.create (jobTrackAddr, conf);} if (clientProtocol! = null) {clientProtocolProvider = provider; client = clientProtocol; LOG.debug ("Picked" + provider.getClass (). GetName () + "as the ClientProtocolProvider"); break } else {LOG.debug ("Cannot pick" + provider.getClass (). GetName () + "as the ClientProtocolProvider-returned null protocol");}} catch (Exception e) {LOG.info ("Failed to use" + provider.getClass (). GetName () + "due to error:" + e.getMessage ()) }} if (null = = clientProtocolProvider | | null = = client) {throw new IOException ("Cannot initialize Cluster. Please check your configuration for "+ MRConfig.FRAMEWORK_NAME +" and the correspond server addresses. ");}}

And before the previous code,

Private static ServiceLoader frameworkLoader = ServiceLoader.load (ClientProtocolProvider.class)

You can see that java.util.ServiceLoader is used in the client agent creation phase, including LocalClientProtocolProvider (local job) and YarnClientProtocolProvider (yarn job) (hadoop has a Yarn parameter mapreduce.framework.name to control the application framework of your choice. In MRv2, mapreduce.framework.name has two values: local and yarn), and the corresponding client will be created according to the configuration of mapreduce.framework.name

Mapred-site.xml:

Mapreduce.framework.name yarn 2.3 after instantiating Cluster, the real task is submitted to submitter.submitJobInternal (Job.this, cluster); JobStatus submitJobInternal (Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {/ / validate the jobs output specs checkSpecs (job); Configuration conf = job.getConfiguration (); addMRFrameworkToDistributedCache (conf); Path jobStagingArea = JobSubmissionFiles.getStagingDir (cluster, conf); / / configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost () If (ip! = null) {submitHostAddress = ip.getHostAddress (); submitHostName = ip.getHostName (); conf.set (MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set (MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);} JobID jobId = submitClient.getNewJobID (); job.setJobID (jobId); Path submitJobDir = new Path (jobStagingArea, jobId.toString ()); JobStatus status = null Try {conf.set (MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser (). GetShortUserName ()); conf.set ("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); conf.set (MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString ()); LOG.debug ("Configuring job" + jobId + "with" + submitJobDir + "as the submit dir") / / get delegation token for the dir TokenCache.obtainTokensForNamenodes (job.getCredentials (), new Path [] {submitJobDir}, conf); populateTokenCache (conf, job.getCredentials ()); / / generate a secret to authenticate shuffle transfers if (TokenCache.getShuffleSecretKey (job.getCredentials ()) = = null) {KeyGenerator keyGen; try {int keyLen = CryptoUtils.isShuffleEncrypted (conf)? Conf.getInt (MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS): SHUFFLE_KEY_LENGTH; keyGen = KeyGenerator.getInstance (SHUFFLE_KEYGEN_ALGORITHM); keyGen.init (keyLen);} catch (NoSuchAlgorithmException e) {throw new IOException ("Error generating shuffle secret key", e) } SecretKey shuffleKey = keyGen.generateKey (); TokenCache.setShuffleSecretKey (shuffleKey.getEncoded (), job.getCredentials ());} copyAndConfigureFiles (job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath (submitJobDir); / / Create the splits for the job LOG.debug ("Creating splits at" + jtFs.makeQualified (submitJobDir)); int maps = writeSplits (job, submitJobDir); conf.setInt (MRJobConfig.NUM_MAPS, maps) LOG.info ("number of splits:" + maps); / / write "queue admins of the queue to which job is being submitted" / / to job file. String queue = conf.get (MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); AccessControlList acl = submitClient.getQueueAdmins (queue); conf.set (toFullPropertyName (queue, QueueACL.ADMINISTER_JOBS.getAclName ()), acl.getAclString ()); / / removing jobtoken referrals before copying the jobconf to HDFS / / as the tasks don't need this setting, actually they may break / / because of it if present as the referral will point to a / / different job. TokenCache.cleanUpTokenReferral (conf); if (conf.getBoolean (MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {/ / Add HDFS tracking ids ArrayList trackingIds = new ArrayList (); for (Token)

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report