Skip to content

Commit b16287a

Browse files
committed
Wire workload preemption into pod group cycle
1 parent f14d532 commit b16287a

3 files changed

Lines changed: 196 additions & 18 deletions

File tree

pkg/scheduler/schedule_one_podgroup.go

Lines changed: 95 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,16 @@ import (
2525

2626
v1 "k8s.io/api/core/v1"
2727
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28+
"k8s.io/apimachinery/pkg/util/sets"
2829
utilfeature "k8s.io/apiserver/pkg/util/feature"
2930
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
3031
"k8s.io/klog/v2"
32+
extenderv1 "k8s.io/kube-scheduler/extender/v1"
3133
fwk "k8s.io/kube-scheduler/framework"
3234
"k8s.io/kubernetes/pkg/features"
3335
"k8s.io/kubernetes/pkg/scheduler/framework"
36+
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
37+
"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
3438
"k8s.io/kubernetes/pkg/scheduler/metrics"
3539
"k8s.io/utils/ptr"
3640
)
@@ -217,13 +221,63 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
217221
return
218222
}
219223

220-
result := sched.podGroupSchedulingAlgorithm(podGroupCycleCtx, schedFwk, podGroupInfo)
224+
postFilterMode := runAllPostFilter
225+
if utilfeature.DefaultFeatureGate.Enabled(features.WorkloadAwarePreemption) {
226+
postFilterMode = runWithoutDefaultPreemption
227+
}
228+
229+
result := sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupInfo, postFilterMode)
221230
metrics.PodGroupSchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
222231

232+
// Run workload aware preemption if required. If the preemption is successful,
233+
// we need to put the victims back into the queue.
234+
if result.status == podGroupRequiresWorkloadAwarePreemption {
235+
status := sched.workloadAwarePreemption(podGroupCycleCtx, result, schedFwk, podGroupInfo)
236+
if status.IsSuccess() {
237+
result.status = podGroupWaitingOnPreemption
238+
} else {
239+
result.status = podGroupUnschedulable
240+
}
241+
}
242+
223243
// submitPodGroupAlgorithmResult can dispatch binding goroutines, so should be called with the noncancelable ctx.
224244
sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupInfo, result, start)
225245
}
226246

