Skip to content

Commit 7274e33

Browse files
committed
CRI: Make stats respect sandbox's platform
To further some ongoing work in containerd to make as much code as possible able to be used on any platform (to handle runtimes that can virtualize/emulate a variety of different OSes), this change makes stats able to be handled on any of the supported stat types (just linux and windows). To accomplish this, we use the platform the sandbox returns from its `Platform` rpc to decide what format the containers in a given sandbox are returning metrics in, then we can typecast/marshal accordingly. Signed-off-by: Danny Canter <[email protected]>
1 parent 8781329 commit 7274e33

8 files changed

Lines changed: 565 additions & 621 deletions

pkg/cri/sbserver/container_stats.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,12 @@ func (c *criService) ContainerStats(ctx context.Context, in *runtime.ContainerSt
4040
return nil, fmt.Errorf("unexpected metrics response: %+v", resp.Metrics)
4141
}
4242

43-
cs, err := c.containerMetrics(cntr.Metadata, resp.Metrics[0])
43+
handler, err := c.getMetricsHandler(ctx, cntr.SandboxID)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
cs, err := handler(cntr.Metadata, resp.Metrics[0])
4449
if err != nil {
4550
return nil, fmt.Errorf("failed to decode container metrics: %w", err)
4651
}

pkg/cri/sbserver/container_stats_list.go

Lines changed: 300 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@ package sbserver
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
23+
"reflect"
2224
"time"
2325

26+
wstats "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats"
27+
cg1 "github.com/containerd/cgroups/v3/cgroup1/stats"
28+
cg2 "github.com/containerd/cgroups/v3/cgroup2/stats"
2429
"github.com/containerd/containerd/api/services/tasks/v1"
2530
"github.com/containerd/containerd/api/types"
31+
"github.com/containerd/containerd/errdefs"
2632
"github.com/containerd/containerd/pkg/cri/store/stats"
33+
"github.com/containerd/containerd/protobuf"
34+
"github.com/containerd/typeurl/v2"
2735
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
2836

2937
containerstore "github.com/containerd/containerd/pkg/cri/store/container"
@@ -42,14 +50,48 @@ func (c *criService) ListContainerStats(
4250
if err != nil {
4351
return nil, fmt.Errorf("failed to fetch metrics for tasks: %w", err)
4452
}
45-
criStats, err := c.toCRIContainerStats(resp.Metrics, containers)
53+
criStats, err := c.toCRIContainerStats(ctx, resp.Metrics, containers)
4654
if err != nil {
4755
return nil, fmt.Errorf("failed to convert to cri containerd stats format: %w", err)
4856
}
4957
return criStats, nil
5058
}
5159

