From 1ac97c2c131a76aa509161f83d318a6ac73df1dd Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Fri, 30 May 2025 00:07:34 -0400
Subject: [PATCH] *: properly shut down non-groupable shims to prevent resource leaks

Previously, to address issue #11708, PR #11793 changed containerd to always
invoke the shim binary to establish shim connections, rather than reusing the
sandbox shim. However, that change did not ensure that the Shutdown API was
called to stop the shim process.

Starting with containerd v2.0.0, the Shutdown API is only invoked for sandbox
containers (when container.SandboxID is empty). This works for groupable
shims, where multiple containers share a single socket address and only one
Shutdown call is required. For non-groupable shims, however, each container
requires its own Shutdown call during cleanup to avoid leaking shim processes.

Additionally, PR #11793 introduced a corner case during upgrades:

- T1: An old containerd-shim-runc-v2 (<=v1.7.X) is running for pod A.
- T2: containerd is upgraded to v2.X.Y.
- T3: A new container A-C1 is created in pod A using the new shim-runc-v2 binary.
- T4: bootstrap.json indicates the version:3 protocol, but the client is
  downgraded to version:2 in memory; bootstrap.json itself is not updated.
- T5: containerd is restarted.
- T6: containerd fails to connect to A-C1.
- T7: The A-C1 container is left in EXITED status in the CRI plugin.

To address this, ensure that loadShimTask downgrades to version:2 when
necessary, and always invoke the Shutdown API for each non-groupable shim
during cleanup, so that shim processes are not leaked and the upgrade
scenario is handled correctly. (Introduced by #11793)

Signed-off-by: Wei Fu
---
 core/runtime/v2/shim.go                   |   5 +
 core/runtime/v2/shim_load.go              |  21 ++-
 integration/issue10467_linux_test.go      |   3 +-
 integration/release_upgrade_linux_test.go | 217 +++++++++++++++++++---
 4 files changed, 220 insertions(+), 26 deletions(-)

diff --git a/core/runtime/v2/shim.go b/core/runtime/v2/shim.go
index e279fde2b5a74..8881e0185a9e9 100644
--- a/core/runtime/v2/shim.go
+++ b/core/runtime/v2/shim.go
@@ -556,6 +556,11 @@ func (s *shimTask) delete(ctx context.Context, sandboxed bool, removeTask func(c
 		removeTask(ctx, s.ID())
 	}
 
+	const supportSandboxAPIVersion = 3
+	if _, apiVer := s.ShimInstance.Endpoint(); apiVer < supportSandboxAPIVersion {
+		sandboxed = false
+	}
+
 	// Don't shutdown sandbox as there may be other containers running.
 	// Let controller decide when to shutdown.
 	if !sandboxed {
diff --git a/core/runtime/v2/shim_load.go b/core/runtime/v2/shim_load.go
index 5f5e2b64275ed..09a82710b787ed 100644
--- a/core/runtime/v2/shim_load.go
+++ b/core/runtime/v2/shim_load.go
@@ -205,7 +205,26 @@ func loadShimTask(ctx context.Context, bundle *Bundle, onClose func()) (_ *shimT
 	defer cancel()
 
 	if _, err := s.PID(ctx); err != nil {
-		return nil, err
+		if !errdefs.IsNotImplemented(err) {
+			return nil, err
+		}
+
+		downgrader, ok := shim.(clientVersionDowngrader)
+		if ok {
+			if derr := downgrader.Downgrade(); derr == nil {
+				log.G(ctx).WithError(err).WithField("id", shim.ID()).
+ Warning("failed to call task.PID, downgrading client API version to try again") + + s, err = newShimTask(shim) + if err != nil { + return nil, fmt.Errorf("failed to create shim task after downgrading: %w", err) + } + _, err = s.PID(ctx) + } + } + if err != nil { + return nil, err + } } return s, nil } diff --git a/integration/issue10467_linux_test.go b/integration/issue10467_linux_test.go index c9690dda3773b..b039635ee383f 100644 --- a/integration/issue10467_linux_test.go +++ b/integration/issue10467_linux_test.go @@ -66,7 +66,8 @@ func TestIssue10467(t *testing.T) { }) t.Log("Prepare pods for current release") - upgradeCaseFunc, hookFunc := shouldManipulateContainersInPodAfterUpgrade("")(t, 2, previousProc.criRuntimeService(t), previousProc.criImageService(t)) + upgradeCaseFuncs, hookFunc := shouldManipulateContainersInPodAfterUpgrade("")(t, 2, previousProc.criRuntimeService(t), previousProc.criImageService(t)) + upgradeCaseFunc := upgradeCaseFuncs[0] needToCleanup = false require.Nil(t, hookFunc) diff --git a/integration/release_upgrade_linux_test.go b/integration/release_upgrade_linux_test.go index caf825ca62540..e10609df60873 100644 --- a/integration/release_upgrade_linux_test.go +++ b/integration/release_upgrade_linux_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "io" + "net" "os" "os/exec" "path/filepath" @@ -35,10 +36,14 @@ import ( "github.com/stretchr/testify/require" criruntime "k8s.io/cri-api/pkg/apis/runtime/v1" + apitask "github.com/containerd/containerd/api/runtime/task/v3" + shimcore "github.com/containerd/containerd/v2/core/runtime/v2" cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis" "github.com/containerd/containerd/v2/integration/images" "github.com/containerd/containerd/v2/integration/remote" "github.com/containerd/containerd/v2/pkg/namespaces" + shimbinary "github.com/containerd/containerd/v2/pkg/shim" + "github.com/containerd/ttrpc" ) // upgradeVerifyCaseFunc is used to verify the behavior after upgrade. @@ -47,7 +52,11 @@ type upgradeVerifyCaseFunc func(*testing.T, cri.RuntimeService, cri.ImageManager // beforeUpgradeHookFunc is a hook before upgrade. type beforeUpgradeHookFunc func(*testing.T) -type setupUpgradeVerifyCase func(*testing.T, int, cri.RuntimeService, cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc) +// setupUpgradeVerifyCase returns a list of upgradeVerifyCaseFunc. +// +// Each upgradeVerifyCaseFunc is used to verify the behavior after restarting +// with current release. 
+type setupUpgradeVerifyCase func(*testing.T, int, cri.RuntimeService, cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) // TODO: Support Windows func TestUpgrade(t *testing.T) { @@ -65,6 +74,9 @@ func TestUpgrade(t *testing.T) { if version == "1.7" { t.Run("recover-ungroupable-shim", runUpgradeTestCaseWithExistingConfig(version, previousReleaseBinDir, true, shouldManipulateContainersInPodAfterUpgrade("runcv1"))) + + t.Run("should-address-shim-version-mismatches", + runUpgradeTestCase(version, previousReleaseBinDir, shouldAdjustShimVersionDuringRestarting)) } }) } @@ -73,7 +85,7 @@ func TestUpgrade(t *testing.T) { func runUpgradeTestCase( previousVersion string, previousReleaseBinDir string, - setupUpgradeVerifyCase func(*testing.T, int, cri.RuntimeService, cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc), + setupUpgradeVerifyCase func(*testing.T, int, cri.RuntimeService, cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc), ) func(t *testing.T) { return runUpgradeTestCaseWithExistingConfig( previousVersion, @@ -89,7 +101,7 @@ func runUpgradeTestCaseWithExistingConfig( previousVersion string, previousReleaseBinDir string, usingExistingConfig bool, - setupUpgradeVerifyCase func(*testing.T, int, cri.RuntimeService, cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc), + setupUpgradeVerifyCase func(*testing.T, int, cri.RuntimeService, cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc), ) func(t *testing.T) { return func(t *testing.T) { // NOTE: Using t.TempDir() here is to ensure there are no leaky @@ -126,7 +138,7 @@ func runUpgradeTestCaseWithExistingConfig( }) t.Log("Prepare pods for current release") - upgradeCaseFunc, hookFunc := setupUpgradeVerifyCase(t, taskVersion, previousProc.criRuntimeService(t), previousProc.criImageService(t)) + upgradeCaseFuncs, hookFunc := setupUpgradeVerifyCase(t, taskVersion, previousProc.criRuntimeService(t), previousProc.criImageService(t)) needToCleanup = false t.Log("Gracefully stop previous release's containerd process") @@ -155,13 +167,86 @@ func runUpgradeTestCaseWithExistingConfig( require.NoError(t, currentProc.wait(5*time.Minute)) }) - t.Log("Verifing") - upgradeCaseFunc(t, currentProc.criRuntimeService(t), currentProc.criImageService(t)) + for idx, upgradeCaseFunc := range upgradeCaseFuncs { + t.Logf("Verifing upgrade case %d", idx+1) + upgradeCaseFunc(t, currentProc.criRuntimeService(t), currentProc.criImageService(t)) + + if idx == len(upgradeCaseFuncs)-1 { + break + } + + t.Log("Gracefully restarting containerd process") + require.NoError(t, currentProc.kill(syscall.SIGTERM)) + require.NoError(t, currentProc.wait(5*time.Minute)) + currentProc = newCtrdProc(t, "containerd", workDir, nil) + require.NoError(t, currentProc.isReady()) + } } } +// shouldAdjustShimVersionDuringRestarting verifies that the shim manager +// can handle shim proto version mismatches during a containerd restart. +// +// Steps: +// 1. Use containerd-shim-runc-v2 from v1.7.x to set up a running pod. +// 2. After upgrading, use the new containerd-shim-runc-v2 to create a new container in the same pod. +// The new shim returns bootstrap.json with version 3. +// The shim manager auto-downgrades the version to 2, but does not update bootstrap.json. +// 3. Restart the containerd process; the new container should be recovered successfully. 
+func shouldAdjustShimVersionDuringRestarting(t *testing.T, _ int, + rSvc cri.RuntimeService, iSvc cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + + var busyboxImage = images.Get(images.BusyBox) + + pullImagesByCRI(t, iSvc, busyboxImage) + + podCtx := newPodTCtx(t, rSvc, "running-pod", "sandbox") + + cntr1 := podCtx.createContainer("running", busyboxImage, + criruntime.ContainerState_CONTAINER_RUNNING, + WithCommand("sleep", "1d")) + + var cntr2 string + + createNewContainerInPodFunc := func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { + t.Log("Creating new container in the previous pod") + cntr2 = podCtx.createContainer("new-container", busyboxImage, + criruntime.ContainerState_CONTAINER_RUNNING, + WithCommand("sleep", "1d")) + } + + shouldBeRunningFunc := func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { + t.Log("Checking the running container in the previous pod") + + pods, err := rSvc.ListPodSandbox(nil) + require.NoError(t, err) + require.Len(t, pods, 1) + + cntrs, err := rSvc.ListContainers(&criruntime.ContainerFilter{ + PodSandboxId: pods[0].Id, + }) + require.NoError(t, err) + require.Len(t, cntrs, 2) + + for _, cntr := range cntrs { + switch cntr.Id { + case cntr1: + assert.Equal(t, criruntime.ContainerState_CONTAINER_RUNNING.String(), cntr.State.String()) + case cntr2: + assert.Equal(t, criruntime.ContainerState_CONTAINER_RUNNING.String(), cntr.State.String()) + default: + t.Errorf("unexpected container %s in pod %s", cntr.Id, pods[0].Id) + } + } + } + return []upgradeVerifyCaseFunc{ + createNewContainerInPodFunc, + shouldBeRunningFunc, + }, nil +} + func shouldRecoverAllThePodsAfterUpgrade(t *testing.T, taskVersion int, - rSvc cri.RuntimeService, iSvc cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + rSvc cri.RuntimeService, iSvc cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { var busyboxImage = images.Get(images.BusyBox) @@ -197,7 +282,7 @@ func shouldRecoverAllThePodsAfterUpgrade(t *testing.T, taskVersion int, syscall.Kill(thirdPodShimPid, syscall.SIGKILL) } - return func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { + verifyFunc := func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { t.Log("List Pods") pods, err := rSvc.ListPodSandbox(nil) @@ -246,11 +331,12 @@ func shouldRecoverAllThePodsAfterUpgrade(t *testing.T, taskVersion int, t.Errorf("unexpected pod %s", pod.Id) } } - }, hookFunc + } + return []upgradeVerifyCaseFunc{verifyFunc}, hookFunc } func execToExistingContainer(t *testing.T, _ int, - rSvc cri.RuntimeService, iSvc cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + rSvc cri.RuntimeService, iSvc cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { var busyboxImage = images.Get(images.BusyBox) @@ -269,7 +355,7 @@ func execToExistingContainer(t *testing.T, _ int, // NOTE: Wait for containerd to flush data into log time.Sleep(2 * time.Second) - return func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { + verifyFunc := func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { pods, err := rSvc.ListPodSandbox(nil) require.NoError(t, err) require.Len(t, pods, 1) @@ -304,7 +390,8 @@ func execToExistingContainer(t *testing.T, _ int, require.NoError(t, err) require.Len(t, stderr, 0) require.Equal(t, "true", string(stdout)) - }, nil + } + return []upgradeVerifyCaseFunc{verifyFunc}, nil } // getFileSize returns file's size. 
@@ -315,7 +402,9 @@ func getFileSize(t *testing.T, filePath string) int64 { } func shouldManipulateContainersInPodAfterUpgrade(runtimeHandler string) setupUpgradeVerifyCase { - return func(t *testing.T, _ int, rSvc cri.RuntimeService, iSvc cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + return func(t *testing.T, taskVersion int, rSvc cri.RuntimeService, iSvc cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + shimConns := []shimConn{} + var busyboxImage = images.Get(images.BusyBox) pullImagesByCRI(t, iSvc, busyboxImage) @@ -326,6 +415,9 @@ func shouldManipulateContainersInPodAfterUpgrade(runtimeHandler string) setupUpg criruntime.ContainerState_CONTAINER_RUNNING, WithCommand("sleep", "1d")) + t.Logf("Building shim connect for container %s", cntr1) + shimConns = append(shimConns, buildShimClientFromBundle(t, rSvc, cntr1)) + cntr2 := podCtx.createContainer("created", busyboxImage, criruntime.ContainerState_CONTAINER_CREATED, WithCommand("sleep", "1d")) @@ -334,7 +426,7 @@ func shouldManipulateContainersInPodAfterUpgrade(runtimeHandler string) setupUpg criruntime.ContainerState_CONTAINER_EXITED, WithCommand("sleep", "1d")) - return func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { + verifyFunc := func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { // TODO(fuweid): make svc re-connect to new socket podCtx.rSvc = rSvc @@ -370,6 +462,9 @@ func shouldManipulateContainersInPodAfterUpgrade(runtimeHandler string) setupUpg require.NoError(t, rSvc.StartContainer(cntr2)) checkContainerState(t, rSvc, cntr2, criruntime.ContainerState_CONTAINER_RUNNING) + t.Logf("Building shim connect for container %s", cntr2) + shimConns = append(shimConns, buildShimClientFromBundle(t, rSvc, cntr2)) + t.Logf("Stopping running container %s", cntr2) require.NoError(t, rSvc.StopContainer(cntr2, 0)) checkContainerState(t, rSvc, cntr2, criruntime.ContainerState_CONTAINER_EXITED) @@ -388,10 +483,13 @@ func shouldManipulateContainersInPodAfterUpgrade(runtimeHandler string) setupUpg require.True(t, os.IsNotExist(err)) // Create a new container in the previous pod, start, stop, and remove it - podCtx.createContainer("runinpreviouspod", busyboxImage, - criruntime.ContainerState_CONTAINER_EXITED, + cntr4 := podCtx.createContainer("runinpreviouspod", busyboxImage, + criruntime.ContainerState_CONTAINER_RUNNING, WithCommand("sleep", "1d")) + t.Logf("Building shim connect for container %s", cntr4) + shimConns = append(shimConns, buildShimClientFromBundle(t, rSvc, cntr4)) + podCtx.stop(true) podDataDir := podCtx.dataDir() @@ -407,21 +505,33 @@ func shouldManipulateContainersInPodAfterUpgrade(runtimeHandler string) setupUpg t.Log("Creating new running container in new pod") pod2Ctx := newPodTCtxWithRuntimeHandler(t, rSvc, "running-pod-2", "sandbox", runtimeHandler) - pod2Ctx.createContainer("running", busyboxImage, + pod2Cntr := pod2Ctx.createContainer("running", busyboxImage, criruntime.ContainerState_CONTAINER_RUNNING, WithCommand("sleep", "1d")) - }, nil + t.Logf("Building shim connect for container %s", pod2Cntr) + shimConns = append(shimConns, buildShimClientFromBundle(t, rSvc, pod2Cntr)) + + pod2Ctx.stop(true) + + // If connection is closed, it means the shim process exits. 
+ for _, shimCli := range shimConns { + t.Logf("Checking container %s's shim client", shimCli.cntrID) + _, err = shimCli.cli.Connect(context.Background(), &apitask.ConnectRequest{}) + assert.ErrorContains(t, err, "ttrpc: closed", "should be closed after deleting pod") + } + } + return []upgradeVerifyCaseFunc{verifyFunc}, nil } } func shouldRecoverExistingImages(t *testing.T, _ int, - _ cri.RuntimeService, iSvc cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + _ cri.RuntimeService, iSvc cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { images := []string{images.Get(images.BusyBox), images.Get(images.Alpine)} expectedRefs := pullImagesByCRI(t, iSvc, images...) - return func(t *testing.T, _ cri.RuntimeService, iSvc cri.ImageManagerService) { + verifyFunc := func(t *testing.T, _ cri.RuntimeService, iSvc cri.ImageManagerService) { t.Log("List all images") res, err := iSvc.ListImages(nil) require.NoError(t, err) @@ -433,13 +543,14 @@ func shouldRecoverExistingImages(t *testing.T, _ int, require.NoError(t, err) require.Equal(t, expectedRefs[idx], gotImg.Id) } - }, nil + } + return []upgradeVerifyCaseFunc{verifyFunc}, nil } // shouldParseMetricDataCorrectly is to check new release containerd can parse // metric data from existing shim created by previous release. func shouldParseMetricDataCorrectly(t *testing.T, _ int, - rSvc cri.RuntimeService, iSvc cri.ImageManagerService) (upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { + rSvc cri.RuntimeService, iSvc cri.ImageManagerService) ([]upgradeVerifyCaseFunc, beforeUpgradeHookFunc) { imageName := images.Get(images.BusyBox) pullImagesByCRI(t, iSvc, imageName) @@ -481,7 +592,7 @@ done WithLogPath(cntrLogName), ) - return func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { + verifyFunc := func(t *testing.T, rSvc cri.RuntimeService, _ cri.ImageManagerService) { checkContainerState(t, rSvc, cntr, criruntime.ContainerState_CONTAINER_RUNNING) logPath := filepath.Join(podLogDir, cntrLogName) @@ -508,7 +619,8 @@ done // NOTE: Just in case that part of inactive cache has been reclaimed. expectedBytes := uint64(fileSize * 2 / 3) require.True(t, stats.GetMemory().GetUsageBytes().GetValue() > expectedBytes) - }, nil + } + return []upgradeVerifyCaseFunc{verifyFunc}, nil } func newPodTCtx(t *testing.T, rSvc cri.RuntimeService, @@ -596,6 +708,63 @@ func (pCtx *podTCtx) shimPid(version int) uint32 { return shimPid(ctx, t, shimCli) } +// shimConn is a wrapper for shim client with container ID. +type shimConn struct { + cntrID string + cli shimcore.TaskServiceClient +} + +// buildShimClientFromBundle builds a shim client from the bundle directory of the container. 
+func buildShimClientFromBundle(t *testing.T, rSvc cri.RuntimeService, cid string) shimConn { + cfg := criRuntimeInfo(t, rSvc) + + bundleDir := filepath.Join( + filepath.Dir(cfg["stateDir"].(string)), + "io.containerd.runtime.v2.task", + "k8s.io", + cid, + ) + + t.Logf("Building shim client from bundle %s for container %s", bundleDir, cid) + + version := 2 + addr := "" + + bootstrapJSON := filepath.Join(bundleDir, "bootstrap.json") + addressPath := filepath.Join(bundleDir, "address") + + _, err := os.Stat(bootstrapJSON) + switch { + case err == nil: + rawJSON, err := os.ReadFile(bootstrapJSON) + require.NoError(t, err, "failed to read bootstrap.json for container %s", cid) + var bootstrapData map[string]interface{} + err = json.Unmarshal(rawJSON, &bootstrapData) + require.NoError(t, err, "failed to unmarshal bootstrap.json for container %s", cid) + + version = int(bootstrapData["version"].(float64)) + addr = strings.TrimPrefix(bootstrapData["address"].(string), "unix://") + + case os.IsNotExist(err): + address, err := shimbinary.ReadAddress(addressPath) + require.NoError(t, err, "failed to read address for container %s", cid) + addr = strings.TrimPrefix(address, "unix://") + default: + require.NoError(t, err, "failed to stat bootstrap.json for container %s", cid) + } + + conn, err := net.Dial("unix", addr) + require.NoError(t, err) + + client := ttrpc.NewClient(conn) + cli, err := shimcore.NewTaskClient(client, version) + require.NoError(t, err) + return shimConn{ + cntrID: cid, + cli: cli, + } +} + // dataDir returns pod metadata dir maintained by CRI plugin. func (pCtx *podTCtx) dataDir() string { t := pCtx.t
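For readers unfamiliar with the bundle layout that buildShimClientFromBundle relies on: a shim speaking the version-3 task API records its endpoint in bundle/bootstrap.json, while a v1.7-era shim writes only the plain address file, which is why the helper falls back to shimbinary.ReadAddress and defaults the task API version to 2. A minimal standalone sketch of that parsing follows, assuming the illustrative struct name shimBootstrap and a made-up socket path; this is not code from the patch.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// shimBootstrap mirrors the two fields the test helper reads from
// bootstrap.json; the struct name is illustrative only.
type shimBootstrap struct {
	Version int    `json:"version"`
	Address string `json:"address"`
}

func main() {
	// Example payload a version-3 shim might write; the socket path is made up.
	raw := []byte(`{"version": 3, "address": "unix:///run/containerd/s/abc.sock", "protocol": "ttrpc"}`)

	var b shimBootstrap
	if err := json.Unmarshal(raw, &b); err != nil {
		panic(err)
	}

	// Like the helper, trim the unix:// prefix before dialing the socket.
	fmt.Println(b.Version, strings.TrimPrefix(b.Address, "unix://"))
	// Output: 3 /run/containerd/s/abc.sock
}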