@@ -18,12 +18,20 @@ package sbserver
1818
1919import (
2020 "context"
21+ "errors"
2122 "fmt"
23+ "reflect"
2224 "time"
2325
26+ wstats "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats"
27+ cg1 "github.com/containerd/cgroups/v3/cgroup1/stats"
28+ cg2 "github.com/containerd/cgroups/v3/cgroup2/stats"
2429 "github.com/containerd/containerd/api/services/tasks/v1"
2530 "github.com/containerd/containerd/api/types"
31+ "github.com/containerd/containerd/errdefs"
2632 "github.com/containerd/containerd/pkg/cri/store/stats"
33+ "github.com/containerd/containerd/protobuf"
34+ "github.com/containerd/typeurl/v2"
2735 runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
2836
2937 containerstore "github.com/containerd/containerd/pkg/cri/store/container"
@@ -42,14 +50,48 @@ func (c *criService) ListContainerStats(
4250 if err != nil {
4351 return nil , fmt .Errorf ("failed to fetch metrics for tasks: %w" , err )
4452 }
45- criStats , err := c .toCRIContainerStats (resp .Metrics , containers )
53+ criStats , err := c .toCRIContainerStats (ctx , resp .Metrics , containers )
4654 if err != nil {
4755 return nil , fmt .Errorf ("failed to convert to cri containerd stats format: %w" , err )
4856 }
4957 return criStats , nil
5058}
5159
60+ type metricsHandler func (containerstore.Metadata , * types.Metric ) (* runtime.ContainerStats , error )
61+
62+ // Returns a function to be used for transforming container metrics into the right format.
63+ // Uses the platform the given sandbox advertises to implement its logic. If the platform is
64+ // unsupported for metrics this will return a wrapped [errdefs.ErrNotImplemented].
65+ func (c * criService ) getMetricsHandler (ctx context.Context , sandboxID string ) (metricsHandler , error ) {
66+ sandbox , err := c .sandboxStore .Get (sandboxID )
67+ if err != nil {
68+ return nil , fmt .Errorf ("failed to find sandbox id %q: %w" , sandboxID , err )
69+ }
70+ controller , err := c .getSandboxController (sandbox .Config , sandbox .RuntimeHandler )
71+ if err != nil {
72+ return nil , fmt .Errorf ("failed to get sandbox controller: %w" , err )
73+ }
74+ // Grab the platform that this containers sandbox advertises. Reason being, even if
75+ // the host may be {insert platform}, if it virtualizes or emulates a different platform
76+ // it will return stats in that format, and we need to handle the conversion logic based
77+ // off of this info.
78+ p , err := controller .Platform (ctx , sandboxID )
79+ if err != nil {
80+ return nil , err
81+ }
82+
83+ switch p .OS {
84+ case "windows" :
85+ return c .windowsContainerMetrics , nil
86+ case "linux" :
87+ return c .linuxContainerMetrics , nil
88+ default :
89+ return nil , fmt .Errorf ("container metrics for platform %+v: %w" , p , errdefs .ErrNotImplemented )
90+ }
91+ }
92+
5293func (c * criService ) toCRIContainerStats (
94+ ctx context.Context ,
5395 stats []* types.Metric ,
5496 containers []containerstore.Container ,
5597) (* runtime.ListContainerStatsResponse , error ) {
@@ -58,8 +100,29 @@ func (c *criService) toCRIContainerStats(
58100 statsMap [stat .ID ] = stat
59101 }
60102 containerStats := new (runtime.ListContainerStatsResponse )
103+
104+ // Unfortunately if no filter was passed we're asking for every containers stats which
105+ // generally belong to multiple different pods, who all might have different platforms.
106+ // To avoid recalculating the right metricsHandler to invoke, if we've already calculated
107+ // the platform and handler for a given sandbox just pull it from our map here.
108+ var (
109+ err error
110+ handler metricsHandler
111+ )
112+ sandboxToMetricsHandler := make (map [string ]metricsHandler )
61113 for _ , cntr := range containers {
62- cs , err := c .containerMetrics (cntr .Metadata , statsMap [cntr .ID ])
114+ h , ok := sandboxToMetricsHandler [cntr .SandboxID ]
115+ if ! ok {
116+ handler , err = c .getMetricsHandler (ctx , cntr .SandboxID )
117+ if err != nil {
118+ return nil , fmt .Errorf ("failed to get metrics handler for container %q: %w" , cntr .ID , err )
119+ }
120+ sandboxToMetricsHandler [cntr .SandboxID ] = handler
121+ } else {
122+ handler = h
123+ }
124+
125+ cs , err := handler (cntr .Metadata , statsMap [cntr .ID ])
63126 if err != nil {
64127 return nil , fmt .Errorf ("failed to decode container metrics for %q: %w" , cntr .ID , err )
65128 }
@@ -72,7 +135,6 @@ func (c *criService) toCRIContainerStats(
72135 }
73136 cs .Cpu .UsageNanoCores = & runtime.UInt64Value {Value : nanoUsage }
74137 }
75-
76138 containerStats .Stats = append (containerStats .Stats , cs )
77139 }
78140 return containerStats , nil
@@ -133,7 +195,6 @@ func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, curre
133195 if err != nil {
134196 return 0 , fmt .Errorf ("failed to update sandbox container stats: %s: %w" , containerID , err )
135197 }
136-
137198 } else {
138199 err := c .containerStore .UpdateContainerStats (containerID , newStats )
139200 if err != nil {
@@ -193,3 +254,238 @@ func matchLabelSelector(selector, labels map[string]string) bool {
193254 }
194255 return true
195256}
257+
258+ func (c * criService ) windowsContainerMetrics (
259+ meta containerstore.Metadata ,
260+ stats * types.Metric ,
261+ ) (* runtime.ContainerStats , error ) {
262+ var cs runtime.ContainerStats
263+ var usedBytes , inodesUsed uint64
264+ sn , err := c .snapshotStore .Get (meta .ID )
265+ // If snapshotstore doesn't have cached snapshot information
266+ // set WritableLayer usage to zero
267+ if err == nil {
268+ usedBytes = sn .Size
269+ inodesUsed = sn .Inodes
270+ }
271+ cs .WritableLayer = & runtime.FilesystemUsage {
272+ Timestamp : sn .Timestamp ,
273+ FsId : & runtime.FilesystemIdentifier {
274+ Mountpoint : c .imageFSPath ,
275+ },
276+ UsedBytes : & runtime.UInt64Value {Value : usedBytes },
277+ InodesUsed : & runtime.UInt64Value {Value : inodesUsed },
278+ }
279+ cs .Attributes = & runtime.ContainerAttributes {
280+ Id : meta .ID ,
281+ Metadata : meta .Config .GetMetadata (),
282+ Labels : meta .Config .GetLabels (),
283+ Annotations : meta .Config .GetAnnotations (),
284+ }
285+
286+ if stats != nil {
287+ s , err := typeurl .UnmarshalAny (stats .Data )
288+ if err != nil {
289+ return nil , fmt .Errorf ("failed to extract container metrics: %w" , err )
290+ }
291+ wstats := s .(* wstats.Statistics ).GetWindows ()
292+ if wstats == nil {
293+ return nil , errors .New ("windows stats is empty" )
294+ }
295+ if wstats .Processor != nil {
296+ cs .Cpu = & runtime.CpuUsage {
297+ Timestamp : wstats .Timestamp .UnixNano (),
298+ UsageCoreNanoSeconds : & runtime.UInt64Value {Value : wstats .Processor .TotalRuntimeNS },
299+ }
300+ }
301+ if wstats .Memory != nil {
302+ cs .Memory = & runtime.MemoryUsage {
303+ Timestamp : wstats .Timestamp .UnixNano (),
304+ WorkingSetBytes : & runtime.UInt64Value {
305+ Value : wstats .Memory .MemoryUsagePrivateWorkingSetBytes ,
306+ },
307+ }
308+ }
309+ }
310+ return & cs , nil
311+ }
312+
313+ func (c * criService ) linuxContainerMetrics (
314+ meta containerstore.Metadata ,
315+ stats * types.Metric ,
316+ ) (* runtime.ContainerStats , error ) {
317+ var cs runtime.ContainerStats
318+ var usedBytes , inodesUsed uint64
319+ sn , err := c .snapshotStore .Get (meta .ID )
320+ // If snapshotstore doesn't have cached snapshot information
321+ // set WritableLayer usage to zero
322+ if err == nil {
323+ usedBytes = sn .Size
324+ inodesUsed = sn .Inodes
325+ }
326+ cs .WritableLayer = & runtime.FilesystemUsage {
327+ Timestamp : sn .Timestamp ,
328+ FsId : & runtime.FilesystemIdentifier {
329+ Mountpoint : c .imageFSPath ,
330+ },
331+ UsedBytes : & runtime.UInt64Value {Value : usedBytes },
332+ InodesUsed : & runtime.UInt64Value {Value : inodesUsed },
333+ }
334+ cs .Attributes = & runtime.ContainerAttributes {
335+ Id : meta .ID ,
336+ Metadata : meta .Config .GetMetadata (),
337+ Labels : meta .Config .GetLabels (),
338+ Annotations : meta .Config .GetAnnotations (),
339+ }
340+
341+ if stats != nil {
342+ var data interface {}
343+ switch {
344+ case typeurl .Is (stats .Data , (* cg1 .Metrics )(nil )):
345+ data = & cg1.Metrics {}
346+ case typeurl .Is (stats .Data , (* cg2 .Metrics )(nil )):
347+ data = & cg2.Metrics {}
348+ case typeurl .Is (stats .Data , (* wstats .Statistics )(nil )):
349+ data = & wstats.Statistics {}
350+ default :
351+ return nil , errors .New ("cannot convert metric data to cgroups.Metrics or windows.Statistics" )
352+ }
353+
354+ if err := typeurl .UnmarshalTo (stats .Data , data ); err != nil {
355+ return nil , fmt .Errorf ("failed to extract container metrics: %w" , err )
356+ }
357+
358+ cpuStats , err := c .cpuContainerStats (meta .ID , false /* isSandbox */ , data , protobuf .FromTimestamp (stats .Timestamp ))
359+ if err != nil {
360+ return nil , fmt .Errorf ("failed to obtain cpu stats: %w" , err )
361+ }
362+ cs .Cpu = cpuStats
363+
364+ memoryStats , err := c .memoryContainerStats (meta .ID , data , protobuf .FromTimestamp (stats .Timestamp ))
365+ if err != nil {
366+ return nil , fmt .Errorf ("failed to obtain memory stats: %w" , err )
367+ }
368+ cs .Memory = memoryStats
369+ }
370+
371+ return & cs , nil
372+ }
373+
374+ // getWorkingSet calculates workingset memory from cgroup memory stats.
375+ // The caller should make sure memory is not nil.
376+ // workingset = usage - total_inactive_file
377+ func getWorkingSet (memory * cg1.MemoryStat ) uint64 {
378+ if memory .Usage == nil {
379+ return 0
380+ }
381+ var workingSet uint64
382+ if memory .TotalInactiveFile < memory .Usage .Usage {
383+ workingSet = memory .Usage .Usage - memory .TotalInactiveFile
384+ }
385+ return workingSet
386+ }
387+
388+ // getWorkingSetV2 calculates workingset memory from cgroupv2 memory stats.
389+ // The caller should make sure memory is not nil.
390+ // workingset = usage - inactive_file
391+ func getWorkingSetV2 (memory * cg2.MemoryStat ) uint64 {
392+ var workingSet uint64
393+ if memory .InactiveFile < memory .Usage {
394+ workingSet = memory .Usage - memory .InactiveFile
395+ }
396+ return workingSet
397+ }
398+
// isMemoryUnlimited reports whether v is large enough to be treated as
// "no memory limit configured".
func isMemoryUnlimited(v uint64) bool {
	// Threshold above which a limit counts as unlimited. It is deliberately not
	// MaxInt64 because the kernel rounds the value it reports.
	// TODO: k8s or cadvisor should export this https://github.com/google/cadvisor/blob/2b6fbacac7598e0140b5bc8428e3bdd7d86cf5b9/metrics/prometheus.go#L1969-L1971
	const unlimitedThreshold uint64 = 1 << 62

	return !(v <= unlimitedThreshold)
}
407+
408+ // https://github.com/kubernetes/kubernetes/blob/b47f8263e18c7b13dba33fba23187e5e0477cdbd/pkg/kubelet/stats/helper.go#L68-L71
409+ func getAvailableBytes (memory * cg1.MemoryStat , workingSetBytes uint64 ) uint64 {
410+ // memory limit - working set bytes
411+ if ! isMemoryUnlimited (memory .Usage .Limit ) {
412+ return memory .Usage .Limit - workingSetBytes
413+ }
414+ return 0
415+ }
416+
417+ func getAvailableBytesV2 (memory * cg2.MemoryStat , workingSetBytes uint64 ) uint64 {
418+ // memory limit (memory.max) for cgroupv2 - working set bytes
419+ if ! isMemoryUnlimited (memory .UsageLimit ) {
420+ return memory .UsageLimit - workingSetBytes
421+ }
422+ return 0
423+ }
424+
425+ func (c * criService ) cpuContainerStats (ID string , isSandbox bool , stats interface {}, timestamp time.Time ) (* runtime.CpuUsage , error ) {
426+ switch metrics := stats .(type ) {
427+ case * cg1.Metrics :
428+ metrics .GetCPU ().GetUsage ()
429+ if metrics .CPU != nil && metrics .CPU .Usage != nil {
430+ return & runtime.CpuUsage {
431+ Timestamp : timestamp .UnixNano (),
432+ UsageCoreNanoSeconds : & runtime.UInt64Value {Value : metrics .CPU .Usage .Total },
433+ }, nil
434+ }
435+ case * cg2.Metrics :
436+ if metrics .CPU != nil {
437+ // convert to nano seconds
438+ usageCoreNanoSeconds := metrics .CPU .UsageUsec * 1000
439+
440+ return & runtime.CpuUsage {
441+ Timestamp : timestamp .UnixNano (),
442+ UsageCoreNanoSeconds : & runtime.UInt64Value {Value : usageCoreNanoSeconds },
443+ }, nil
444+ }
445+ default :
446+ return nil , fmt .Errorf ("unexpected metrics type: %T from %s" , metrics , reflect .TypeOf (metrics ).Elem ().PkgPath ())
447+ }
448+ return nil , nil
449+ }
450+
451+ func (c * criService ) memoryContainerStats (ID string , stats interface {}, timestamp time.Time ) (* runtime.MemoryUsage , error ) {
452+ switch metrics := stats .(type ) {
453+ case * cg1.Metrics :
454+ if metrics .Memory != nil && metrics .Memory .Usage != nil {
455+ workingSetBytes := getWorkingSet (metrics .Memory )
456+
457+ return & runtime.MemoryUsage {
458+ Timestamp : timestamp .UnixNano (),
459+ WorkingSetBytes : & runtime.UInt64Value {
460+ Value : workingSetBytes ,
461+ },
462+ AvailableBytes : & runtime.UInt64Value {Value : getAvailableBytes (metrics .Memory , workingSetBytes )},
463+ UsageBytes : & runtime.UInt64Value {Value : metrics .Memory .Usage .Usage },
464+ RssBytes : & runtime.UInt64Value {Value : metrics .Memory .TotalRSS },
465+ PageFaults : & runtime.UInt64Value {Value : metrics .Memory .TotalPgFault },
466+ MajorPageFaults : & runtime.UInt64Value {Value : metrics .Memory .TotalPgMajFault },
467+ }, nil
468+ }
469+ case * cg2.Metrics :
470+ if metrics .Memory != nil {
471+ workingSetBytes := getWorkingSetV2 (metrics .Memory )
472+
473+ return & runtime.MemoryUsage {
474+ Timestamp : timestamp .UnixNano (),
475+ WorkingSetBytes : & runtime.UInt64Value {
476+ Value : workingSetBytes ,
477+ },
478+ AvailableBytes : & runtime.UInt64Value {Value : getAvailableBytesV2 (metrics .Memory , workingSetBytes )},
479+ UsageBytes : & runtime.UInt64Value {Value : metrics .Memory .Usage },
480+ // Use Anon memory for RSS as cAdvisor on cgroupv2
481+ // see https://github.com/google/cadvisor/blob/a9858972e75642c2b1914c8d5428e33e6392c08a/container/libcontainer/handler.go#L799
482+ RssBytes : & runtime.UInt64Value {Value : metrics .Memory .Anon },
483+ PageFaults : & runtime.UInt64Value {Value : metrics .Memory .Pgfault },
484+ MajorPageFaults : & runtime.UInt64Value {Value : metrics .Memory .Pgmajfault },
485+ }, nil
486+ }
487+ default :
488+ return nil , fmt .Errorf ("unexpected metrics type: %T from %s" , metrics , reflect .TypeOf (metrics ).Elem ().PkgPath ())
489+ }
490+ return nil , nil
491+ }
0 commit comments