Skip to content

Commit 7804b51

Browse files
KevinTMtz authored and
ndixita committed
CPU and Memory manager event when using pod level resources
1 parent 924b324 commit 7804b51

9 files changed

Lines changed: 171 additions & 10 deletions

File tree

pkg/kubelet/allocation/allocation_manager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package allocation
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"path/filepath"
2324
"slices"
@@ -41,6 +42,7 @@ import (
4142
"k8s.io/kubernetes/pkg/features"
4243
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
4344
"k8s.io/kubernetes/pkg/kubelet/cm"
45+
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
4446
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
4547
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
4648
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -701,6 +703,13 @@ func (m *manager) canAdmitPod(allocatedPods []*v1.Pod, pod *v1.Pod) (bool, strin
701703
if result := podAdmitHandler.Admit(attrs); !result.Admit {
702704
klog.InfoS("Pod admission denied", "podUID", attrs.Pod.UID, "pod", klog.KObj(attrs.Pod), "reason", result.Reason, "message", result.Message)
703705
return false, result.Reason, result.Message
706+
} else if result.Admit && len(result.Errors) > 0 && result.Reason == admission.PodLevelResourcesIncompatible && utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) {
707+
for _, err := range result.Errors {
708+
var admissionWarning admission.Error
709+
if errors.As(err, &admissionWarning) {
710+
m.recorder.Event(attrs.Pod, v1.EventTypeWarning, admission.PodLevelResourcesIncompatible, admissionWarning.Error())
711+
}
712+
}
704713
}
705714
}
706715

pkg/kubelet/cm/admission/errors.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,19 @@ import (
2020
"errors"
2121
"fmt"
2222

23+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
2324
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
2425
)
2526

2627
const (
2728
ErrorReasonUnexpected = "UnexpectedAdmissionError"
29+
30+
// PodLevelResourcesIncompatible is the admit-result reason (and event reason) used when a pod is
// admitted even though the CPU/Memory manager policy is incompatible with pod-level resources.
31+
PodLevelResourcesIncompatible = "PodLevelResourcesIncompatible"
32+
33+
// Error types that GetPodAdmitResult treats as non-blocking warnings: the manager's policy
// does not support pod-level resources.
34+
CPUManagerPodLevelResourcesError = "CPUManagerPodLevelResourcesError"
35+
MemoryManagerPodLevelResourcesError = "MemoryManagerPodLevelResourcesError"
2836
)
2937

