Skip to content

Commit 4499128

Browse files
committed
fix: shimv1 leak issue
```go // Delete the initial process and container func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) { p, err := s.getInitProcess() if err != nil { return nil, err } if err := p.Delete(ctx); err != nil { return nil, errdefs.ToGRPC(err) } // The client might have canceled the request but the shim service still // moved on. The `delete(s.processes, s.id)` was executed // successfully. So the next Delete call will return `container // must be created` error. The client side should ignore this // issue. s.mu.Lock() delete(s.processes, s.id) s.mu.Unlock() s.platform.Close() return &shimapi.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: protobuf.ToTimestamp(p.ExitedAt()), Pid: uint32(p.Pid()), }, nil } ``` introduced by containerd#9003 fixes: containerd#9309 Signed-off-by: Wei Fu <[email protected]>
1 parent ebe25d0 commit 4499128

3 files changed

Lines changed: 68 additions & 24 deletions

File tree

integration/issue7496_linux_test.go

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ import (
2828
"testing"
2929
"time"
3030

31-
apitask "github.com/containerd/containerd/api/runtime/task/v2"
31+
apitaskv2 "github.com/containerd/containerd/api/runtime/task/v2"
3232
"github.com/containerd/containerd/integration/images"
3333
"github.com/containerd/containerd/namespaces"
34+
apitaskv1 "github.com/containerd/containerd/runtime/v1/shim/v1"
3435
"github.com/containerd/containerd/runtime/v2/shim"
3536
"github.com/containerd/ttrpc"
3637
"github.com/stretchr/testify/assert"
@@ -47,9 +48,7 @@ func TestIssue7496(t *testing.T) {
4748
require.NoError(t, err)
4849

4950
typ := criCfg.ContainerdConfig.Runtimes[criCfg.ContainerdConfig.DefaultRuntimeName].Type
50-
if !strings.HasSuffix(typ, "runc.v2") {
51-
t.Skipf("default runtime should be runc.v2, but it's not: %s", typ)
52-
}
51+
isShimV1 := typ == "io.containerd.runtime.v1.linux"
5352

5453
ctx := namespaces.WithNamespace(context.Background(), "k8s.io")
5554

@@ -58,14 +57,14 @@ func TestIssue7496(t *testing.T) {
5857
sbID, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
5958
require.NoError(t, err)
6059

61-
shimCli := connectToShim(ctx, t, sbID)
60+
sCli := newShimCli(ctx, t, sbID, isShimV1)
6261

6362
delayInSec := 12
6463
t.Logf("[shim pid: %d]: Injecting %d seconds delay to umount2 syscall",
65-
shimPid(ctx, t, shimCli),
64+
sCli.pid(ctx, t),
6665
delayInSec)
6766

68-
doneCh := injectDelayToUmount2(ctx, t, shimCli, delayInSec /* CRI plugin uses 10 seconds to delete task */)
67+
doneCh := injectDelayToUmount2(ctx, t, int(sCli.pid(ctx, t)), delayInSec /* CRI plugin uses 10 seconds to delete task */)
6968

7069
t.Logf("Create a container config and run container in a pod")
7170
pauseImage := images.Get(images.Pause)
@@ -103,13 +102,13 @@ func TestIssue7496(t *testing.T) {
103102
t.Logf("PodSandbox %s has been deleted and start to wait for strace exit", sbID)
104103
select {
105104
case <-time.After(15 * time.Second):
106-
resp, err := shimCli.Connect(ctx, &apitask.ConnectRequest{})
105+
shimPid, err := sCli.connect(ctx)
107106
assert.Error(t, err, "should failed to call shim connect API")
108107

109108
t.Errorf("Strace doesn't exit in time")
110109

111-
t.Logf("Cleanup the shim (pid: %d)", resp.GetShimPid())
112-
syscall.Kill(int(resp.GetShimPid()), syscall.SIGKILL)
110+
t.Logf("Cleanup the shim (pid: %d)", shimPid)
111+
syscall.Kill(int(shimPid), syscall.SIGKILL)
113112
<-doneCh
114113
case <-doneCh:
115114
}
@@ -120,13 +119,11 @@ func TestIssue7496(t *testing.T) {
120119
// example, umount overlayfs rootfs which doesn't with volatile.
121120
//
122121
// REF: https://man7.org/linux/man-pages/man1/strace.1.html
123-
func injectDelayToUmount2(ctx context.Context, t *testing.T, shimCli apitask.TaskService, delayInSec int) chan struct{} {
124-
pid := shimPid(ctx, t, shimCli)
125-
122+
func injectDelayToUmount2(ctx context.Context, t *testing.T, shimPid, delayInSec int) chan struct{} {
126123
doneCh := make(chan struct{})
127124

128125
cmd := exec.CommandContext(ctx, "strace",
129-
"-p", strconv.Itoa(int(pid)), "-f", // attach to all the threads
126+
"-p", strconv.Itoa(shimPid), "-f", // attach to all the threads
130127
"--detach-on=execve", // stop to attach runc child-processes
131128
"--trace=umount2", // only trace umount2 syscall
132129
"-e", "inject=umount2:delay_enter="+strconv.Itoa(delayInSec)+"s",
@@ -162,7 +159,14 @@ func injectDelayToUmount2(ctx context.Context, t *testing.T, shimCli apitask.Tas
162159
return doneCh
163160
}
164161

165-
func connectToShim(ctx context.Context, t *testing.T, id string) apitask.TaskService {
162+
type shimCli struct {
163+
isV1 bool
164+
165+
cliV1 apitaskv1.ShimService
166+
cliV2 apitaskv2.TaskService
167+
}
168+
169+
func newShimCli(ctx context.Context, t *testing.T, id string, isV1 bool) *shimCli {
166170
addr, err := shim.SocketAddress(ctx, containerdEndpoint, id)
167171
require.NoError(t, err)
168172
addr = strings.TrimPrefix(addr, "unix://")
@@ -171,11 +175,34 @@ func connectToShim(ctx context.Context, t *testing.T, id string) apitask.TaskSer
171175
require.NoError(t, err)
172176

173177
client := ttrpc.NewClient(conn)
174-
return apitask.NewTaskClient(client)
178+
179+
cli := &shimCli{isV1: isV1}
180+
if isV1 {
181+
cli.cliV1 = apitaskv1.NewShimClient(client)
182+
} else {
183+
cli.cliV2 = apitaskv2.NewTaskClient(client)
184+
}
185+
return cli
186+
}
187+
188+
func (cli *shimCli) connect(ctx context.Context) (uint32, error) {
189+
if cli.isV1 {
190+
resp, err := cli.cliV1.ShimInfo(ctx, nil)
191+
if err != nil {
192+
return 0, err
193+
}
194+
return resp.GetShimPid(), nil
195+
}
196+
197+
resp, err := cli.cliV2.Connect(ctx, nil)
198+
if err != nil {
199+
return 0, err
200+
}
201+
return resp.GetShimPid(), nil
175202
}
176203

177-
func shimPid(ctx context.Context, t *testing.T, shimCli apitask.TaskService) uint32 {
178-
resp, err := shimCli.Connect(ctx, &apitask.ConnectRequest{})
204+
func (cli *shimCli) pid(ctx context.Context, t *testing.T) uint32 {
205+
pid, err := cli.connect(ctx)
179206
require.NoError(t, err)
180-
return resp.GetShimPid()
207+
return pid
181208
}

integration/issue7496_shutdown_linux_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ package integration
1818

1919
import (
2020
"context"
21+
"strings"
2122
"testing"
2223

2324
"github.com/stretchr/testify/require"
2425

25-
apitask "github.com/containerd/containerd/api/runtime/task/v2"
2626
"github.com/containerd/containerd/namespaces"
2727
)
2828

@@ -38,6 +38,15 @@ func TestIssue7496_ShouldRetryShutdown(t *testing.T) {
3838
// TODO: re-enable if we can retry Shutdown API.
3939
t.Skipf("Please re-enable me if we can retry Shutdown API")
4040

41+
t.Logf("Checking CRI config's default runtime")
42+
criCfg, err := CRIConfig()
43+
require.NoError(t, err)
44+
45+
typ := criCfg.ContainerdConfig.Runtimes[criCfg.ContainerdConfig.DefaultRuntimeName].Type
46+
if !strings.HasSuffix(typ, "runc.v2") {
47+
t.Skipf("default runtime should be runc.v2, but it's not: %s", typ)
48+
}
49+
4150
ctx := namespaces.WithNamespace(context.Background(), "k8s.io")
4251

4352
t.Logf("Create a pod config with shutdown failpoint")
@@ -51,15 +60,15 @@ func TestIssue7496_ShouldRetryShutdown(t *testing.T) {
5160
require.NoError(t, err)
5261

5362
t.Logf("Connect to the shim %s", sbID)
54-
shimCli := connectToShim(ctx, t, sbID)
63+
sCli := newShimCli(ctx, t, sbID, false)
5564

56-
t.Logf("Log shim %s's pid: %d", sbID, shimPid(ctx, t, shimCli))
65+
t.Logf("Log shim %s's pid: %d", sbID, sCli.pid(ctx, t))
5766

5867
t.Logf("StopPodSandbox and RemovePodSandbox")
5968
require.NoError(t, runtimeService.StopPodSandbox(sbID))
6069
require.NoError(t, runtimeService.RemovePodSandbox(sbID))
6170

6271
t.Logf("Check the shim connection")
63-
_, err = shimCli.Connect(ctx, &apitask.ConnectRequest{})
72+
_, err = sCli.connect(ctx)
6473
require.Error(t, err, "should failed to call shim connect API")
6574
}

runtime/v1/linux/task.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"context"
2323
"errors"
2424
"fmt"
25+
"strings"
2526
"sync"
2627

2728
cgroups "github.com/containerd/cgroups/v3/cgroup1"
@@ -94,7 +95,14 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
9495
rsp, shimErr := t.shim.Delete(ctx, empty)
9596
if shimErr != nil {
9697
shimErr = errdefs.FromGRPC(shimErr)
97-
if !errdefs.IsNotFound(shimErr) {
98+
if !errdefs.IsNotFound(shimErr) &&
99+
// NOTE: The last Detete call has deleted the init process
100+
// record in shim service. However, the last call took
101+
// so long and then the client side canceled the call.
102+
// After the client retries the Delete, the shim service
103+
// doesn't find the init process and returns `container
104+
// must be created`. We should tolerate this issue.
105+
!(errdefs.IsFailedPrecondition(shimErr) && strings.Contains(shimErr.Error(), "container must be created")) {
98106
return nil, shimErr
99107
}
100108
}

0 commit comments

Comments
 (0)