60+
// metricsHandler converts a single task metric, together with the metadata of
// the container it belongs to, into the CRI ContainerStats representation.
// getMetricsHandler returns one of these per sandbox platform.
type metricsHandler func(containerstore.Metadata, *types.Metric) (*runtime.ContainerStats, error)
61+
62+
// Returns a function to be used for transforming container metrics into the right format.
63+
// Uses the platform the given sandbox advertises to implement its logic. If the platform is
64+
// unsupported for metrics this will return a wrapped [errdefs.ErrNotImplemented].
65+
func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (metricsHandler, error) {
66+
sandbox, err := c.sandboxStore.Get(sandboxID)
67+
if err != nil {
68+
return nil, fmt.Errorf("failed to find sandbox id %q: %w", sandboxID, err)
69+
}
70+
controller, err := c.getSandboxController(sandbox.Config, sandbox.RuntimeHandler)
71+
if err != nil {
72+
return nil, fmt.Errorf("failed to get sandbox controller: %w", err)
73+
}
74+
// Grab the platform that this containers sandbox advertises. Reason being, even if
75+
// the host may be {insert platform}, if it virtualizes or emulates a different platform
76+
// it will return stats in that format, and we need to handle the conversion logic based
77+
// off of this info.
78+
p, err := controller.Platform(ctx, sandboxID)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
switch p.OS {
84+
case "windows":
85+
return c.windowsContainerMetrics, nil
86+
case "linux":
87+
return c.linuxContainerMetrics, nil
88+
default:
89+
return nil, fmt.Errorf("container metrics for platform %+v: %w", p, errdefs.ErrNotImplemented)
90+
}
91+
}
92+
5293
func (c *criService) toCRIContainerStats(
94+
ctx context.Context,
5395
stats []*types.Metric,
5496
containers []containerstore.Container,
5597
) (*runtime.ListContainerStatsResponse, error) {
@@ -58,8 +100,29 @@ func (c *criService) toCRIContainerStats(
58100
statsMap[stat.ID] = stat
59101
}
60102
containerStats := new(runtime.ListContainerStatsResponse)
103+
104+
// Unfortunately if no filter was passed we're asking for every container's stats, which
105+
// generally belong to multiple different pods, who all might have different platforms.
106+
// To avoid recalculating the right metricsHandler to invoke, if we've already calculated
107+
// the platform and handler for a given sandbox just pull it from our map here.
108+
var (
109+
err error
110+
handler metricsHandler
111+
)
112+
sandboxToMetricsHandler := make(map[string]metricsHandler)
61113
for _, cntr := range containers {
62-
cs, err := c.containerMetrics(cntr.Metadata, statsMap[cntr.ID])
114+
h, ok := sandboxToMetricsHandler[cntr.SandboxID]
115+
if !ok {
116+
handler, err = c.getMetricsHandler(ctx, cntr.SandboxID)
117+
if err != nil {
118+
return nil, fmt.Errorf("failed to get metrics handler for container %q: %w", cntr.ID, err)
119+
}
120+
sandboxToMetricsHandler[cntr.SandboxID] = handler
121+
} else {
122+
handler = h
123+
}
124+
125+
cs, err := handler(cntr.Metadata, statsMap[cntr.ID])
63126
if err != nil {
64127
return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err)
65128
}
@@ -72,7 +135,6 @@ func (c *criService) toCRIContainerStats(
72135
}
73136
cs.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
74137
}
75-
76138
containerStats.Stats = append(containerStats.Stats, cs)
77139
}
78140
return containerStats, nil
@@ -133,7 +195,6 @@ func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, curre
133195
if err != nil {
134196
return 0, fmt.Errorf("failed to update sandbox container stats: %s: %w", containerID, err)
135197
}
136-
137198
} else {
138199
err := c.containerStore.UpdateContainerStats(containerID, newStats)
139200
if err != nil {
@@ -193,3 +254,238 @@ func matchLabelSelector(selector, labels map[string]string) bool {
193254
}
194255
return true
195256
}
257+
258+
func (c *criService) windowsContainerMetrics(
259+
meta containerstore.Metadata,
260+
stats *types.Metric,
261+
) (*runtime.ContainerStats, error) {
262+
var cs runtime.ContainerStats
263+
var usedBytes, inodesUsed uint64
264+
sn, err := c.GetSnapshot(meta.ID)
265+
// If snapshotstore doesn't have cached snapshot information
266+
// set WritableLayer usage to zero
267+
if err == nil {
268+
usedBytes = sn.Size
269+
inodesUsed = sn.Inodes
270+
}
271+
cs.WritableLayer = &runtime.FilesystemUsage{
272+
Timestamp: sn.Timestamp,
273+
FsId: &runtime.FilesystemIdentifier{
274+
Mountpoint: c.imageFSPath,
275+
},
276+
UsedBytes: &runtime.UInt64Value{Value: usedBytes},
277+
InodesUsed: &runtime.UInt64Value{Value: inodesUsed},
278+
}
279+
cs.Attributes = &runtime.ContainerAttributes{
280+
Id: meta.ID,
281+
Metadata: meta.Config.GetMetadata(),
282+
Labels: meta.Config.GetLabels(),
283+
Annotations: meta.Config.GetAnnotations(),
284+
}
285+
286+
if stats != nil {
287+
s, err := typeurl.UnmarshalAny(stats.Data)
288+
if err != nil {
289+
return nil, fmt.Errorf("failed to extract container metrics: %w", err)
290+
}
291+
wstats := s.(*wstats.Statistics).GetWindows()
292+
if wstats == nil {
293+
return nil, errors.New("windows stats is empty")
294+
}
295+
if wstats.Processor != nil {
296+
cs.Cpu = &runtime.CpuUsage{
297+
Timestamp: wstats.Timestamp.UnixNano(),
298+
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: wstats.Processor.TotalRuntimeNS},
299+
}
300+
}
301+
if wstats.Memory != nil {
302+
cs.Memory = &runtime.MemoryUsage{
303+
Timestamp: wstats.Timestamp.UnixNano(),
304+
WorkingSetBytes: &runtime.UInt64Value{
305+
Value: wstats.Memory.MemoryUsagePrivateWorkingSetBytes,
306+
},
307+
}
308+
}
309+
}
310+
return &cs, nil
311+
}
312+
313+
func (c *criService) linuxContainerMetrics(
314+
meta containerstore.Metadata,
315+
stats *types.Metric,
316+
) (*runtime.ContainerStats, error) {
317+
var cs runtime.ContainerStats
318+
var usedBytes, inodesUsed uint64
319+
sn, err := c.GetSnapshot(meta.ID)
320+
// If snapshotstore doesn't have cached snapshot information
321+
// set WritableLayer usage to zero
322+
if err == nil {
323+
usedBytes = sn.Size
324+
inodesUsed = sn.Inodes
325+
}
326+
cs.WritableLayer = &runtime.FilesystemUsage{
327+
Timestamp: sn.Timestamp,
328+
FsId: &runtime.FilesystemIdentifier{
329+
Mountpoint: c.imageFSPath,
330+
},
331+
UsedBytes: &runtime.UInt64Value{Value: usedBytes},
332+
InodesUsed: &runtime.UInt64Value{Value: inodesUsed},
333+
}
334+
cs.Attributes = &runtime.ContainerAttributes{
335+
Id: meta.ID,
336+
Metadata: meta.Config.GetMetadata(),
337+
Labels: meta.Config.GetLabels(),
338+
Annotations: meta.Config.GetAnnotations(),
339+
}
340+
341+
if stats != nil {
342+
var data interface{}
343+
switch {
344+
case typeurl.Is(stats.Data, (*cg1.Metrics)(nil)):
345+
data = &cg1.Metrics{}
346+
case typeurl.Is(stats.Data, (*cg2.Metrics)(nil)):
347+
data = &cg2.Metrics{}
348+
case typeurl.Is(stats.Data, (*wstats.Statistics)(nil)):
349+
data = &wstats.Statistics{}
350+
default:
351+
return nil, errors.New("cannot convert metric data to cgroups.Metrics or windows.Statistics")
352+
}
353+
354+
if err := typeurl.UnmarshalTo(stats.Data, data); err != nil {
355+
return nil, fmt.Errorf("failed to extract container metrics: %w", err)
356+
}
357+
358+
cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, data, protobuf.FromTimestamp(stats.Timestamp))
359+
if err != nil {
360+
return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
361+
}
362+
cs.Cpu = cpuStats
363+
364+
memoryStats, err := c.memoryContainerStats(meta.ID, data, protobuf.FromTimestamp(stats.Timestamp))
365+
if err != nil {
366+
return nil, fmt.Errorf("failed to obtain memory stats: %w", err)
367+
}
368+
cs.Memory = memoryStats
369+
}
370+
371+
return &cs, nil
372+
}
373+
374+
// getWorkingSet calculates workingset memory from cgroup memory stats.
375+
// The caller should make sure memory is not nil.
376+
// workingset = usage - total_inactive_file
377+
func getWorkingSet(memory *cg1.MemoryStat) uint64 {
378+
if memory.Usage == nil {
379+
return 0
380+
}
381+
var workingSet uint64
382+
if memory.TotalInactiveFile < memory.Usage.Usage {
383+
workingSet = memory.Usage.Usage - memory.TotalInactiveFile
384+
}
385+
return workingSet
386+
}
387+
388+
// getWorkingSetV2 calculates workingset memory from cgroupv2 memory stats.
389+
// The caller should make sure memory is not nil.
390+
// workingset = usage - inactive_file
391+
func getWorkingSetV2(memory *cg2.MemoryStat) uint64 {
392+
var workingSet uint64
393+
if memory.InactiveFile < memory.Usage {
394+
workingSet = memory.Usage - memory.InactiveFile
395+
}
396+
return workingSet
397+
}
398+
399+
// isMemoryUnlimited reports whether v is large enough to be treated as
// "no memory limit".
func isMemoryUnlimited(v uint64) bool {
	// Threshold above which a memory size counts as "unlimited". This is not
	// MaxInt64 due to rounding by the kernel.
	// TODO: k8s or cadvisor should export this https://github.com/google/cadvisor/blob/2b6fbacac7598e0140b5bc8428e3bdd7d86cf5b9/metrics/prometheus.go#L1969-L1971
	const unlimitedThreshold uint64 = 1 << 62

	return unlimitedThreshold < v
}
407+
408+
// https://github.com/kubernetes/kubernetes/blob/b47f8263e18c7b13dba33fba23187e5e0477cdbd/pkg/kubelet/stats/helper.go#L68-L71
409+
func getAvailableBytes(memory *cg1.MemoryStat, workingSetBytes uint64) uint64 {
410+
// memory limit - working set bytes
411+
if !isMemoryUnlimited(memory.Usage.Limit) {
412+
return memory.Usage.Limit - workingSetBytes
413+
}
414+
return 0
415+
}
416+
417+
func getAvailableBytesV2(memory *cg2.MemoryStat, workingSetBytes uint64) uint64 {
418+
// memory limit (memory.max) for cgroupv2 - working set bytes
419+
if !isMemoryUnlimited(memory.UsageLimit) {
420+
return memory.UsageLimit - workingSetBytes
421+
}
422+
return 0
423+
}
424+
425+
func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats interface{}, timestamp time.Time) (*runtime.CpuUsage, error) {
426+
switch metrics := stats.(type) {
427+
case *cg1.Metrics:
428+
metrics.GetCPU().GetUsage()
429+
if metrics.CPU != nil && metrics.CPU.Usage != nil {
430+
return &runtime.CpuUsage{
431+
Timestamp: timestamp.UnixNano(),
432+
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: metrics.CPU.Usage.Total},
433+
}, nil
434+
}
435+
case *cg2.Metrics:
436+
if metrics.CPU != nil {
437+
// convert to nano seconds
438+
usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000
439+
440+
return &runtime.CpuUsage{
441+
Timestamp: timestamp.UnixNano(),
442+
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
443+
}, nil
444+
}
445+
default:
446+
return nil, fmt.Errorf("unexpected metrics type: %T from %s", metrics, reflect.TypeOf(metrics).Elem().PkgPath())
447+
}
448+
return nil, nil
449+
}
450+
451+
func (c *criService) memoryContainerStats(ID string, stats interface{}, timestamp time.Time) (*runtime.MemoryUsage, error) {
452+
switch metrics := stats.(type) {
453+
case *cg1.Metrics:
454+
if metrics.Memory != nil && metrics.Memory.Usage != nil {
455+
workingSetBytes := getWorkingSet(metrics.Memory)
456+
457+
return &runtime.MemoryUsage{
458+
Timestamp: timestamp.UnixNano(),
459+
WorkingSetBytes: &runtime.UInt64Value{
460+
Value: workingSetBytes,
461+
},
462+
AvailableBytes: &runtime.UInt64Value{Value: getAvailableBytes(metrics.Memory, workingSetBytes)},
463+
UsageBytes: &runtime.UInt64Value{Value: metrics.Memory.Usage.Usage},
464+
RssBytes: &runtime.UInt64Value{Value: metrics.Memory.TotalRSS},
465+
PageFaults: &runtime.UInt64Value{Value: metrics.Memory.TotalPgFault},
466+
MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.TotalPgMajFault},
467+
}, nil
468+
}
469+
case *cg2.Metrics:
470+
if metrics.Memory != nil {
471+
workingSetBytes := getWorkingSetV2(metrics.Memory)
472+
473+
return &runtime.MemoryUsage{
474+
Timestamp: timestamp.UnixNano(),
475+
WorkingSetBytes: &runtime.UInt64Value{
476+
Value: workingSetBytes,
477+
},
478+
AvailableBytes: &runtime.UInt64Value{Value: getAvailableBytesV2(metrics.Memory, workingSetBytes)},
479+
UsageBytes: &runtime.UInt64Value{Value: metrics.Memory.Usage},
480+
// Use Anon memory for RSS as cAdvisor on cgroupv2
481+
// see https://github.com/google/cadvisor/blob/a9858972e75642c2b1914c8d5428e33e6392c08a/container/libcontainer/handler.go#L799
482+
RssBytes: &runtime.UInt64Value{Value: metrics.Memory.Anon},
483+
PageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgfault},
484+
MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgmajfault},
485+
}, nil
486+
}
487+
default:
488+
return nil, fmt.Errorf("unexpected metrics type: %T from %s", metrics, reflect.TypeOf(metrics).Elem().PkgPath())
489+
}
490+
return nil, nil
491+
}

0 commit comments

Comments
 (0)