3038
type Error interface {
@@ -49,14 +57,53 @@ func GetPodAdmitResult(err error) lifecycle.PodAdmitResult {
4957
return lifecycle.PodAdmitResult{Admit: true}
5058
}
5159

60+
var errs []error
61+
// To support multiple pod-level resource errors, we need to check if the error
62+
// is an aggregate error.
63+
var agg utilerrors.Aggregate
64+
if errors.As(err, &agg) {
65+
errs = agg.Errors()
66+
} else {
67+
errs = []error{err}
68+
}
69+
70+
var podLevelWarnings []error
71+
var otherErrs []error
72+
for _, e := range errs {
73+
var admissionErr Error
74+
if errors.As(e, &admissionErr) && (admissionErr.Type() == CPUManagerPodLevelResourcesError || admissionErr.Type() == MemoryManagerPodLevelResourcesError) {
75+
podLevelWarnings = append(podLevelWarnings, e)
76+
} else {
77+
otherErrs = append(otherErrs, e)
78+
}
79+
}
80+
81+
// If all errors are pod-level resource errors, we should treat them as warnings
82+
// and not block pod admission.
83+
if len(otherErrs) == 0 && len(podLevelWarnings) > 0 {
84+
return lifecycle.PodAdmitResult{
85+
Admit: true,
86+
Reason: PodLevelResourcesIncompatible,
87+
Message: "",
88+
Errors: podLevelWarnings,
89+
}
90+
}
91+
92+
if len(otherErrs) == 0 {
93+
// This should not happen if err != nil, but as a safeguard.
94+
return lifecycle.PodAdmitResult{Admit: true}
95+
}
96+
97+
// At this point, we have at least one error that requires pod rejection.
98+
firstErr := otherErrs[0]
5299
var admissionErr Error
53-
if !errors.As(err, &admissionErr) {
54-
admissionErr = &unexpectedAdmissionError{err}
100+
if !errors.As(firstErr, &admissionErr) {
101+
admissionErr = &unexpectedAdmissionError{firstErr}
55102
}
56103

57104
return lifecycle.PodAdmitResult{
105+
Admit: false,
58106
Message: admissionErr.Error(),
59107
Reason: admissionErr.Type(),
60-
Admit: false,
61108
}
62109
}

pkg/kubelet/cm/cpumanager/cpu_manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package cpumanager
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"math"
2324
"sync"
@@ -261,6 +262,13 @@ func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
261262

262263
// Call down into the policy to assign this container CPUs if required.
263264
err := m.policy.Allocate(m.state, p, c)
265+
266+
// A CPUManagerPodLevelResourcesError means the pod uses pod-level resources, which the static policy does not support.
267+
// Return it without logging an allocation error: admission treats it as a warning and still admits the pod.
268+
if errors.As(err, &CPUManagerPodLevelResourcesError{}) {
269+
return err
270+
}
271+
264272
if err != nil {
265273
klog.ErrorS(err, "Allocate error")
266274
return err

pkg/kubelet/cm/cpumanager/policy_static.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package cpumanager
1818

1919
import (
2020
"fmt"
21+
2122
"strconv"
2223

2324
v1 "k8s.io/api/core/v1"
2425
utilfeature "k8s.io/apiserver/pkg/util/feature"
26+
resourcehelper "k8s.io/component-helpers/resource"
2527
"k8s.io/klog/v2"
2628
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
2729
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
@@ -35,13 +37,15 @@ import (
3537
)
3638

3739
const (
38-
3940
// PolicyStatic is the name of the static policy.
4041
// Should options be given, these will be ignored and backward (up to 1.21 included)
4142
// compatible behaviour will be enforced
4243
PolicyStatic policyName = "static"
4344
// ErrorSMTAlignment represents the type of an SMTAlignmentError
4445
ErrorSMTAlignment = "SMTAlignmentError"
46+
47+
// ErrorCPUManagerPodLevelResources represents the type of a CPUManagerPodLevelResourcesError
48+
ErrorCPUManagerPodLevelResources = "CPUManagerPodLevelResourcesError"
4549
)
4650

4751
// SMTAlignmentError represents an error due to SMT alignment
@@ -52,6 +56,15 @@ type SMTAlignmentError struct {
5256
CausedByPhysicalCPUs bool
5357
}
5458

59+
// PodLevelResourcesError represents an error due to pod-level resources not being supported.
60+
type CPUManagerPodLevelResourcesError struct{}
61+
62+
func (e CPUManagerPodLevelResourcesError) Error() string {
63+
return "CPU Manager static policy does not support pod-level resources"
64+
}
65+
66+
func (e CPUManagerPodLevelResourcesError) Type() string { return ErrorCPUManagerPodLevelResources }
67+
5568
func (e SMTAlignmentError) Error() string {
5669
if e.CausedByPhysicalCPUs {
5770
return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore)
@@ -316,6 +329,10 @@ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, c
316329
func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
317330
numCPUs := p.guaranteedCPUs(pod, container)
318331
if numCPUs == 0 {
332+
if p.isPodWithPodLevelResources(pod) {
333+
return CPUManagerPodLevelResourcesError{}
334+
}
335+
319336
// container belongs in the shared pool (nothing to do; use default cpuset)
320337
return nil
321338
}
@@ -467,6 +484,12 @@ func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int
467484
klog.V(5).InfoS("Exclusive CPU allocation skipped, pod QoS is not guaranteed", "pod", klog.KObj(pod), "containerName", container.Name, "qos", qos)
468485
return 0
469486
}
487+
488+
// The CPU manager static policy does not support pod-level resources.
489+
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
490+
return 0
491+
}
492+
470493
cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
471494
// In-place pod resize feature makes Container.Resources field mutable for CPU & memory.
472495
// AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted.
@@ -814,3 +837,14 @@ func updateAllocationPerNUMAMetric(topo *topology.CPUTopology, allocatedCPUs cpu
814837
metrics.CPUManagerAllocationPerNUMA.WithLabelValues(strconv.Itoa(numaNode)).Set(float64(count))
815838
}
816839
}
840+
841+
func (p *staticPolicy) isPodWithPodLevelResources(pod *v1.Pod) bool {
842+
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
843+
// The Memory manager static policy does not support pod-level resources.
844+
klog.V(5).InfoS("CPU Manager allocation skipped, pod is using pod-level resources which are not supported by the static CPU manager policy", "pod", klog.KObj(pod))
845+
846+
return true
847+
}
848+
849+
return false
850+
}

