How to construct Kubernetes Job Controller

2025-01-29 Update From: SLTechnology News&Howtos

This article explains how the Kubernetes Job Controller is constructed, following the controller's source code step by step.

Implementation flow chart

Without further ado, here is the whole process first.

New JobController

    type JobController struct {
        kubeClient clientset.Interface
        podControl controller.PodControlInterface

        // To allow injection of updateJobStatus for testing.
        updateHandler func(job *batch.Job) error
        syncHandler   func(jobKey string) (bool, error)
        // podStoreSynced returns true if the pod store has been synced at least once.
        // Added as a member to the struct to allow injection for testing.
        podStoreSynced cache.InformerSynced
        // jobStoreSynced returns true if the job store has been synced at least once.
        // Added as a member to the struct to allow injection for testing.
        jobStoreSynced cache.InformerSynced

        // A TTLCache of pod creates/deletes each rc expects to see
        expectations controller.ControllerExpectationsInterface

        // A store of jobs
        jobLister batchv1listers.JobLister

        // A store of pods, populated by the podController
        podStore corelisters.PodLister

        // Jobs that need to be updated
        queue workqueue.RateLimitingInterface

        recorder record.EventRecorder
    }

    func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
        eventBroadcaster := record.NewBroadcaster()
        eventBroadcaster.StartLogging(glog.Infof)
        // TODO: remove the wrapper when every clients have moved to use the clientset.
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})

        if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
            metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
        }

        jm := &JobController{
            kubeClient: kubeClient,
            podControl: controller.RealPodControl{
                KubeClient: kubeClient,
                Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
            },
            expectations: controller.NewControllerExpectations(),
            queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
            recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        }

        jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    jm.enqueueController,
            UpdateFunc: jm.updateJob,
            DeleteFunc: jm.enqueueController,
        })
        jm.jobLister = jobInformer.Lister()
        jm.jobStoreSynced = jobInformer.Informer().HasSynced

        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    jm.addPod,
            UpdateFunc: jm.updatePod,
            DeleteFunc: jm.deletePod,
        })
        jm.podStore = podInformer.Lister()
        jm.podStoreSynced = podInformer.Informer().HasSynced

        jm.updateHandler = jm.updateJobStatus
        jm.syncHandler = jm.syncJob

        return jm
    }

Construct the JobController and initialize its data, such as the rate-limited work queue.

Watch pod and job objects.

Register the add/update/delete event handlers on podInformer.

Register the add/update/delete event handlers on jobInformer.

Set updateHandler to updateJobStatus, which updates the Job status.

Set syncHandler to syncJob, which processes the Jobs in the queue.
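The wiring described in the steps above can be illustrated with a minimal sketch. Everything here is hypothetical and stands in for the real types: miniController replaces JobController, a plain slice replaces the rate-limited work queue, and enqueue plays the role of an informer event handler. The point it shows is why syncHandler is stored as a function field: a test can swap it out after construction.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a made-up error used to simulate a failed sync.
var errTransient = errors.New("transient failure")

// miniController is a hypothetical, stripped-down stand-in for JobController.
type miniController struct {
	// syncHandler is a field rather than a direct method call,
	// so that tests can inject their own implementation.
	syncHandler func(key string) (bool, error)
	// queue is a plain slice standing in for the rate-limited work queue.
	queue []string
}

// newMiniController wires the default handler, as the constructor does.
func newMiniController() *miniController {
	c := &miniController{}
	c.syncHandler = c.syncJob
	return c
}

// syncJob is the default handler; this sketch does no real work.
func (c *miniController) syncJob(key string) (bool, error) {
	return true, nil
}

// enqueue is what an informer event handler would call with a job key.
func (c *miniController) enqueue(key string) {
	c.queue = append(c.queue, key)
}

// processNext pops one key and passes it to syncHandler.
// On error the key is put back on the queue (without any backoff here).
func (c *miniController) processNext() bool {
	if len(c.queue) == 0 {
		return false
	}
	key := c.queue[0]
	c.queue = c.queue[1:]
	if _, err := c.syncHandler(key); err != nil {
		c.queue = append(c.queue, key)
	}
	return true
}

func main() {
	c := newMiniController()
	attempts := 0
	// Inject a handler that fails once, then succeeds.
	c.syncHandler = func(key string) (bool, error) {
		attempts++
		if attempts == 1 {
			return false, errTransient
		}
		return true, nil
	}
	c.enqueue("default/pi")
	for c.processNext() {
	}
	fmt.Println("attempts:", attempts)
}
```

The real controller adds rate limiting and backoff on top of this loop; the sketch only shows the injection point and the requeue-on-error behavior.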

JobController Run

    // Run the main goroutine responsible for watching and syncing jobs.
    func (jm *JobController) Run(workers int, stopCh <-chan struct{}) [...]

[...] >= ActiveDeadlineSeconds indicates that the job has exceeded its deadline (DeadlineExceeded), so jobFailed is set to true.

If jobFailed is true, sync.WaitGroup is used to delete all previously filtered activePods concurrently and wait for the deletions to finish. If the deletions succeed, failed += active, active = 0, and the Failed condition is set to true.
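A minimal sketch of this concurrent delete-and-wait pattern follows. The names deleteAll, del, and errGone are hypothetical; the real controller deletes pods through its podControl interface, which is replaced here by a plain callback.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errGone is a made-up error used to simulate a failed deletion.
var errGone = errors.New("delete failed")

// deleteAll calls del for every pod name concurrently and blocks until
// all calls have returned. It reports how many deletions succeeded and
// collects the errors of those that failed.
func deleteAll(names []string, del func(name string) error) (deleted int32, errs []error) {
	var wg sync.WaitGroup
	var mu sync.Mutex // guards deleted and errs
	wg.Add(len(names))
	for _, name := range names {
		go func(n string) {
			defer wg.Done()
			err := del(n)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs = append(errs, err)
				return
			}
			deleted++
		}(name)
	}
	wg.Wait()
	return deleted, errs
}

func main() {
	active, failed := int32(3), int32(0)
	pods := []string{"pi-a", "pi-b", "pi-c"}
	_, errs := deleteAll(pods, func(string) error { return nil })
	if len(errs) == 0 {
		// All active pods are gone: count them as failed.
		failed += active
		active = 0
	}
	fmt.Println("failed:", failed, "active:", active)
}
```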

If the job has not failed, jobNeedSync is true, and the job's DeletionTimestamp is nil (it is not marked for deletion), manageJob is called to create or delete the pods managed by the Job according to a fairly involved policy.
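The core of that policy is deciding how many pods should be active. The sketch below condenses the rule from the manageJob source quoted later in this article into one function; desiredActive and int32Ptr are hypothetical names, and the function only computes the target count without creating or deleting anything.

```go
package main

import "fmt"

// int32Ptr is a small helper for building optional values.
func int32Ptr(v int32) *int32 { return &v }

// desiredActive returns how many pods should be active.
//   - More active than parallelism: scale down to parallelism.
//   - completions == nil: keep what is running once any pod has succeeded,
//     otherwise run parallelism pods.
//   - completions set: never run more than the remaining completions,
//     capped at parallelism.
func desiredActive(completions *int32, parallelism, succeeded, active int32) int32 {
	if active >= parallelism {
		return parallelism
	}
	var want int32
	if completions == nil {
		if succeeded > 0 {
			want = active
		} else {
			want = parallelism
		}
	} else {
		want = *completions - succeeded
		if want > parallelism {
			want = parallelism
		}
	}
	// The scale-up branch never deletes pods, so the target is at least active.
	if want < active {
		want = active
	}
	return want
}

func main() {
	// 5 completions wanted, 3 done, parallelism 3, nothing running.
	fmt.Println(desiredActive(int32Ptr(5), 3, 3, 0))
}
```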

If the job has not failed and job.Spec.Completions is nil, the job is of the type that is complete when any pod exits with success. So if succeeded > 0 && active == 0, the job is complete.

If the job has not failed and job.Spec.Completions is not nil, the job is of the type that signals success by reaching that number of successes. So if succeeded >= job.Spec.Completions, the job is complete.
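The two completion rules above fit in a single predicate. This is a sketch with hypothetical names (jobComplete, int32Ptr); a nil pointer models an unset job.Spec.Completions.

```go
package main

import "fmt"

// int32Ptr is a small helper for building optional values.
func int32Ptr(v int32) *int32 { return &v }

// jobComplete applies the two completion rules:
//   - completions unset: complete once any pod has succeeded and none is active;
//   - completions set: complete once succeeded reaches that number.
func jobComplete(completions *int32, succeeded, active int32) bool {
	if completions == nil {
		return succeeded > 0 && active == 0
	}
	return succeeded >= *completions
}

func main() {
	fmt.Println(jobComplete(nil, 1, 0))         // any success, nothing running
	fmt.Println(jobComplete(int32Ptr(3), 2, 1)) // still one success short
}
```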

If the job is complete, its Complete condition is set to true and CompletionTime is set.

Next, updateJobStatus is invoked to update the job status in etcd. If the update fails, false is returned and the job joins the queue again. If jobHaveNewFailure is true and the Job's conditions show that it has not finished, false is returned and the job joins the queue again.
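The requeue decision described above can be sketched as follows. The types and names (condition, isFinished, shouldRequeue, errConflict) are hypothetical simplifications; the sketch assumes a job counts as finished when it carries a Complete or Failed condition whose status is true.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a made-up error used to simulate a failed status update.
var errConflict = errors.New("update conflict")

// condition is a simplified stand-in for a Job condition.
type condition struct {
	Type   string // "Complete" or "Failed"
	Status bool   // true when the condition holds
}

// isFinished reports whether any terminal condition is set to true.
func isFinished(conds []condition) bool {
	for _, c := range conds {
		if (c.Type == "Complete" || c.Type == "Failed") && c.Status {
			return true
		}
	}
	return false
}

// shouldRequeue mirrors the two cases in the text: the status update
// failed, or new pod failures were seen while the job is not finished.
func shouldRequeue(updateErr error, haveNewFailure bool, conds []condition) bool {
	if updateErr != nil {
		return true
	}
	return haveNewFailure && !isFinished(conds)
}

func main() {
	running := []condition{}
	done := []condition{{Type: "Complete", Status: true}}
	fmt.Println(shouldRequeue(nil, true, running)) // new failure, job unfinished
	fmt.Println(shouldRequeue(nil, true, done))    // job already finished
}
```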

manageJob

    // manageJob is the core method responsible for managing the number of running
    // pods according to what is specified in the job.Spec.
    // Does NOT modify <activePods>.
    func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
        var activeLock sync.Mutex
        active := int32(len(activePods))
        parallelism := *job.Spec.Parallelism
        jobKey, err := controller.KeyFunc(job)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
            return 0, nil
        }

        var errCh chan error
        if active > parallelism {
            diff := active - parallelism
            errCh = make(chan error, diff)
            jm.expectations.ExpectDeletions(jobKey, int(diff))
            glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
            // Sort the pods in the order such that not-ready < ready, unscheduled
            // < scheduled, and pending < running. This ensures that we delete pods
            // in the earlier stages whenever possible.
            sort.Sort(controller.ActivePods(activePods))

            active -= diff
            wait := sync.WaitGroup{}
            wait.Add(int(diff))
            for i := int32(0); i < diff; i++ {
                go func(ix int32) {
                    defer wait.Done()
                    if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
                        defer utilruntime.HandleError(err)
                        // Decrement the expected number of deletes because the informer won't observe this deletion
                        glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
                        jm.expectations.DeletionObserved(jobKey)
                        activeLock.Lock()
                        active++
                        activeLock.Unlock()
                        errCh <- err
                    }
                }(i)
            }
            wait.Wait()

        } else if active < parallelism {
            wantActive := int32(0)
            if job.Spec.Completions == nil {
                if succeeded > 0 {
                    wantActive = active
                } else {
                    wantActive = parallelism
                }
            } else {
                // Job specifies a specific number of completions. Therefore, number
                // active should not ever exceed number of remaining completions.
                wantActive = *job.Spec.Completions - succeeded
                if wantActive > parallelism {
                    wantActive = parallelism
                }
            }
            diff := wantActive - active
            if diff < 0 {
                utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
                diff = 0
            }
            jm.expectations.ExpectCreations(jobKey, int(diff))
            errCh = make(chan error, diff)
            glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

            active += diff
            wait := sync.WaitGroup{}

            // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
            // and double with each successful iteration in a kind of "slow start".
            // This handles attempts to start large numbers of pods that would
            // likely all fail with the same error. For example a project with a
            // low quota that attempts to create a large number of pods will be
            // prevented from spamming the API service with the pod create requests
            // after one of its pods fails. Conveniently, this also prevents the
            // event spam that those failures would generate.
            for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
                errorCount := len(errCh)
                wait.Add(int(batchSize))
                for i := int32(0); i < batchSize; i++ {
                    go func() {
                        defer wait.Done()
                        err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                        if err != nil && errors.IsTimeout(err) {
                            // Pod is created but its initialization has timed out.
                            // If the initialization is successful eventually, the
                            // controller will observe the creation via the informer.
                            // If the initialization fails, or if the pod keeps
                            // uninitialized for a long time, the informer will not
                            // receive any update, and the controller will create a new
                            // pod when the expectation expires.
                            return
                        }
                        if err != nil {
                            defer utilruntime.HandleError(err)
                            // Decrement the expected number of creates because the informer won't observe this pod
                            glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                            jm.expectations.CreationObserved(jobKey)
                            activeLock.Lock()
                            active--
                            activeLock.Unlock()
                            errCh <- err
                            [...]

© 2024 shulou.com SLNews company. All rights reserved.
