@@ -25,12 +25,16 @@ import (
2525
2626 v1 "k8s.io/api/core/v1"
2727 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28+ "k8s.io/apimachinery/pkg/util/sets"
2829 utilfeature "k8s.io/apiserver/pkg/util/feature"
2930 corev1helpers "k8s.io/component-helpers/scheduling/corev1"
3031 "k8s.io/klog/v2"
32+ extenderv1 "k8s.io/kube-scheduler/extender/v1"
3133 fwk "k8s.io/kube-scheduler/framework"
3234 "k8s.io/kubernetes/pkg/features"
3335 "k8s.io/kubernetes/pkg/scheduler/framework"
36+ "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
37+ "k8s.io/kubernetes/pkg/scheduler/framework/preemption"
3438 "k8s.io/kubernetes/pkg/scheduler/metrics"
3539 "k8s.io/utils/ptr"
3640)
@@ -217,13 +221,63 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
217221 return
218222 }
219223
220- result := sched .podGroupSchedulingAlgorithm (podGroupCycleCtx , schedFwk , podGroupInfo )
224+ postFilterMode := runAllPostFilter
225+ if utilfeature .DefaultFeatureGate .Enabled (features .WorkloadAwarePreemption ) {
226+ postFilterMode = runWithoutDefaultPreemption
227+ }
228+
229+ result := sched .podGroupSchedulingDefaultAlgorithm (podGroupCycleCtx , schedFwk , podGroupInfo , postFilterMode )
221230 metrics .PodGroupSchedulingAlgorithmLatency .Observe (metrics .SinceInSeconds (start ))
222231
232+ // Run workload aware preemption if required. If the preemption is successful,
233+ // we need to put the victims back into the queue.
234+ if result .status == podGroupRequiresWorkloadAwarePreemption {
235+ status := sched .workloadAwarePreemption (podGroupCycleCtx , result , schedFwk , podGroupInfo )
236+ if status .IsSuccess () {
237+ result .status = podGroupWaitingOnPreemption
238+ } else {
239+ result .status = podGroupUnschedulable
240+ }
241+ }
242+
223243 // submitPodGroupAlgorithmResult can dispatch binding goroutines, so should be called with the noncancelable ctx.
224244 sched .submitPodGroupAlgorithmResult (ctx , schedFwk , podGroupInfo , result , start )
225245}
226246
247+ func (sched * Scheduler ) workloadAwarePreemption (ctx context.Context , schedRes podGroupAlgorithmResult , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo ) * fwk.Status {
248+ revertFn := sched .nodeInfoSnapshot .SaveSnapshot ()
249+ defer revertFn ()
250+ executor := preemption .NewWorkloadEvaluator (fmt .Sprintf ("%s-preeemption" , podGroupInfo .GetName ()), schedFwk , func (ctx context.Context ) * fwk.Status {
251+ res := sched .podGroupSchedulingDefaultAlgorithm (ctx , schedFwk , podGroupInfo , runWithoutPostFilter )
252+ if res .status == podGroupFeasible {
253+ return fwk .NewStatus (fwk .Success )
254+ }
255+ return fwk .NewStatus (fwk .Unschedulable , "pod group is not schedulable" )
256+ })
257+
258+ cycleStates := make ([]fwk.CycleState , len (schedRes .podResults ))
259+ for i , podResult := range schedRes .podResults {
260+ cycleStates [i ] = podResult .podCtx .state
261+ }
262+
263+ pg , err := schedFwk .SharedInformerFactory ().Scheduling ().V1alpha2 ().PodGroups ().Lister ().PodGroups (podGroupInfo .Namespace ).Get (podGroupInfo .Name )
264+ if err != nil {
265+ return fwk .AsStatus (fmt .Errorf ("failed to get pod group %s/%s" , podGroupInfo .Namespace , podGroupInfo .Name ))
266+ }
267+
268+ status , victims := executor .Preempt (ctx , pg , cycleStates , podGroupInfo .UnscheduledPods )
269+
270+ if ! status .IsSuccess () {
271+ return status
272+ }
273+
274+ v := & extenderv1.Victims {
275+ Pods : victims ,
276+ }
277+
278+ return schedFwk .PreemptionExecutor ().ActuatePodGroupPreemption (ctx , v , podGroupInfo .UnscheduledPods , pg , "workload-preemption" )
279+ }
280+
227281// algorithmResult stores the scheduling result and status for a scheduling attempt of a single pod.
228282type algorithmResult struct {
229283 // scheduleResult is a scheduling algorithm result.
@@ -257,6 +311,24 @@ const (
257311 // waiting for resources to be released.
258312 // Should be set when the pod group would be feasible, but any member pod requires preemption.
259313 podGroupWaitingOnPreemption podGroupAlgorithmStatus = "waiting_on_preemption"
314+ // podGroupRequiresWorkloadAwarePreemption means that the pod group requires workload aware preemption
315+ // Should be set when the pod group is not feasible, but workload aware preemption is enabled.
316+ podGroupRequiresWorkloadAwarePreemption podGroupAlgorithmStatus = "requires_workload_aware_preemption"
317+ )
318+
// podGroupPostFilterMode tells the pod group algorithm which PostFilter plugins
// it should run while scheduling the individual pods of a pod group.
type podGroupPostFilterMode int

const (
	// runAllPostFilter means the pod group algorithm should try to run the
	// default post filter in the pod by pod cycle.
	runAllPostFilter podGroupPostFilterMode = iota
	// runWithoutPostFilter means the pod group algorithm should not try post
	// filter at all. This can be used by workload aware preemption, which tries
	// to check whether the pod group can be scheduled after removing some pods.
	runWithoutPostFilter
	// runWithoutDefaultPreemption means the pod group algorithm should run post
	// filter without running default preemption. This mode is expected when
	// workload aware preemption is enabled.
	runWithoutDefaultPreemption
)
261333
262334// podGroupAlgorithmResult stores the scheduling pod scheduling results for a pod group
@@ -272,7 +344,7 @@ type podGroupAlgorithmResult struct {
272344// It tries to schedule each pod using standard filtering and scoring logic in a fixed order.
273345// If a pod requires preemption to be schedulable, subsequent pods in the algorithm
274346// treat that pod as already scheduled on that node with victims being already removed in memory.
275- func (sched * Scheduler ) podGroupSchedulingDefaultAlgorithm (ctx context.Context , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo ) podGroupAlgorithmResult {
347+ func (sched * Scheduler ) podGroupSchedulingDefaultAlgorithm (ctx context.Context , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo , postFilterMode podGroupPostFilterMode ) podGroupAlgorithmResult {
276348 result := podGroupAlgorithmResult {
277349 podResults : make ([]algorithmResult , 0 , len (podGroupInfo .QueuedPodInfos )),
278350 status : podGroupUnschedulable ,
@@ -283,7 +355,7 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
283355
284356 requiresPreemption := false
285357 for _ , podInfo := range podGroupInfo .QueuedPodInfos {
286- podResult , revertFn := sched .podGroupPodSchedulingAlgorithm (ctx , schedFwk , podGroupInfo , podInfo )
358+ podResult , revertFn := sched .podGroupPodSchedulingAlgorithm (ctx , schedFwk , podGroupInfo , podInfo , postFilterMode )
287359 result .podResults = append (result .podResults , podResult )
288360 if ! podResult .status .IsSuccess () && ! podResult .requiresPreemption {
289361 // When a pod is not feasible and doesn't require preemption, it means that it failed scheduling.
@@ -306,12 +378,17 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
306378 }
307379 }
308380
381+ // If the pod group is unschedulable and workload aware preemption is enabled, we need to run workload aware preemption.
382+ if result .status == podGroupUnschedulable && utilfeature .DefaultFeatureGate .Enabled (features .WorkloadAwarePreemption ) {
383+ result .status = podGroupRequiresWorkloadAwarePreemption
384+ }
385+
309386 return result
310387}
311388
312389// podGroupPodSchedulingAlgorithm runs a scheduling algorithm for individual pod from a pod group.
313390// It returns the algorithm result and, if successful or the preemption is required, the permit status together with the revert function.
314- func (sched * Scheduler ) podGroupPodSchedulingAlgorithm (ctx context.Context , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo , podInfo * framework.QueuedPodInfo ) (algorithmResult , func ()) {
391+ func (sched * Scheduler ) podGroupPodSchedulingAlgorithm (ctx context.Context , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo , podInfo * framework.QueuedPodInfo , postFilterMode podGroupPostFilterMode ) (algorithmResult , func ()) {
315392 pod := podInfo .Pod
316393 podCtx := sched .initPodSchedulingContext (ctx , pod )
317394 logger := podCtx .logger
@@ -320,6 +397,17 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
320397
321398 logger .V (4 ).Info ("Attempting to schedule a pod belonging to a pod group" , "podGroup" , klog .KObj (podGroupInfo ), "pod" , klog .KObj (pod ))
322399
400+ switch postFilterMode {
401+ case runWithoutPostFilter :
402+ podCtx .state .SetSkipAllPostFilterPlugins (true )
403+ case runWithoutDefaultPreemption :
404+ skipPostFilterPlugins := podCtx .state .GetSkipPostFilterPlugins ()
405+ if skipPostFilterPlugins == nil {
406+ skipPostFilterPlugins = sets.Set [string ]{}
407+ }
408+ podCtx .state .SetSkipPostFilterPlugins (skipPostFilterPlugins .Insert (names .DefaultPreemption ))
409+ }
410+
323411 requiresPreemption := false
324412 scheduleResult , status := sched .schedulingAlgorithm (ctx , podCtx .state , schedFwk , podInfo , start )
325413 if ! status .IsSuccess () {
@@ -505,7 +593,7 @@ func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context
505593 if err != nil {
506594 return sched .podGroupAlgorithmFailure (ctx , podGroupInfo , fwk .AsStatus (err ))
507595 }
508- result := sched .podGroupSchedulingDefaultAlgorithm (ctx , schedFwk , podGroupInfo )
596+ result := sched .podGroupSchedulingDefaultAlgorithm (ctx , schedFwk , podGroupInfo , runAllPostFilter )
509597 sched .nodeInfoSnapshot .ForgetPlacement ()
510598
511599 results [i ] = placementResult {
@@ -543,10 +631,10 @@ func (sched *Scheduler) podGroupAlgorithmFailure(ctx context.Context, podGroupIn
543631}
544632
545633// podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy and constraints and returns the scheduling result for each pod in the pod group.
546- func (sched * Scheduler ) podGroupSchedulingAlgorithm (ctx context.Context , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo ) podGroupAlgorithmResult {
634+ func (sched * Scheduler ) podGroupSchedulingAlgorithm (ctx context.Context , schedFwk framework.Framework , podGroupInfo * framework.QueuedPodGroupInfo , postFilterMode podGroupPostFilterMode ) podGroupAlgorithmResult {
547635 if utilfeature .DefaultFeatureGate .Enabled (features .TopologyAwareWorkloadScheduling ) {
548636 return sched .podGroupSchedulingPlacementAlgorithm (ctx , schedFwk , podGroupInfo )
549637 } else {
550- return sched .podGroupSchedulingDefaultAlgorithm (ctx , schedFwk , podGroupInfo )
638+ return sched .podGroupSchedulingDefaultAlgorithm (ctx , schedFwk , podGroupInfo , postFilterMode )
551639 }
552640}
0 commit comments