This article explains how Preemption (preemptive scheduling) is implemented in the Kubernetes 1.8 scheduler, following the relevant source code step by step.
Changes in ScheduleAlgorithm
In Kubernetes 1.8, the ScheduleAlgorithm interface has been changed to add a Preempt(...) method. As a result, the summary of the scheduling process in my earlier post "Kubernetes Scheduler principle analysis" (written against Kubernetes 1.5) — "the scheduler takes the Pods with an empty PodSpec.NodeName one by one and, through the two steps of pre-selection (Predicates) and optimization (Priorities), selects the most suitable Node as the Destination of the Pod" — is no longer accurate.
An accurate description now is: the scheduler takes the Pods with an empty PodSpec.NodeName one by one and, through the two steps of pre-selection (Predicates) and optimization (Priorities), selects the most suitable Node as the Destination of the Pod. If no suitable Node is found after pre-selection and optimization, and Pod Priority is enabled, the Pod goes through Preempt preemptive scheduling to find the most suitable Node and the Pods that need to be evicted.
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    // Preempt receives scheduling errors for a pod and tries to create room for
    // the pod by preempting lower priority pods if possible.
    // It returns the node where preemption happened, a list of preempted pods, and error if any.
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
    // Predicates() returns a pointer to a map of predicate functions. This is
    // exposed for testing.
    Predicates() map[string]FitPredicate
    // Prioritizers returns a slice of priority config. This is exposed for
    // testing.
    Prioritizers() []PriorityConfig
}
Scheduler.scheduleOne starts the real scheduling logic and is responsible for scheduling one Pod at a time. The logic is as follows:
Get a Pod from PodQueue.
Execute the configured Algorithm's Schedule, which carries out pre-selection and optimization.
AssumePod
Bind the Pod; if the Bind fails, ForgetPod.
In 1.8, when pre-selection and optimization do not find a suitable node (in practice this means pre-selection found no node at all, since optimization merely picks the better one among candidates), sched.preempt is also called to perform preemptive scheduling.
plugin/pkg/scheduler/scheduler.go:293
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    suggestedHost, err := sched.schedule(pod)
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            sched.preempt(pod, fitError)
        }
        return
    }

    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := *pod
    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    err = sched.assume(&assumedPod, suggestedHost)
    if err != nil {
        return
    }

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        err := sched.bind(&assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            glog.Errorf("Internal error binding pod: (%v)", err)
        }
    }()
}

Scheduler.preempt
I will not go over pre-selection and optimization again here, because the overall source logic is the same as in 1.5; 1.8 only adds more Predicate and Priority policies and their implementations. Let's look directly at the code for preemptive scheduling, Preempt.
plugin/pkg/scheduler/scheduler.go:191
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
    if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
        glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
        return "", nil
    }
    preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
    if err != nil {
        glog.Errorf("Error getting the updated preemptor pod object: %v", err)
        return "", err
    }
    node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
    if err != nil {
        glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
        return "", err
    }
    if node == nil {
        return "", err
    }
    glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
    annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
    err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
    if err != nil {
        glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
        return "", err
    }
    for _, victim := range victims {
        if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
            glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
            return "", err
        }
        sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
    }
    return node.Name, err
}
Check whether PodPriority is enabled in the FeatureGate. If not, no subsequent Preemption operation is performed.
After the Predicate/Priority scheduling process fails, the Pod's PodCondition is updated to record the scheduling failure status and the reason. Therefore the updated Pod object needs to be fetched from the apiserver.
Call ScheduleAlgorithm.Preempt to perform preemptive scheduling, selecting the best node and the pods to be preempted on it (called victims).
Call the apiserver to add the annotation NominatedNodeName=nodeName to the pod (called the Preemptor).
Traverse the victims and call apiserver to delete these pods one by one
Note: when sched.schedule(pod) fails in pre-selection and optimization, the Pod is not bound to a Node and is requeued into the unscheduled pod queue. If new pods were added to that queue while this pod was being scheduled, they sit ahead of it after the requeue and are scheduled first in the next rounds; those pods may be scheduled onto the Node that has just released resources through Preempt, consuming the resources the Preemptor just freed. When it is finally the Preemptor's turn to be scheduled again, it may need to trigger Preempt on some node once more.
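Throughout the preemption path (the eligibility check, victim selection and node picking discussed below) pod priorities are compared via util.GetPodPriority. As a rough, self-contained sketch of what such a helper does — assuming the alpha Spec.Priority (*int32) field introduced in 1.8, with a fallback default of 0 chosen here purely for illustration:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

// getPodPriority is a simplified sketch of the scheduler's util.GetPodPriority
// helper: if the pod carries an explicit Spec.Priority (resolved from its
// PriorityClass at admission time), use it; otherwise fall back to 0 (the
// default here is an assumption for illustration).
func getPodPriority(pod *v1.Pod) int32 {
    if pod.Spec.Priority != nil {
        return *pod.Spec.Priority
    }
    return 0
}

func main() {
    p := int32(1000)
    highPriorityPod := &v1.Pod{Spec: v1.PodSpec{Priority: &p}}
    defaultPod := &v1.Pod{}
    fmt.Println(getPodPriority(highPriorityPod), getPodPriority(defaultPod)) // 1000 0
}

In a real cluster the numeric value is resolved from the pod's PriorityClass by admission; the scheduler only reads the resolved number.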
GenericScheduler.Preempt
ScheduleAlgorithm.Preempt is the core of preemptive scheduling; its implementation lives in genericScheduler:
plugin/pkg/scheduler/core/generic_scheduler.go:181
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
    // Scheduler may return various types of errors. Consider preemption only if
    // the error is of type FitError.
    fitError, ok := scheduleErr.(*FitError)
    if !ok || fitError == nil {
        return nil, nil, nil
    }
    err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
    if err != nil {
        return nil, nil, err
    }
    if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
        glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
        return nil, nil, nil
    }
    allNodes, err := nodeLister.List()
    if err != nil {
        return nil, nil, err
    }
    if len(allNodes) == 0 {
        return nil, nil, ErrNoNodesAvailable
    }
    potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
    if len(potentialNodes) == 0 {
        glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
        return nil, nil, nil
    }
    nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
    if err != nil {
        return nil, nil, err
    }
    for len(nodeToPods) > 0 {
        node := pickOneNodeForPreemption(nodeToPods)
        if node == nil {
            return nil, nil, err
        }
        passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
        if passes && pErr == nil {
            return node, nodeToPods[node], err
        }
        if pErr != nil {
            glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
        }
        // Remove the node from the map and try to pick a different node.
        delete(nodeToPods, node)
    }
    return nil, nil, err
}

sched.schedule error check
Subsequent Preemption is triggered only if the error returned by the preceding sched.schedule() is of type FitError. FitError means the pod failed some PredicateFunc filtering in the Predicate phase. In other words, preemptive scheduling is carried out only when the Pod failed pre-selection.
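The article does not show the FitError type itself, so here is a minimal, self-contained sketch of the mechanism (simplified stand-in types, not the scheduler's definitions): a FitError-like error carries, per node, the predicate failure reasons, and only this error type gates preemption, mirroring the type assertion in scheduleOne.

package main

import "fmt"

// failedPredicateMap and fitError are simplified stand-ins for the scheduler's
// core.FailedPredicateMap and core.FitError (the real FitError also carries the
// pod and richer failure-reason types).
type failedPredicateMap map[string][]string

type fitError struct {
    FailedPredicates failedPredicateMap
}

func (f *fitError) Error() string {
    return fmt.Sprintf("pod failed predicates on %d node(s)", len(f.FailedPredicates))
}

// schedule mimics sched.schedule returning a FitError-style error when no node
// passes the predicates.
func schedule() (string, error) {
    return "", &fitError{FailedPredicates: failedPredicateMap{
        "node-1": {"NodeSelectorNotMatch"},
        "node-2": {"InsufficientMemory"},
    }}
}

func main() {
    host, err := schedule()
    if err != nil {
        // Only a FitError triggers preemption, mirroring the check in scheduleOne.
        if fitErr, ok := err.(*fitError); ok {
            fmt.Println("would call preempt with:", fitErr.FailedPredicates)
        }
        return
    }
    fmt.Println("scheduled on", host)
}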
Update NodeInfo in scheduler cache
Update the NodeInfo in the scheduler cache, mainly to refresh the scheduled and assumed Pods on each Node; the subsequent victim selection is computed against this up-to-date state so that the Preemption decision is correct.
podEligibleToPreemptOthers checks whether the pod is qualified for preemptive scheduling
Invoke podEligibleToPreemptOthers to determine whether the pod is suitable for subsequent Preemption. The judgment logic is:
If the Pod already carries the annotation NominatedNodeName=nodeName (indicating the pod has already triggered Preemption before), and on that nominated Node there is a Terminating pod with a lower priority than this pod, then the pod is considered not suitable for further Preemption and the process ends.
Otherwise, continue with the subsequent process.
The corresponding code is as follows:
plugin/pkg/scheduler/core/generic_scheduler.go:756
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
    if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
        if nodeInfo, found := nodeNameToInfo[nodeName]; found {
            for _, p := range nodeInfo.Pods() {
                if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
                    // There is a terminating pod on the nominated node.
                    return false
                }
            }
        }
    }
    return true
}

nodesWherePreemptionMightHelp filters out the Potential Nodes

Invoke nodesWherePreemptionMightHelp to get the potential nodes. Its logic is:

Iterate over all nodes and, for each node, scan the Predicate policies that failed during the pre-selection phase of sched.schedule() (failedPredicates). If the failedPredicates contain any of the following policies, the node is not suitable as a candidate for Preempt: NodeSelectorNotMatch, PodNotMatchHostName, TaintsTolerationsNotMatch, NodeLabelPresenceViolated, NodeNotReady, NodeNetworkUnavailable, NodeUnschedulable, NodeUnknownCondition.

All other Nodes are taken as Potential Nodes.

The corresponding code is as follows:

func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
    potentialNodes := []*v1.Node{}
    for _, node := range nodes {
        unresolvableReasonExist := false
        failedPredicates, found := failedPredicatesMap[node.Name]
        // If we assume that scheduler looks at all nodes and populates the failedPredicateMap
        // (which is the case today), the !found case should never happen, but we'd prefer
        // to rely less on such assumptions in the code when checking does not impose
        // significant overhead.
        for _, failedPredicate := range failedPredicates {
            switch failedPredicate {
            case
                predicates.ErrNodeSelectorNotMatch,
                predicates.ErrPodNotMatchHostName,
                predicates.ErrTaintsTolerationsNotMatch,
                predicates.ErrNodeLabelPresenceViolated,
                predicates.ErrNodeNotReady,
                predicates.ErrNodeNetworkUnavailable,
                predicates.ErrNodeUnschedulable,
                predicates.ErrNodeUnknownCondition:
                unresolvableReasonExist = true
                break
                // TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
            }
        }
        if !found || !unresolvableReasonExist {
            glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
            potentialNodes = append(potentialNodes, node)
        }
    }
    return potentialNodes
}

selectNodesForPreemption and selectVictimsOnNode pick out the feasible Nodes and their victims

Invoke selectNodesForPreemption to find, among the Potential Nodes, all feasible Nodes and their corresponding victim Pods. Its logic is: start up to 16 workers (goroutines) via workqueue.Parallelize and wait through a WaitGroup for the check of every potential node to complete. For each node:

Iterate over all scheduled pods on the node (including assumed pods), add every Pod with a lower priority than the Preemptor to the potential victims list, and remove those victims from the NodeInfoCopy, so that the next Predicate evaluation sees more free resources on the Node.

Sort the potential victims by priority from high to low; index 0 holds the highest priority.

Check whether the Preemptor passes all Predicate policies configured for the scheduler (with the victims already removed from the NodeInfoCopy, i.e. with the resources of all lower priority pods released). If it does not pass, return: this node is not suitable. Once all Predicates pass, continue with the following steps.

Iterate over the potential victims list (already sorted from highest to lowest priority): try adding the first Pod (the one with the highest priority) back into the NodeInfoCopy, then check again whether the Preemptor passes all configured Predicate policies. If it does not, remove that Pod from the NodeInfoCopy again and formally add it to the victims list. Then handle the 2nd, 3rd, ... Pods in the potential victims list in the same way. This keeps as many higher priority Pods as possible and evicts as few Pods as possible.

Finally, return every feasible node together with its corresponding victims list.

The selectNodesForPreemption code is as follows; the core logic is actually in selectVictimsOnNode.

plugin/pkg/scheduler/core/generic_scheduler.go:583
func selectNodesForPreemption(pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    potentialNodes []*v1.Node,
    predicates map[string]algorithm.FitPredicate,
    metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {

    nodeNameToPods := map[*v1.Node][]*v1.Pod{}
    var resultLock sync.Mutex

    // We can use the same metadata producer for all nodes.
    meta := metadataProducer(pod, nodeNameToInfo)
    checkNode := func(i int) {
        nodeName := potentialNodes[i].Name
        var metaCopy algorithm.PredicateMetadata
        if meta != nil {
            metaCopy = meta.ShallowCopy()
        }
        pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
        if fits {
            resultLock.Lock()
            nodeNameToPods[potentialNodes[i]] = pods
            resultLock.Unlock()
        }
    }
    workqueue.Parallelize(16, len(potentialNodes), checkNode)
    return nodeNameToPods, nil
}

plugin/pkg/scheduler/core/generic_scheduler.go:659
func selectVictimsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    nodeInfo *schedulercache.NodeInfo,
    fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
    potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
    nodeInfoCopy := nodeInfo.Clone()

    removePod := func(rp *v1.Pod) {
        nodeInfoCopy.RemovePod(rp)
        if meta != nil {
            meta.RemovePod(rp)
        }
    }
    addPod := func(ap *v1.Pod) {
        nodeInfoCopy.AddPod(ap)
        if meta != nil {
            meta.AddPod(ap, nodeInfoCopy)
        }
    }
    // As the first step, remove all the lower priority pods from the node and
    // check if the given pod can be scheduled.
    podPriority := util.GetPodPriority(pod)
    for _, p := range nodeInfoCopy.Pods() {
        if util.GetPodPriority(p) < podPriority {
            potentialVictims.Items = append(potentialVictims.Items, p)
            removePod(p)
        }
    }
    potentialVictims.Sort()
    // If the new pod does not fit after removing all the lower priority pods,
    // we are almost done and this node is not suitable for preemption. The only condition
    // that we should check is if the "pod" is failing to schedule due to pod affinity
    // failure.
    // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
    if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
        if err != nil {
            glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
        }
        return nil, false
    }
    victims := []*v1.Pod{}
    // Try to reprieve as many pods as possible starting from the highest priority one.
    for _, p := range potentialVictims.Items {
        lpp := p.(*v1.Pod)
        addPod(lpp)
        if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
            removePod(lpp)
            victims = append(victims, lpp)
            glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
        }
    }
    return victims, true
}

pickOneNodeForPreemption picks the most suitable Node from the feasible Nodes

If the previous step found at least one feasible node, pickOneNodeForPreemption is called to choose the most suitable one according to the following logic:

Pick the Node whose highest victim priority is the lowest.

If more than one Node satisfies the previous condition, pick the Node with the smallest sum of victim priorities.

If more than one Node still satisfies the condition, pick the Node with the fewest victim pods.

If more than one Node still satisfies the condition, pick one of them (effectively arbitrarily, the first remaining one).

The candidate Node list at each step is the result of the filtering done in the previous step.

plugin/pkg/scheduler/core/generic_scheduler.go:501
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
    type nodeScore struct {
        node            *v1.Node
        highestPriority int32
        sumPriorities   int64
        numPods         int
    }
    if len(nodesToPods) == 0 {
        return nil
    }
    minHighestPriority := int32(math.MaxInt32)
    minPriorityScores := []*nodeScore{}
    for node, pods := range nodesToPods {
        if len(pods) == 0 {
            // We found a node that doesn't need any preemption. Return it!
            // This should happen rarely when one or more pods are terminated between
            // the time that scheduler tries to schedule the pod and the time that
            // preemption logic tries to find nodes for preemption.
            return node
        }
        // highestPodPriority is the highest priority among the victims on this node.
        highestPodPriority := util.GetPodPriority(pods[0])
        if highestPodPriority < minHighestPriority {
            minHighestPriority = highestPodPriority
            minPriorityScores = nil
        }
        if highestPodPriority == minHighestPriority {
            minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
        }
    }
    if len(minPriorityScores) == 1 {
        return minPriorityScores[0].node
    }
    // There are a few nodes with minimum highest priority victim. Find the
    // smallest sum of priorities.
    minSumPriorities := int64(math.MaxInt64)
    minSumPriorityScores := []*nodeScore{}
    for _, nodeScore := range minPriorityScores {
        var sumPriorities int64
        for _, pod := range nodesToPods[nodeScore.node] {
            // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
            // needed so that a node with a few pods with negative priority is not
            // picked over a node with a smaller number of pods with the same negative
            // priority (and similar scenarios).
            sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
        }
        if sumPriorities < minSumPriorities {
            minSumPriorities = sumPriorities
            minSumPriorityScores = nil
        }
        nodeScore.sumPriorities = sumPriorities
        if sumPriorities == minSumPriorities {
            minSumPriorityScores = append(minSumPriorityScores, nodeScore)
        }
    }
    if len(minSumPriorityScores) == 1 {
        return minSumPriorityScores[0].node
    }
    // There are a few nodes with minimum highest priority victim and sum of priorities.
    // Find one with the minimum number of pods.
    minNumPods := math.MaxInt32
    minNumPodScores := []*nodeScore{}
    for _, nodeScore := range minSumPriorityScores {
        if nodeScore.numPods < minNumPods {
            minNumPods = nodeScore.numPods
            minNumPodScores = nil
        }
        if nodeScore.numPods == minNumPods {
            minNumPodScores = append(minNumPodScores, nodeScore)
        }
    }
    // At this point, even if there are more than one node with the same score,
    // return the first one.
    if len(minNumPodScores) > 0 {
        return minNumPodScores[0].node
    }
    glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
    return nil
}
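To make the tie-break order concrete, here is a small, self-contained toy example (not scheduler code): it applies the four rules described above to made-up victim priority lists and ignores details of the real implementation, such as the MaxInt32 offset used to handle negative priorities.

package main

import "fmt"

// candidate holds the victim priorities that selectNodesForPreemption would
// have produced for one feasible node (toy data for illustration).
type candidate struct {
    name    string
    victims []int32 // victim priorities, sorted from high to low
}

func sum(xs []int32) int64 {
    var s int64
    for _, x := range xs {
        s += int64(x)
    }
    return s
}

// pick applies the tie-break order described above: lowest "highest victim
// priority", then lowest priority sum, then fewest victims, then simply the
// first remaining candidate.
func pick(cands []candidate) candidate {
    best := cands[0]
    for _, c := range cands[1:] {
        switch {
        case c.victims[0] != best.victims[0]:
            if c.victims[0] < best.victims[0] {
                best = c
            }
        case sum(c.victims) != sum(best.victims):
            if sum(c.victims) < sum(best.victims) {
                best = c
            }
        case len(c.victims) < len(best.victims):
            best = c
        }
    }
    return best
}

func main() {
    cands := []candidate{
        {name: "node-a", victims: []int32{7, 1}},    // highest victim priority 7
        {name: "node-b", victims: []int32{5, 5, 1}}, // highest 5, priority sum 11
        {name: "node-c", victims: []int32{5, 3}},    // highest 5, priority sum 8 -> chosen
    }
    fmt.Println(pick(cands).name) // prints "node-c"
}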
The most suitable Node still needs to pass the extender check (if configured)

If the scheduler is configured with extenders, the pod and the candidate node (with its victims hypothetically removed) must also be sent to extender.Filter for another check via nodePassesExtendersForPreemption; only if this check passes is the node returned as the final Preempt node.
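The article does not show nodePassesExtendersForPreemption, so here is a self-contained sketch of the idea behind it, using hypothetical simplified types (the real code works on *v1.Pod, schedulercache.NodeInfo and the scheduler extender interface): pretend the victims are already gone from the node, then ask every configured extender whether it would still accept the node for the preemptor.

package main

import "fmt"

// pod is a hypothetical, simplified stand-in for *v1.Pod.
type pod struct {
    name     string
    priority int32
}

// extenderFilter stands in for one configured extender: given the preemptor,
// the candidate node and the pods that would remain after evicting the victims,
// it answers whether it would still accept the node.
type extenderFilter func(preemptor pod, nodeName string, remainingPods []pod) bool

// nodePassesExtenders sketches the idea behind nodePassesExtendersForPreemption:
// remove the victims from a copy of the node's pod list, then ask every extender
// whether the node is still acceptable. The node is used for preemption only if
// all extenders agree.
func nodePassesExtenders(preemptor pod, nodeName string, podsOnNode, victims []pod, extenders []extenderFilter) bool {
    evicted := map[string]bool{}
    for _, v := range victims {
        evicted[v.name] = true
    }
    remaining := []pod{}
    for _, p := range podsOnNode {
        if !evicted[p.name] {
            remaining = append(remaining, p)
        }
    }
    for _, filter := range extenders {
        if !filter(preemptor, nodeName, remaining) {
            return false
        }
    }
    return true
}

func main() {
    preemptor := pod{name: "web", priority: 1000}
    podsOnNode := []pod{{name: "batch-1"}, {name: "batch-2"}}
    victims := []pod{{name: "batch-1"}}
    // A toy extender that accepts the node as long as at most one pod remains on it.
    extenders := []extenderFilter{func(_ pod, _ string, remaining []pod) bool {
        return len(remaining) <= 1
    }}
    fmt.Println(nodePassesExtenders(preemptor, "node-1", podsOnNode, victims, extenders)) // true
}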
For background on extenders, refer to "how to carry out secondary development of kubernetes scheduler" and "Kubernetes Scheduler source code analysis". There are not many practical scenarios for them, and now that custom schedulers are supported there is even less need to use a scheduler extender.