Skip to content

Commit 8b51a95

Browse files
committed
fix: shimv1 leak issue
```go // Delete the initial process and container func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) { p, err := s.getInitProcess() if err != nil { return nil, err } if err := p.Delete(ctx); err != nil { return nil, errdefs.ToGRPC(err) } // The client might canceled the request but the shim service still // moved on. The `delete(s.processes, s.id)` was executed // successfully. So the next Delete call will return `container // must be created` error. The client side should ignore this // issue. s.mu.Lock() delete(s.processes, s.id) s.mu.Unlock() s.platform.Close() return &shimapi.DeleteResponse{ ExitStatus: uint32(p.ExitStatus()), ExitedAt: protobuf.ToTimestamp(p.ExitedAt()), Pid: uint32(p.Pid()), }, nil } ``` introduced by #9004 fixes: #9309 Signed-off-by: Wei Fu <[email protected]> (cherry picked from commit 4499128) Signed-off-by: Wei Fu <[email protected]>
1 parent 5a75c52 commit 8b51a95

2 files changed

Lines changed: 56 additions & 20 deletions

File tree

integration/issue7496_linux_test.go

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ import (
2929
"time"
3030

3131
"github.com/containerd/containerd/namespaces"
32+
apitaskv1 "github.com/containerd/containerd/runtime/v1/shim/v1"
3233
"github.com/containerd/containerd/runtime/v2/shim"
33-
apitask "github.com/containerd/containerd/runtime/v2/task"
34+
apitaskv2 "github.com/containerd/containerd/runtime/v2/task"
3435
"github.com/containerd/ttrpc"
36+
gogotypes "github.com/gogo/protobuf/types"
3537
"github.com/stretchr/testify/assert"
3638
"github.com/stretchr/testify/require"
3739
exec "golang.org/x/sys/execabs"
@@ -46,9 +48,7 @@ func TestIssue7496(t *testing.T) {
4648
require.NoError(t, err)
4749

4850
typ := criCfg.ContainerdConfig.Runtimes[criCfg.ContainerdConfig.DefaultRuntimeName].Type
49-
if !strings.HasSuffix(typ, "runc.v2") {
50-
t.Skipf("default runtime should be runc.v2, but it's not: %s", typ)
51-
}
51+
isShimV1 := typ == "io.containerd.runtime.v1.linux"
5252

5353
ctx := namespaces.WithNamespace(context.Background(), "k8s.io")
5454

@@ -57,14 +57,14 @@ func TestIssue7496(t *testing.T) {
5757
sbID, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
5858
require.NoError(t, err)
5959

60-
shimCli := connectToShim(ctx, t, sbID)
60+
sCli := newShimCli(ctx, t, sbID, isShimV1)
6161

6262
delayInSec := 12
6363
t.Logf("[shim pid: %d]: Injecting %d seconds delay to umount2 syscall",
64-
shimPid(ctx, t, shimCli),
64+
sCli.pid(ctx, t),
6565
delayInSec)
6666

67-
doneCh := injectDelayToUmount2(ctx, t, shimCli, delayInSec /* CRI plugin uses 10 seconds to delete task */)
67+
doneCh := injectDelayToUmount2(ctx, t, int(sCli.pid(ctx, t)), delayInSec /* CRI plugin uses 10 seconds to delete task */)
6868

6969
t.Logf("Create a container config and run container in a pod")
7070
pauseImage := GetImage(Pause)
@@ -102,13 +102,13 @@ func TestIssue7496(t *testing.T) {
102102
t.Logf("PodSandbox %s has been deleted and start to wait for strace exit", sbID)
103103
select {
104104
case <-time.After(15 * time.Second):
105-
resp, err := shimCli.Connect(ctx, &apitask.ConnectRequest{})
105+
shimPid, err := sCli.connect(ctx)
106106
assert.Error(t, err, "should failed to call shim connect API")
107107

108108
t.Errorf("Strace doesn't exit in time")
109109

110-
t.Logf("Cleanup the shim (pid: %d)", resp.ShimPid)
111-
syscall.Kill(int(resp.ShimPid), syscall.SIGKILL)
110+
t.Logf("Cleanup the shim (pid: %d)", shimPid)
111+
syscall.Kill(int(shimPid), syscall.SIGKILL)
112112
<-doneCh
113113
case <-doneCh:
114114
}
@@ -119,13 +119,11 @@ func TestIssue7496(t *testing.T) {
119119
// example, umount overlayfs rootfs which doesn't with volatile.
120120
//
121121
// REF: https://man7.org/linux/man-pages/man1/strace.1.html
122-
func injectDelayToUmount2(ctx context.Context, t *testing.T, shimCli apitask.TaskService, delayInSec int) chan struct{} {
123-
pid := shimPid(ctx, t, shimCli)
124-
122+
func injectDelayToUmount2(ctx context.Context, t *testing.T, shimPid, delayInSec int) chan struct{} {
125123
doneCh := make(chan struct{})
126124

127125
cmd := exec.CommandContext(ctx, "strace",
128-
"-p", strconv.Itoa(int(pid)), "-f", // attach to all the threads
126+
"-p", strconv.Itoa(shimPid), "-f", // attach to all the threads
129127
"--detach-on=execve", // stop to attach runc child-processes
130128
"--trace=umount2", // only trace umount2 syscall
131129
"-e", "inject=umount2:delay_enter="+strconv.Itoa(delayInSec)+"s",
@@ -161,7 +159,14 @@ func injectDelayToUmount2(ctx context.Context, t *testing.T, shimCli apitask.Tas
161159
return doneCh
162160
}
163161

164-
func connectToShim(ctx context.Context, t *testing.T, id string) apitask.TaskService {
162+
type shimCli struct {
163+
isV1 bool
164+
165+
cliV1 apitaskv1.ShimService
166+
cliV2 apitaskv2.TaskService
167+
}
168+
169+
func newShimCli(ctx context.Context, t *testing.T, id string, isV1 bool) *shimCli {
165170
addr, err := shim.SocketAddress(ctx, containerdEndpoint, id)
166171
require.NoError(t, err)
167172
addr = strings.TrimPrefix(addr, "unix://")
@@ -170,11 +175,34 @@ func connectToShim(ctx context.Context, t *testing.T, id string) apitask.TaskSer
170175
require.NoError(t, err)
171176

172177
client := ttrpc.NewClient(conn)
173-
return apitask.NewTaskClient(client)
178+
179+
cli := &shimCli{isV1: isV1}
180+
if isV1 {
181+
cli.cliV1 = apitaskv1.NewShimClient(client)
182+
} else {
183+
cli.cliV2 = apitaskv2.NewTaskClient(client)
184+
}
185+
return cli
186+
}
187+
188+
func (cli *shimCli) connect(ctx context.Context) (uint32, error) {
189+
if cli.isV1 {
190+
resp, err := cli.cliV1.ShimInfo(ctx, &gogotypes.Empty{})
191+
if err != nil {
192+
return 0, err
193+
}
194+
return resp.ShimPid, nil
195+
}
196+
197+
resp, err := cli.cliV2.Connect(ctx, &apitaskv2.ConnectRequest{})
198+
if err != nil {
199+
return 0, err
200+
}
201+
return resp.ShimPid, nil
174202
}
175203

176-
func shimPid(ctx context.Context, t *testing.T, shimCli apitask.TaskService) uint32 {
177-
resp, err := shimCli.Connect(ctx, &apitask.ConnectRequest{})
204+
func (cli *shimCli) pid(ctx context.Context, t *testing.T) uint32 {
205+
pid, err := cli.connect(ctx)
178206
require.NoError(t, err)
179-
return resp.ShimPid
207+
return pid
180208
}

runtime/v1/linux/task.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"context"
2424
"errors"
2525
"fmt"
26+
"strings"
2627
"sync"
2728

2829
"github.com/containerd/cgroups"
@@ -96,7 +97,14 @@ func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
9697
rsp, shimErr := t.shim.Delete(ctx, empty)
9798
if shimErr != nil {
9899
shimErr = errdefs.FromGRPC(shimErr)
99-
if !errdefs.IsNotFound(shimErr) {
100+
if !errdefs.IsNotFound(shimErr) &&
101+
// NOTE: The last Detete call has deleted the init process
102+
// record in shim service. However, the last call took
103+
// so long and then the client side canceled the call.
104+
// After the client retries the Delete, the shim service
105+
// doesn't find the init process and returns `container
106+
// must be created`. We should tolerate this issue.
107+
!(errdefs.IsFailedPrecondition(shimErr) && strings.Contains(shimErr.Error(), "container must be created")) {
100108
return nil, shimErr
101109
}
102110
}

0 commit comments

Comments
 (0)