247+
func (sched *Scheduler) workloadAwarePreemption(ctx context.Context, schedRes podGroupAlgorithmResult, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status {
248+
revertFn := sched.nodeInfoSnapshot.SaveSnapshot()
249+
defer revertFn()
250+
executor := preemption.NewWorkloadEvaluator(fmt.Sprintf("%s-preemption", podGroupInfo.GetName()), schedFwk, func(ctx context.Context) *fwk.Status {
251+
res := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo, runWithoutPostFilter)
252+
if res.status == podGroupFeasible {
253+
return fwk.NewStatus(fwk.Success)
254+
}
255+
return fwk.NewStatus(fwk.Unschedulable, "pod group is not schedulable")
256+
})
257+
258+
cycleStates := make([]fwk.CycleState, len(schedRes.podResults))
259+
for i, podResult := range schedRes.podResults {
260+
cycleStates[i] = podResult.podCtx.state
261+
}
262+
263+
pg, err := schedFwk.SharedInformerFactory().Scheduling().V1alpha2().PodGroups().Lister().PodGroups(podGroupInfo.Namespace).Get(podGroupInfo.Name)
264+
if err != nil {
265+
return fwk.AsStatus(fmt.Errorf("failed to get pod group %s/%s", podGroupInfo.Namespace, podGroupInfo.Name))
266+
}
267+
268+
status, victims := executor.Preempt(ctx, pg, cycleStates, podGroupInfo.UnscheduledPods)
269+
270+
if !status.IsSuccess() {
271+
return status
272+
}
273+
274+
v := &extenderv1.Victims{
275+
Pods: victims,
276+
}
277+
278+
return schedFwk.PreemptionExecutor().ActuatePodGroupPreemption(ctx, v, podGroupInfo.UnscheduledPods, pg, "workload-preemption")
279+
}
280+
227281
// algorithmResult stores the scheduling result and status for a scheduling attempt of a single pod.
228282
type algorithmResult struct {
229283
// scheduleResult is a scheduling algorithm result.
@@ -257,6 +311,24 @@ const (
257311
// waiting for resources to be released.
258312
// Should be set when the pod group would be feasible, but any member pod requires preemption.
259313
podGroupWaitingOnPreemption podGroupAlgorithmStatus = "waiting_on_preemption"
314+
// podGroupRequiresWorkloadAwarePreemption means that the pod group requires workload aware preemption
315+
// Should be set when the pod group is not feasible, but workload aware preemption is enabled.
316+
podGroupRequiresWorkloadAwarePreemption podGroupAlgorithmStatus = "requires_workload_aware_preemption"
317+
)
318+
319+
type podGroupPostFilterMode int
320+
321+
const (
322+
// The pod group algorithm should try to run default post filter in pod by pod cycle.
323+
runAllPostFilter podGroupPostFilterMode = iota
324+
// The pod group algorithm should not try post filter at all. This can be used
325+
// by workload aware preemption that tries to check if after removing some
326+
// pods the pod group can be scheduled.
327+
runWithoutPostFilter
328+
// The pod group algorithm should run post filter without running default
329+
// preemption. This mode is expected when workload aware preemption
330+
// is enabled.
331+
runWithoutDefaultPreemption
260332
)
261333

262334
// podGroupAlgorithmResult stores the scheduling pod scheduling results for a pod group
@@ -272,7 +344,7 @@ type podGroupAlgorithmResult struct {
272344
// It tries to schedule each pod using standard filtering and scoring logic in a fixed order.
273345
// If a pod requires preemption to be schedulable, subsequent pods in the algorithm
274346
// treat that pod as already scheduled on that node with victims being already removed in memory.
275-
func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) podGroupAlgorithmResult {
347+
func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
276348
result := podGroupAlgorithmResult{
277349
podResults: make([]algorithmResult, 0, len(podGroupInfo.QueuedPodInfos)),
278350
status: podGroupUnschedulable,
@@ -283,7 +355,7 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
283355

284356
requiresPreemption := false
285357
for _, podInfo := range podGroupInfo.QueuedPodInfos {
286-
podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, podInfo)
358+
podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupInfo, podInfo, postFilterMode)
287359
result.podResults = append(result.podResults, podResult)
288360
if !podResult.status.IsSuccess() && !podResult.requiresPreemption {
289361
// When a pod is not feasible and doesn't require preemption, it means that it failed scheduling.
@@ -306,12 +378,17 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
306378
}
307379
}
308380

381+
// If the pod group is unschedulable and workload aware preemption is enabled, we need to run workload aware preemption.
382+
if result.status == podGroupUnschedulable && utilfeature.DefaultFeatureGate.Enabled(features.WorkloadAwarePreemption) {
383+
result.status = podGroupRequiresWorkloadAwarePreemption
384+
}
385+
309386
return result
310387
}
311388

312389
// podGroupPodSchedulingAlgorithm runs a scheduling algorithm for individual pod from a pod group.
313390
// It returns the algorithm result and, if successful or the preemption is required, the permit status together with the revert function.
314-
func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo) (algorithmResult, func()) {
391+
func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) {
315392
pod := podInfo.Pod
316393
podCtx := sched.initPodSchedulingContext(ctx, pod)
317394
logger := podCtx.logger
@@ -320,6 +397,17 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
320397

321398
logger.V(4).Info("Attempting to schedule a pod belonging to a pod group", "podGroup", klog.KObj(podGroupInfo), "pod", klog.KObj(pod))
322399

400+
switch postFilterMode {
401+
case runWithoutPostFilter:
402+
podCtx.state.SetSkipAllPostFilterPlugins(true)
403+
case runWithoutDefaultPreemption:
404+
skipPostFilterPlugins := podCtx.state.GetSkipPostFilterPlugins()
405+
if skipPostFilterPlugins == nil {
406+
skipPostFilterPlugins = sets.Set[string]{}
407+
}
408+
podCtx.state.SetSkipPostFilterPlugins(skipPostFilterPlugins.Insert(names.DefaultPreemption))
409+
}
410+
323411
requiresPreemption := false
324412
scheduleResult, status := sched.schedulingAlgorithm(ctx, podCtx.state, schedFwk, podInfo, start)
325413
if !status.IsSuccess() {
@@ -505,7 +593,7 @@ func (sched *Scheduler) podGroupSchedulingPlacementAlgorithm(ctx context.Context
505593
if err != nil {
506594
return sched.podGroupAlgorithmFailure(ctx, podGroupInfo, fwk.AsStatus(err))
507595
}
508-
result := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo)
596+
result := sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo, runAllPostFilter)
509597
sched.nodeInfoSnapshot.ForgetPlacement()
510598

511599
results[i] = placementResult{
@@ -543,10 +631,10 @@ func (sched *Scheduler) podGroupAlgorithmFailure(ctx context.Context, podGroupIn
543631
}
544632

545633
// podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy and constraints and returns the scheduling result for each pod in the pod group.
546-
func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo) podGroupAlgorithmResult {
634+
func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult {
547635
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareWorkloadScheduling) {
548636
return sched.podGroupSchedulingPlacementAlgorithm(ctx, schedFwk, podGroupInfo)
549637
} else {
550-
return sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo)
638+
return sched.podGroupSchedulingDefaultAlgorithm(ctx, schedFwk, podGroupInfo, postFilterMode)
551639
}
552640
}