pkg/kubelet/cm/memorymanager/memory_manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package memorymanager
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"runtime"
2324
"sync"
@@ -273,6 +274,12 @@ func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
273274

274275
// Call down into the policy to assign this container memory if required.
275276
if err := m.policy.Allocate(ctx, m.state, pod, container); err != nil {
277+
// A MemoryManagerPodLevelResourcesError means the pod uses pod-level resources, which the static policy does not support.
278+
// Return it without logging an allocation error: admission treats it as a warning and still admits the pod.
279+
if errors.As(err, &MemoryManagerPodLevelResourcesError{}) {
280+
return err
281+
}
282+
276283
logger.Error(err, "Allocate error", "pod", klog.KObj(pod), "containerName", container.Name)
277284
return err
278285
}

pkg/kubelet/cm/memorymanager/policy_static.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
v1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/api/resource"
2929
utilfeature "k8s.io/apiserver/pkg/util/feature"
30+
resourcehelper "k8s.io/component-helpers/resource"
3031
"k8s.io/klog/v2"
3132
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
3233
corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -38,7 +39,12 @@ import (
3839
"k8s.io/kubernetes/pkg/kubelet/metrics"
3940
)
4041

41-
const PolicyTypeStatic policyType = "Static"
42+
const (
43+
// PolicyTypeStatic identifies the static memory manager policy.
PolicyTypeStatic policyType = "Static"
44+
45+
// ErrorMemoryManagerPodLevelResources represents the type of a MemoryManagerPodLevelResourcesError
46+
ErrorMemoryManagerPodLevelResources = "MemoryManagerPodLevelResourcesError"
47+
)
4248

