Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to create and start DaemonSet Controller

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how to create and start DaemonSet Controller". Interested friends may wish to have a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn how to create and start DaemonSet Controller.

DaemonSet ControllerDaemonSet Controller Struct

The core structure of DaemonSet Controller includes:

BurstReplcas int: the upper limit of the number of Create and Delete Pods for each sync, which is 250 in the code.

Queue workqueue.RateLimitingInterface: the Delaying Queue that stores the DaemonSet Key (namespaces/name) to be synchronized.

SyncHandler func (dsKey string) error: responsible for synchronizing objects in DaemonSet Queue, including Replicas management, UpdateStrategy upgrade, updating DaemonSet Status, etc., is the core logic of DaemonSet Controller.

Expectations controller.ControllerExpectationsInterface: the TTLCache that maintains the expected number of Create/Delete Pods per DaemonSet object per Sync.

SuspendedDaemonPods map [string] sets.String: key is a collection of DaemonSet, and these DaemonSet contain the Pod of 'wantToRun &! shouldSchedule' on this Node.

If NodeName is specified in the Spec of the DaemonSet, the value of the shouldSchedule is determined based on whether it matches node.Name successfully.

If one of all types of PredicateFailureError appears during Predicate, the shouldSchedule is false.

If InsufficientResourceError appears, shouldSchedule is also false.

ErrDiskConflict

ErrVolumeZoneConflict

ErrMaxVolumeCountExceeded

ErrNodeUnderMemoryPressure

ErrNodeUnderDiskPressure

InsufficientResourceError

WantToRun: True. When DaemonSet Controller is dispatched to Simulate, Predicate (mainly GeneralPredicates and PodToleratesNodeTaints) ignores the following PredicateFailureError (Error of some resource classes) successfully, and other PredicateFailureError is False. If NodeName is specified in the Spec of the DaemonSet, the value of the wantToRun is determined based on whether it matches node.Name successfully.

ShouldSchedule:

FailedPodsBackoff * flowcontrol.Backoff: a co-program is started during DaemonSet Controller Run, and failedPods GC cleanup is forced every 2*MaxDuration (2*15Min). Every time syncDaemonSet deals with the deleted Pods, it will do some delay processing according to the Backoff mechanism of 1sMagne2sMagneE4sMagne8sMagne.15min to achieve the effect of flow control. Prevent kubelet from rejecting some DaemonSet Pods and then being rejected immediately, so there will be a lot of invalid loops, so the Backoff mechanism is added.

Creation and launch of DaemonSet Controller

NewDaemonSetsController is responsible for creating the Controller, and a very important task is to register the EventHandler for the following Informer:

DaemonSetInformer: in the end, AddFunc/DeleteFunc/UpdateFunc is mainly enqueue DaemonSet.

HistoryInformer:

AddFunc: addHistory

UpdateFunc: updateHistory

DeleteFunc: deleteHistory

PodInformer:

AddFunc: addPod

UpdateFunc: updatePod

DeleteFunc: deletePod

NodeInformer:

AddFunc: addNode

UpdateFunc: updateNode

When DamonSet Controller Run starts, it does two main things:

Start 2 workers programs, and each worker is responsible for fetching DaemonSet Key from queue for sync.

Start 1 failedPodsBackoff GC protocol and clean up the Failed Pods corresponding to all DaemonSet/Node in the cluster every other 1Min.

RequeueSuspendedDaemonPods occurs only when deletePod is used. -- Why?

Synchronization of DaemonSet

Worker fetches the DamonSet Key to be synchronized from queue and calls syncDaemonSet to complete automatic management. SyncDaemonSet is the core entry of DaemonSet management.