pkg/scheduler/schedule_one_podgroup_test.go

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -408,12 +408,13 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
408408
}
409409

410410
tests := []struct {
411-
name string
412-
plugin *fakePodGroupPlugin
413-
expectedGroupStatus podGroupAlgorithmStatus
414-
expectedPodStatus map[string]*fwk.Status
415-
expectedPreemption map[string]bool
416-
skipForTAS bool
411+
name string
412+
plugin *fakePodGroupPlugin
413+
expectedGroupStatus podGroupAlgorithmStatus
414+
expectedPodStatus map[string]*fwk.Status
415+
expectedPreemption map[string]bool
416+
useWorkloadAwarePreemption bool
417+
skipForTAS bool
417418
}{
418419
{
419420
name: "All pods feasible",
@@ -680,6 +681,35 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
680681
"p3": fwk.NewStatus(fwk.Unschedulable),
681682
},
682683
},
684+
{
685+
name: "One pod require preemption, returning wait on preemption for workload aware preemption",
686+
plugin: &fakePodGroupPlugin{
687+
filterStatus: map[string]*fwk.Status{
688+
"p1": fwk.NewStatus(fwk.Unschedulable),
689+
"p2": nil,
690+
"p3": nil,
691+
},
692+
// This should be returned by postfilter plugins if we exclude preemption from it
693+
postFilterStatus: map[string]*fwk.Status{
694+
"p1": fwk.NewStatus(fwk.Unschedulable),
695+
},
696+
postFilterResult: map[string]*fwk.PostFilterResult{
697+
"p1": nil,
698+
},
699+
permitStatus: map[string]*fwk.Status{
700+
"p2": fwk.NewStatus(fwk.Wait),
701+
"p3": fwk.NewStatus(fwk.Wait),
702+
},
703+
},
704+
expectedGroupStatus: podGroupRequiresWorkloadAwarePreemption,
705+
expectedPodStatus: map[string]*fwk.Status{
706+
"p1": fwk.NewStatus(fwk.Unschedulable),
707+
"p2": nil,
708+
"p3": nil,
709+
},
710+
useWorkloadAwarePreemption: true,
711+
skipForTAS: true,
712+
},
683713
}
684714

685715
for _, tasEnabled := range []bool{true, false} {
@@ -744,7 +774,17 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
744774
t.Fatalf("Failed to update snapshot: %v", err)
745775
}
746776

747-
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, pgInfo)
777+
postFilterMode := runAllPostFilter
778+
if tt.useWorkloadAwarePreemption {
779+
postFilterMode = runWithoutDefaultPreemption
780+
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
781+
features.GenericWorkload: true,
782+
features.GangScheduling: true,
783+
features.WorkloadAwarePreemption: true,
784+
})
785+
}
786+
787+
result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, pgInfo, postFilterMode)
748788