4349
type systemReservedMemory map[int]map[v1.ResourceName]uint64
4450
type reusableMemory map[string]map[string]map[v1.ResourceName]uint64
@@ -60,6 +66,16 @@ type staticPolicy struct {
6066

6167
var _ Policy = &staticPolicy{}
6268

69+
type MemoryManagerPodLevelResourcesError struct{}
70+
71+
func (e MemoryManagerPodLevelResourcesError) Type() string {
72+
return ErrorMemoryManagerPodLevelResources
73+
}
74+
75+
func (e MemoryManagerPodLevelResourcesError) Error() string {
76+
return "Memory Manager static policy does not support pod-level resources"
77+
}
78+
6379
// NewPolicyStatic returns new static policy instance
6480
func NewPolicyStatic(ctx context.Context, machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
6581
var totalSystemReserved uint64
@@ -107,6 +123,10 @@ func (p *staticPolicy) Allocate(ctx context.Context, s state.State, pod *v1.Pod,
107123
return nil
108124
}
109125

126+
if p.isPodWithPodLevelResources(ctx, pod) {
127+
return MemoryManagerPodLevelResourcesError{}
128+
}
129+
110130
podUID := string(pod.UID)
111131
logger.Info("Allocate")
112132
// container belongs in an exclusively allocated pool
@@ -406,6 +426,10 @@ func (p *staticPolicy) GetPodTopologyHints(ctx context.Context, s state.State, p
406426
return nil
407427
}
408428

429+
if p.isPodWithPodLevelResources(ctx, pod) {
430+
return nil
431+
}
432+
409433
reqRsrcs, err := getPodRequestedResources(pod)
410434
if err != nil {
411435
logger.Error(err, "Failed to get pod requested resources", "podUID", pod.UID)
@@ -436,6 +460,10 @@ func (p *staticPolicy) GetTopologyHints(ctx context.Context, s state.State, pod
436460
return nil
437461
}
438462

463+
if p.isPodWithPodLevelResources(ctx, pod) {
464+
return nil
465+
}
466+
439467
requestedResources, err := getRequestedResources(pod, container)
440468
if err != nil {
441469
logger.Error(err, "Failed to get container requested resources", "podUID", pod.UID, "containerName", container.Name)
@@ -1076,3 +1104,16 @@ func isAffinityViolatingNUMAAllocations(machineState state.NUMANodeMap, mask bit
10761104
}
10771105
return false
10781106
}
1107+
1108+
func (p *staticPolicy) isPodWithPodLevelResources(ctx context.Context, pod *v1.Pod) bool {
1109+
logger := klog.FromContext(ctx)
1110+
1111+
if utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources) && resourcehelper.IsPodLevelResourcesSet(pod) {
1112+
// The Memory manager static policy does not support pod-level resources.
1113+
logger.V(5).Info("Memory manager allocation skipped, pod is using pod-level resources which are not supported by the static Memory manager policy", "pod", klog.KObj(pod))
1114+
1115+
return true
1116+
}
1117+
1118+
return false
1119+
}

pkg/kubelet/cm/memorymanager/policy_static_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2017,9 +2017,9 @@ func TestStaticPolicyAllocate(t *testing.T) {
20172017
}
20182018

20192019
err = p.Allocate(tCtx, s, testCase.pod, &testCase.pod.Spec.Containers[0])
2020-
if !reflect.DeepEqual(err, testCase.expectedError) {
2021-
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
2022-
}
2020+
if (err == nil) != (testCase.expectedError == nil) || (err != nil && testCase.expectedError != nil && err.Error() != testCase.expectedError.Error()) {
2021+
t.Fatalf("The actual error %v is different from the expected one %v", err, testCase.expectedError)
2022+
}
20232023

20242024
if err != nil {
20252025
return

pkg/kubelet/cm/topologymanager/scope.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ package topologymanager
1919
import (
2020
"sync"
2121

22-
"k8s.io/api/core/v1"
22+
v1 "k8s.io/api/core/v1"
23+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
24+
utilfeature "k8s.io/apiserver/pkg/util/feature"
2325
"k8s.io/klog/v2"
26+
"k8s.io/kubernetes/pkg/features"
2427
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
2528
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
2629
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -148,11 +151,21 @@ func (s *scope) admitPolicyNone(pod *v1.Pod) lifecycle.PodAdmitResult {
148151
// It would be better to implement this function in topologymanager instead of scope
149152
// but topologymanager do not track providers anymore
150153
func (s *scope) allocateAlignedResources(pod *v1.Pod, container *v1.Container) error {
154+
isPodLevelResourcesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.PodLevelResources)
155+
156+
var errs []error
151157
for _, provider := range s.hintProviders {
152158
err := provider.Allocate(pod, container)
153-
if err != nil {
159+
if err != nil && isPodLevelResourcesEnabled {
160+
errs = append(errs, err)
161+
} else if err != nil {
154162
return err
155163
}
156164
}
165+
166+
if isPodLevelResourcesEnabled {
167+
return utilerrors.NewAggregate(errs)
168+
}
169+
157170
return nil
158171
}

pkg/kubelet/lifecycle/interfaces.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type PodAdmitResult struct {
3535
Reason string
3636
// a brief message explaining why the pod could not be admitted.
3737
Message string
38+
// all errors collected during admission; when the pod is admitted, these are non-blocking warnings.
39+
Errors []error
3840
}
3941

4042
// PodAdmitHandler is notified during pod admission.

0 commit comments

Comments
 (0)