Pkg/controller/daemon/daemon_controller.go:1208func (dsc * DaemonSetsController) syncDaemonSet (key string) error {... Ds, err: = dsc.dsLister.DaemonSets (namespace) .Get (name) if errors.IsNotFound (err) {klog.V (3). Infof ("daemon set has been deleted% v", key) dsc.expectations.DeleteExpectations (key) return nil} if err! = nil {return fmt.Errorf ("unable to retrieve ds% v from store:% v", key Err)} everything: = metav1.LabelSelector {} if reflect.DeepEqual (ds.Spec.Selector, & everything) {dsc.eventRecorder.Eventf (ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required. ") Return nil} / / Don't process a daemon set until all its creations and deletions have been processed. / / For example if daemon set foo asked for 3 new daemon pods in the previous call to manage, / / then we do not want to call manage on foo until the daemon pods have been created. ... If ds.DeletionTimestamp! = nil {return nil} / / Construct histories of the DaemonSet, and get the hash of current history cur, old, err: = dsc.constructHistory (ds) if err! = nil {return fmt.Errorf ("failed to construct revisions of DaemonSet:% v" Err)} hash: = cur.Labels [apps.DefaultDaemonSetUniqueLabelKey] if! dsc.expectations.SatisfiedExpectations (dsKey) {/ / Only update status. Don't raise observedGeneration since controller didn't process object of that generation. Return dsc.updateDaemonSetStatus (ds, hash, false)} err = dsc.manage (ds, hash) if err! = nil {return err} / / Process rolling updates if we're ready. If dsc.expectations.SatisfiedExpectations (dsKey) {switch ds.Spec.UpdateStrategy.Type {case apps.OnDeleteDaemonSetStrategyType: case apps.RollingUpdateDaemonSetStrategyType: err = dsc.rollingUpdate (ds Hash)} if err! = nil {return err}} err = dsc.cleanupHistory (ds, old) if err! = nil {return fmt.Errorf ("failed to cleanup revisions of DaemonSet:% v", err)} return dsc.updateDaemonSetStatus (ds, hash, true)}

The core process is as follows:

First check whether the DaemonSet object is deleted in the local Store, and if so, delete the data corresponding to the DaemonSet from the expectations.

Check whether the LabelSelector of the DaemonSet object is empty. If so, the syncDaemonSet returns the end without synchronization, and the Pod corresponding to the DaemonSet will not be created.

If its DeletionTimestamp is not empty, which means that the user triggered the deletion, the syncDaemonSet returns to end without synchronization. The Pod corresponding to DaemonSet is handed over to GC Controller to delete.

Then constructHistory gets the Current ControllerRevision of the DaemonSet and all Old ControllerRevisions, and ensures that all ControllerRevisions are typed with Label: "controller-revision-hash: ControllerRevision.Name", updating Current ControllerRevision's Revision = maxRevision (old) + 1.

Check whether the current expectations has been satisfied, and if not, only update the DaemonSet Status, and the synchronization process ends.

DesiredNumberScheduled: the number of DaemonSet Pods that the user expects to schedule, corresponding to the number of pods whose wantToRun is true mentioned earlier.

CurrentNumberScheduled: the number of Pods that the user expects to schedule and is currently running on the Node.

NumberMisscheduled: the number of Pods that the user does not expect to be scheduled (wantToRun is false) and has been running on the corresponding Node, that is, the number of Pods that has been misscheduled.

In NumberReady:CurrentNumberScheduled, Pod Type Ready Condition is the number of Pods of true.

In UpdatedNumberScheduled:CurrentNumberScheduled, the number of Pods that the hash value of Pod Label controller-revision-hash corresponds to the hash value of Current ControllerRevision, that is, the number of Pods that Pod Template has updated.

In NumberAvailable:CurrentNumberScheduled, Pod Type Ready Condition is true, and the number of Pods of Available (Ready time exceeds minReadySeconds).

NumberUnavailable:desiredNumberScheduled-numberAvailable.

Both add and del in expectations are less than 0, which means that Controller expectations has been implemented, then the current expectations has been satisfied.

Expectations has timed out, and the timeout is 5min (not configurable). If it times out, synchronization is required.

If there is no information about the DaemonSet in the expectations, it is satisfied, and the DaemonSet synchronization will be triggered.

Here updateDaemonSetStatus will update the following fields of the Daemonset.Status, and note that the ObservedGeneration will not be updated (nor has it changed).

Call manage to manage DaemonSet Pod: calculate the list of Pod to be deleted and created, and then call syncNodes in batches. Complete the creation and deletion of the Pod. If syncNodes previously found that the corresponding DaemonSet Pod on some Node is Failed, then error is returned after syncNodes. SyncNode will return all the add/del in expectations to zero or even negative. Only in this way will manage be called in syncDaemonSet for Pod management.

If manage returns error, the syncDaemonSet process ends. Otherwise, the following process will be continued.

Check whether the current expectations has been satisfied, and if so, trigger the DaemonSet update according to the UpdateStrategy:

If UpdateStrategy is OnDelete, wait for the user delete Pod, trigger the enqueue of the corresponding DaemonSet, and update the latest Pod Template when syncNodes to create a new Pod.

If UpdateStrategy is RollingUpdate, call rollingUpdate for scrolling updates, which will be analyzed in detail later.

If the DaemonSet update is successful, clean up the oldest ControllerRevisions that exceeds RevisionHistoryLimit as needed (whether the number of Old ControllerRevisions exceeds Spec.RevisionHistoryLimit, default is 10).

UpdateDaemonSetStatus updates the Daemonset.Status, and unlike before, the Status.ObservedGeneration needs to be updated as well.

Scheduling of DaemonSet Pod

In versions before Kubernetes 1.12, by default, DaemonSet Controller completes the scheduling of Daemon Pods, that is, DaemonSet Controller sets the spec.nodeName value of the Pod to be scheduled, and then corresponds to the kubelet watch of Node to the event, and then creates a DaemonSet Pod on this node. In Kubernetes 1.12, ScheduleDaemonSetPods FeatureGate is enabled by default, and the scheduling of DaemonSet is left to default scheduler.

DamonSet Pods Should Be On Node

In manage daemonset, call podsShouldBeOnNode to calculate the DaemonSet Pods you want to start on the Node (nodesNeedingDaemonPods), the DaemonSet Pods you want to delete on the Node (podsToDelete), and the number of Failed DamonSetPods on the Node, and then create and delete the corresponding Pods in the syncNodes based on these three messages.

Func (dsc * DaemonSetsController) manage (ds * apps.DaemonSet, hash string) error {/ / Find out the pods which are created for the nodes by DaemonSet. NodeToDaemonPods, err: = dsc.getNodesToDaemonPods (ds)... For _, node: = range nodeList {nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err: = dsc.podsShouldBeOnNode (node, nodeToDaemonPods, ds) if err! = nil {continue} nodesNeedingDaemonPods = append (nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...) PodsToDelete = append (podsToDelete, podsToDeleteOnNode...) FailedPodsObserved + = failedPodsObservedOnNode} / / Label new pods using the hash label value of the current history when creating them if err = dsc.syncNodes (ds, podsToDelete, nodesNeedingDaemonPods, hash); err! = nil {return err}. Return nil}

How does podsShouldBeOnNode calculate nodesNeedingDaemonPods, podsToDelete, and failedPodsObserved? -- calculate the following three status values by calling nodeShouldRunDaemonPod (node * v1.Node, ds * apps.DaemonSet):

WantToRun: when DaemonSet Controller is dispatched to Simulate, Predicate (mainly GeneralPredicates and PodToleratesNodeTaints) ignores the following PredicateFailureError (all Error of resource classes) is True, and other PredicateFailureError is False. If NodeName is specified in the Spec of the DaemonSet, the value of the wantToRun is determined based on whether it matches node.Name successfully. -ErrDiskConflict;-ErrVolumeZoneConflict;-ErrMaxVolumeCountExceeded;-ErrNodeUnderMemoryPressure;-ErrNodeUnderDiskPressure;-InsufficientResourceError

ShouldSchedule:

-if NodeName is specified in the Spec of DaemonSet, the value of shouldSchedule is determined based on whether it matches node.Name successfully. -if one of all types of PredicateFailureError appears during Predicate, the shouldSchedule is false. -if InsufficientResourceError appears, shouldSchedule is also false.

FailedPodsBackoff * flowcontrol.Backoff: according to 1s, 2s, 4s, 8s. Backoff cycle to deal with (delete and rebuild) Failed DaemonSet Pods to achieve the effect of flow control. A collaborator is started during DaemonSet Controller Run, and failedPods GC cleanup is enforced every other 2*MaxDuration (2*15Min).

ShouldContinueRunning, if one of the following occurs, the value is false, and the other case is true.

ErrNodeSelectorNotMatch

ErrPodNotMatchHostName

ErrNodeLabelPresenceViolated

ErrPodNotFitsHostPorts:

ErrTaintsTolerationsNotMatch, if it is a Taint/Toleration match of type NoExecute, it is true, otherwise it is false, that is, the Taint/Toleration match of type NoExecute is ignored.

ErrPodAffinityNotMatch

ErrServiceAffinityViolated

Unknown predicate failure reason

Then, according to these three state values, we get nodesNeedingDaemonPods [] string, podsToDelete [] string, and failedPodsObserved int.

/ podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:func (dsc * DaemonSetsController) podsShouldBeOnNode (node * v1.Node, nodeToDaemonPods map [string] [] * v1.Pod, ds * apps.DaemonSet,) (nodesNeedingDaemonPods, podsToDelete [] string, failedPodsObserved int, err error) {wantToRun, shouldSchedule, shouldContinueRunning, err: = dsc.nodeShouldRunDaemonPod (node, ds) if err! = nil {return} daemonPods Exists: = nodeToDaemonPods [node.Name] dsKey, _: = cache.MetaNamespaceKeyFunc (ds) dsc.removeSuspendedDaemonPods (node.Name, dsKey) switch {case wantToRun & &! shouldSchedule: / / If daemon pod is supposed to run, but can not be scheduled, add to suspended list. Dsc.addSuspendedDaemonPods (node.Name, dsKey) case shouldSchedule & &! exists: / / If daemon pod is supposed to be running on node, but isn't, create daemon pod. NodesNeedingDaemonPods = append (nodesNeedingDaemonPods, node.Name) case shouldContinueRunning: / / If a daemon pod failed, delete it / / If there's non-daemon pods left on this node, we will create it in the next sync loop var daemonPodsRunning [] * v1.Pod for _ Pod: = range daemonPods {if pod.DeletionTimestamp! = nil {continue} if pod.Status.Phase = = v1.PodFailed {failedPodsObserved++ / / This is a critical place where DS is often fighting with kubelet that rejects pods. / / We need to avoid hot looping and backoff. BackoffKey: = failedPodsBackoffKey (ds, node.Name) now: = dsc.failedPodsBackoff.Clock.Now () inBackoff: = dsc.failedPodsBackoff.IsInBackOffSinceUpdate (backoffKey Now) if inBackoff {delay: = dsc.failedPodsBackoff.Get (backoffKey) klog.V (4). Infof ("Deleting failed pod% s s has been limited by backoff% on node% v remaining" Pod.Namespace, pod.Name, node.Name, delay) dsc.enqueueDaemonSetAfter (ds, delay) continue} dsc.failedPodsBackoff.Next (backoffKey Now) msg: = fmt.Sprintf ("Found failed daemon pod% s on node% s, will try to kill it", pod.Namespace, pod.Name, node.Name) klog.V (2) .Infof (msg) / / Emit an event so that it's discoverable to users. Dsc.eventRecorder.Eventf (ds, v1.EventTypeWarning, FailedDaemonPodReason, msg) podsToDelete = append (podsToDelete, pod.Name)} else {daemonPodsRunning = append (daemonPodsRunning, pod)}} / / If daemon pod is supposed to be running on node But more than 1 daemon pod is running, delete the excess daemon pods. / / Sort the daemon pods by creation time, so the oldest is preserved. If len (daemonPodsRunning) > 1 {sort.Sort (podByCreationTimestampAndPhase (daemonPodsRunning)) for I: = 1

< len(daemonPodsRunning); i++ { podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name) } } case !shouldContinueRunning && exists: // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node. for _, pod := range daemonPods { podsToDelete = append(podsToDelete, pod.Name) } } return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil}// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a summary. func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) { newPod := NewPod(ds, node.Name) // Because these bools require an && of all their required conditions, we start // with all bools set to true and set a bool to false if a condition is not met. // A bool should probably not be set to true after this line. wantToRun, shouldSchedule, shouldContinueRunning = true, true, true // If the daemon set specifies a node name, check that it matches with node.Name. if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) { return false, false, false, nil } reasons, nodeInfo, err := dsc.simulate(newPod, node, ds) if err != nil { klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err) return false, false, false, err } // TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason, // e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning" // into one result, e.g. selectedNode. var insufficientResourceErr error for _, r := range reasons { klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason()) switch reason := r.(type) { case *predicates.InsufficientResourceError: insufficientResourceErr = reason case *predicates.PredicateFailureError: var emitEvent bool // we try to partition predicates into two partitions here: intentional on the part of the operator and not. switch reason { // intentional case predicates.ErrNodeSelectorNotMatch, predicates.ErrPodNotMatchHostName, predicates.ErrNodeLabelPresenceViolated, // this one is probably intentional since it's a workaround for not having // pod hard anti affinity. predicates.ErrPodNotFitsHostPorts: return false, false, false, nil case predicates.ErrTaintsTolerationsNotMatch: // DaemonSet is expected to respect taints and tolerations fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo) if err != nil { return false, false, false, err } if !fitsNoExecute { return false, false, false, nil } wantToRun, shouldSchedule = false, false // unintentional case predicates.ErrDiskConflict, predicates.ErrVolumeZoneConflict, predicates.ErrMaxVolumeCountExceeded, predicates.ErrNodeUnderMemoryPressure, predicates.ErrNodeUnderDiskPressure: // wantToRun and shouldContinueRunning are likely true here. They are // absolutely true at the time of writing the comment. See first comment // of this method. shouldSchedule = false emitEvent = true // unexpected case predicates.ErrPodAffinityNotMatch, predicates.ErrServiceAffinityViolated: klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason()) return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason()) default: klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason()) wantToRun, shouldSchedule, shouldContinueRunning = false, false, false emitEvent = true } if emitEvent { dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason()) } } } // only emit this event if insufficient resource is the only thing // preventing the daemon pod from scheduling if shouldSchedule && insufficientResourceErr != nil { dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error()) shouldSchedule = false } return} 如果shouldSchedule && !exists,则会把该Pod加入到nodesNeedingDaemonPods中。 如果shouldContinueRunning && pod.DeletionTimestamp == nil && pod.Status.Phase == v1.PodFailed则检查是否在流控周期(15min, hardcode)中,如果已经超过流控周期,会把该Pod加入到podsToDelete中,否则将再次入队列。 如果shouldContinueRunning && pod.DeletionTimestamp == nil && pod.Status.Phase != v1.PodFailed则会把该Pod加入到daemonPodsRunning中记录着该DamonSet在该Node上正在运行的非Failed的Pods,如果daemonPodsRunning不止一个,则需要按照创建时间排序,将不是最早创建的其他所有DaemonSet Pods都加入到podsToDelete中。 在nodeShouldRunDaemonPod中调用simulate仿真调度返回Pod和Node的匹配结果,根据algorithm.PredicateFailureReason结果知道wantToRun,shouldSchedule,shouldContinueRunning的值。下面我们看看simulate中的调度逻辑。 // Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates// and PodToleratesNodeTaints predicatefunc Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { var predicateFails []algorithm.PredicateFailureReason // If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match. if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) { fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo) if err != nil { return false, predicateFails, err } if !fit { predicateFails = append(predicateFails, reasons...) } return len(predicateFails) == 0, predicateFails, nil } critical := kubelettypes.IsCriticalPod(pod) fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo) if err != nil { return false, predicateFails, err } if !fit { predicateFails = append(predicateFails, reasons...) } if critical { // If the pod is marked as critical and support for critical pod annotations is enabled, // check predicates for critical pods only. fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo) } else { fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo) } if err != nil { return false, predicateFails, err } if !fit { predicateFails = append(predicateFails, reasons...) } return len(predicateFails) == 0, predicateFails, nil} 如果是启用了ScheduleDaemonSetPods FeatureGate,则Predicate逻辑如下。这里并没有真正的完成调度,只是做了三个predicate检查,最终的调度还是会交给default scheduler。default scheduler又是如何控制DaemonSet Pod和Node绑定关系的呢,先买个关子。 PodFitsHost: 检查Pod.spec.nodeName非空时是否与Node Name匹配; PodMatchNodeSelector: 检查Pod的NodeSelector和NodeAffinity是否与Node匹配; PodToleratesNodeTaints: 检查Pod的NoExecute和NoSchedule类型的Toleration是否与Node Taint匹配。 如果是没启用ScheduleDaemonSetPods FeatureGate,则Predicate逻辑如下。这里并没有真正的完成调度,只是做了几个predicate检查,最终的调度还是会交给DaemonSet Controller。 PodFitsResources:检查Node剩余可分配资源是否能满足Pod请求; PodFitsHost: 检查Pod.spec.nodeName非空时是否与Node Name匹配; PodFitsHostPorts: 检查DaemonSet Pods请求的协议&Host端口是否已经被占用; PodMatchNodeSelector: 检查Pod的NodeSelector和NodeAffinity是否与Node匹配; PodFitsHost:检查Pod.spec.nodeName非空时是否与Node Name匹配; PodFitsHostPorts:检查DaemonSet Pods请求的协议&Host端口是否已经被占用; PodMatchNodeSelector: 检查Pod的NodeSelector和NodeAffinity是否与Node匹配; PodToleratesNodeTaints:检查Pod的NoExecute和NoSchedule类型的Toleration是否与Node Taint匹配。 如果是Critical DaemonSet Pod,则再进行EssentialPredicates,包括: 如果不是Critical DaemonSet Pod,则再进行GeneralPredicates, Sync Nodes 前面通过podsShouldBeOnNode得到了nodesNeedingDaemonPods []string, podsToDelete []string, failedPodsObserved int,接下来就该去创建和删除对应的Pods了。 // syncNodes deletes given pods and creates new daemon set pods on the given nodes// returns slice with erros if anyfunc (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { // We need to set expectations before creating/deleting pods to avoid race conditions. dsKey, err := controller.KeyFunc(ds) if err != nil { return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) } createDiff := len(nodesNeedingDaemonPods) deleteDiff := len(podsToDelete) if createDiff >

Dsc.burstReplicas {createDiff = dsc.burstReplicas} if deleteDiff > dsc.burstReplicas {deleteDiff = dsc.burstReplicas} dsc.expectations.SetExpectations (dsKey, createDiff, deleteDiff) / / error channel to communicate back failures. Make the buffer big enough to avoid any blocking errCh: = make (chan error, createDiff+deleteDiff) klog.V (4). Infof ("Nodes needing daemon pods for daemon set% s:% + v, creating% d", ds.Name, nodesNeedingDaemonPods, createDiff) createWait: = sync.WaitGroup {} / / If the returned error is not nil we have a parse error. / / The controller handles this via the hash. Generation, err: = util.GetTemplateGeneration (ds) if err! = nil {generation = nil} template: = util.CreatePodTemplate (ds.Namespace, ds.Spec.Template, generation, hash) / / Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize / / and double with each successful iteration in a kind of "slow start". / / This handles attempts to start large numbers of pods that would / / likely all fail with the same error. For example a project with a / / low quota that attempts to create a large number of pods will be / / prevented from spamming the API service with the pod create requests / / after one of its pods fails. Conveniently, this also prevents the / / event spam that those failures would generate. BatchSize: = integer.IntMin (createDiff, controller.SlowStartInitialBatchSize) for pos: = 0; createDiff > pos; batchSize, pos = integer.IntMin (2*batchSize, createDiff- (pos+batchSize)), pos+batchSize {errorCount: = len (errCh) createWait.Add (batchSize) for I: = pos; I < pos+batchSize ITunes + {go func (ix int) {defer createWait.Done () var err error podTemplate: = & template if utilfeature.DefaultFeatureGate.Enabled (features.ScheduleDaemonSetPods) { PodTemplate = template.DeepCopy () / / The pod's NodeAffinity will be updated to make sure the Pod is bound / / to the target node by default scheduler. It is safe to do so because there / / should be no conflicting node affinity with the target node. PodTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity (podTemplate.Spec.Affinity, nodesNeedingDaemonPods [ix]) err = dsc.podControl.CreatePodsWithControllerRef (ds.Namespace, podTemplate, ds, metav1.NewControllerRef (ds) ControllerKind)} else {err = dsc.podControl.CreatePodsOnNode (nodesNeedingDaemonPods [ix], ds.Namespace, podTemplate, ds, metav1.NewControllerRef (ds) ControllerKind)} if err! = nil & & errors.IsTimeout (err) {/ / Pod is created but its initialization has timed out. / / If the initialization is successful eventually, the / / controller will observe the creation via the informer. / / If the initialization fails, or if the pod keeps / / uninitialized for a long time, the informer will not / / receive any update, and the controller will create a new / / pod when the expectation expires. Return} if err! = nil {klog.V (2). Infof ("Failed creation, decrementing expectations for set% Q amp% Q", ds.Namespace Ds.Name) dsc.expectations.CreationObserved (dsKey) errCh = maxUnavailable {klog.V (4) .Infof ("Number of unavailable DaemonSet pods:% d, is equal to or exceeds allowed maximum:% d", numUnavailable MaxUnavailable) break} klog.V (4). Infof ("Marking pod% s for deletion", ds.Name, pod.Name) oldPodsToDelete = append (oldPodsToDelete, pod.Name) numUnavailable++} return dsc.syncNodes (ds, oldPodsToDelete, [] string {}, hash)}

Select all OldPods based on the latest hash value

Calculate the sum of those! available and those Pods that are expected to be scheduled but not yet running as numUnavailable.

Divide the OldPods into oldAvailablePods and oldUnavailablePods, and add the oldUnavailablePods with empty DeletionTimestamp to the Pods list to be deleted (oldPodsToDelete).

Traverse the oldAvailablePods and add it to the oldPodsToDelete one by one until the numUnavailable reaches maxUnavailable. The maximum number of Pods added from oldAvailablePods to oldPodsToDelete is (maxUnavailable-1).

Therefore, oldPodsToDelete includes all oldUnavailablePods with empty DeletionTimestamp and up to (maxUnavailable-1) oldAvailablePods.

Finally, call syncNodes to start deleting the DaemonSet Pods in the oldPodsToDelete.

Node update

The Node Add event is simple, traversing all the DaemonSets objects and calling nodeShouldRunDaemonPod to figure out whether each DaemonSet should be started on that Node. If you want to start, add DaemonSet to Queue and let syncDaemonSet handle it.

For the Node Update event, you need to determine the fields of the Update, and then decide whether to join the Queue for syncDaemonSet according to the situation.

Func (dsc * DaemonSetsController) updateNode (old, cur interface {}) {oldNode: = old. (* v1.Node) curNode: = cur. (* v1.Node) if shouldIgnoreNodeUpdate (* oldNode, * curNode) {return} dsList Err: = dsc.dsLister.List (labels.Everything ()) if err! = nil {klog.V (4). Infof ("Error listing daemon sets:% v", err) return} / / TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too). For _, ds: = range dsList {_, oldShouldSchedule, oldShouldContinueRunning, err: = dsc.nodeShouldRunDaemonPod (oldNode, ds) if err! = nil {continue} _, currentShouldSchedule, currentShouldContinueRunning, err: = dsc.nodeShouldRunDaemonPod (curNode Ds) if err! = nil {continue} if (oldShouldSchedule! = currentShouldSchedule) | | (oldShouldContinueRunning! = currentShouldContinueRunning) {dsc.enqueueDaemonSet (ds)}

If the Node Condition has not changed, the Node change event cannot be ignored.

In addition to Node Condition and ResourceVersion, this change event cannot be ignored if the old and new Node objects are inconsistent.

For changes that cannot be ignored, call nodeShouldRunDaemonPod for oldNode,currentNode to determine whether ShouldSchedule and ShouldContinueRunning are consistent, and queue the DaemonSet Object into syncDaemonSet for processing as long as the ShouldSchedule or ShouldContinueRunning changes.

DaemonSet Controller principal logic

At this point, I believe you have a deeper understanding of "how to create and start DaemonSet Controller". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report