749789
if result.status != tt.expectedGroupStatus {
750790
t.Errorf("Expected group status: %v, got: %v", tt.expectedGroupStatus, result.status)

test/integration/scheduler/podgroup/podgroup_test.go

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,15 @@ func TestPodGroupScheduling(t *testing.T) {
7373
lowPriorityBlockerPod := st.MakePod().Name("low-priority-blocker").Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").
7474
ZeroTerminationGracePeriod().Priority(10).Obj()
7575

76+
lowP1 := st.MakePod().Name("low-p1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").
77+
ZeroTerminationGracePeriod().Priority(10).Obj()
78+
lowP2 := st.MakePod().Name("low-p2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").
79+
ZeroTerminationGracePeriod().Priority(10).Obj()
80+
lowP3 := st.MakePod().Name("low-p3").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").
81+
ZeroTerminationGracePeriod().Priority(10).Obj()
82+
lowP4 := st.MakePod().Name("low-p4").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").
83+
ZeroTerminationGracePeriod().Priority(10).Obj()
84+
7685
otherP1 := st.MakePod().Name("other-p1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").
7786
PodGroupName("pg2").Priority(100).Obj()
7887
otherP2 := st.MakePod().Name("other-p2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").
@@ -95,12 +104,14 @@ func TestPodGroupScheduling(t *testing.T) {
95104
waitForPodsGatedOnPreEnqueue []string
96105
waitForPodsUnschedulable []string
97106
waitForPodsScheduled []string
107+
waitForPodsRemoved []string
98108
waitForAnyPodsScheduled *waitForAnyPodsScheduled
99109
}
100110

101111
tests := []struct {
102-
name string
103-
steps []step
112+
name string
113+
enableWorkloadAwarePreemption bool
114+
steps []step
104115
}{
105116
{
106117
name: "gang schedules when pod group and resources are available",
@@ -384,13 +395,44 @@ func TestPodGroupScheduling(t *testing.T) {
384395
},
385396
},
386397
},
398+
{
399+
name: "gang schedules with workload-aware preemption",
400+
enableWorkloadAwarePreemption: true,
401+
steps: []step{
402+
{
403+
name: "Create low priority pods that take up all node resources",
404+
createPods: []*v1.Pod{lowP1, lowP2, lowP3, lowP4},
405+
},
406+
{
407+
name: "Wait for all low priority pods to be scheduled",
408+
waitForPodsScheduled: []string{"low-p1", "low-p2", "low-p3", "low-p4"},
409+
},
410+
{
411+
name: "Create the Workload object",
412+
createPodGroup: gangPodGroup,
413+
},
414+
{
415+
name: "Create high priority gang pods",
416+
createPods: []*v1.Pod{p1, p2, p3, p4},
417+
},
418+
{
419+
name: "Verify all gang pods are scheduled successfully (after workload-aware preemption)",
420+
waitForPodsScheduled: []string{"p1", "p2", "p3", "p4"},
421+
},
422+
{
423+
name: "Verify preemption victims were removed",
424+
waitForPodsRemoved: []string{"low-p1", "low-p2", "low-p3", "low-p4"},
425+
},
426+
},
427+
},
387428
}
388429

389430
for _, tt := range tests {
390431
t.Run(tt.name, func(t *testing.T) {
391432
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
392-
features.GenericWorkload: true,
393-
features.GangScheduling: true,
433+
features.GenericWorkload: true,
434+
features.GangScheduling: true,
435+
features.WorkloadAwarePreemption: tt.enableWorkloadAwarePreemption,
394436
})
395437

396438
podgroupmanager.DefaultSchedulingTimeoutDuration = 5 * time.Second
@@ -473,6 +515,14 @@ func TestPodGroupScheduling(t *testing.T) {
473515
t.Fatalf("Step %d: Failed to wait for pod %s to be scheduled: %v", i, podName, err)
474516
}
475517
}
518+
case step.waitForPodsRemoved != nil:
519+
for _, podName := range step.waitForPodsRemoved {
520+
err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false,
521+
testutils.PodDeleted(testCtx.Ctx, cs, ns, podName))
522+
if err != nil {
523+
t.Fatalf("Step %d: Failed to wait for pod %s to be removed: %v", i, podName, err)
524+
}
525+
}
476526
case step.waitForAnyPodsScheduled != nil:
477527
err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false,
478528
func(ctx context.Context) (bool, error) {

0 commit comments

Comments
 (0)