Skip to content

Commit 0717fe6

Browse files
yulianedyalkova authored and fuweid committed
Use named pipes for shim logs
Relating to issue [#2606](#2606) Co-authored-by: Oliver Stenbom <[email protected]> Co-authored-by: Georgi Sabev <[email protected]> Co-authored-by: Giuseppe Capizzi <[email protected]> Co-authored-by: Danail Branekov <[email protected]> Signed-off-by: Oliver Stenbom <[email protected]> Signed-off-by: Georgi Sabev <[email protected]> Signed-off-by: Giuseppe Capizzi <[email protected]> Signed-off-by: Danail Branekov <[email protected]> (cherry picked from commit 1d4105c) Signed-off-by: Wei Fu <[email protected]>
1 parent ea75cfc commit 0717fe6

6 files changed

Lines changed: 370 additions & 14 deletions

File tree

client_test.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"context"
2222
"flag"
2323
"fmt"
24+
"io"
25+
"io/ioutil"
2426
"os"
2527
"os/exec"
2628
"testing"
@@ -36,11 +38,12 @@ import (
3638
)
3739

3840
var (
39-
address string
40-
noDaemon bool
41-
noCriu bool
42-
supportsCriu bool
43-
testNamespace = "testing"
41+
address string
42+
noDaemon bool
43+
noCriu bool
44+
supportsCriu bool
45+
testNamespace = "testing"
46+
ctrdStdioFilePath string
4447

4548
ctrd = &daemon{}
4649
)
@@ -76,13 +79,26 @@ func TestMain(m *testing.M) {
7679
if !noDaemon {
7780
sys.ForceRemoveAll(defaultRoot)
7881

79-
err := ctrd.start("containerd", address, []string{
82+
stdioFile, err := ioutil.TempFile("", "")
83+
if err != nil {
84+
fmt.Fprintf(os.Stderr, "could not create a new stdio temp file: %s\n", err)
85+
os.Exit(1)
86+
}
87+
defer func() {
88+
stdioFile.Close()
89+
os.Remove(stdioFile.Name())
90+
}()
91+
ctrdStdioFilePath = stdioFile.Name()
92+
stdioWriter := io.MultiWriter(stdioFile, buf)
93+
94+
err = ctrd.start("containerd", address, []string{
8095
"--root", defaultRoot,
8196
"--state", defaultState,
8297
"--log-level", "debug",
83-
}, buf, buf)
98+
"--config", createShimDebugConfig(),
99+
}, stdioWriter, stdioWriter)
84100
if err != nil {
85-
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
101+
fmt.Fprintf(os.Stderr, "%s: %s\n", err, buf.String())
86102
os.Exit(1)
87103
}
88104
}
@@ -138,6 +154,7 @@ func TestMain(m *testing.M) {
138154
fmt.Fprintln(os.Stderr, "failed to wait for containerd", err)
139155
}
140156
}
157+
141158
if err := sys.ForceRemoveAll(defaultRoot); err != nil {
142159
fmt.Fprintln(os.Stderr, "failed to remove test root dir", err)
143160
os.Exit(1)
@@ -344,3 +361,19 @@ func TestClientReconnect(t *testing.T) {
344361
t.Errorf("client closed returned error %v", err)
345362
}
346363
}
364+
365+
// createShimDebugConfig writes a temporary containerd configuration file that
// turns on shim_debug for the linux plugin and returns the path of that file.
//
// Any failure is reported on stderr and terminates the process with exit
// status 1. The file is intentionally left on disk when the function returns
// so that the returned path stays usable by the caller.
func createShimDebugConfig() string {
	f, err := ioutil.TempFile("", "containerd-config-")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create config file: %s\n", err)
		os.Exit(1)
	}

	if _, err := f.WriteString("[plugins.linux]\n\tshim_debug = true\n"); err != nil {
		// os.Exit does not run deferred calls, so release the handle by hand.
		f.Close()
		fmt.Fprintf(os.Stderr, "Failed to write to config file %s: %s\n", f.Name(), err)
		os.Exit(1)
	}

	// Close explicitly rather than via defer so that a failing Close is
	// reported instead of being silently dropped.
	if err := f.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to close config file %s: %s\n", f.Name(), err)
		os.Exit(1)
	}

	return f.Name()
}

cmd/containerd-shim/main_unix.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"context"
2424
"flag"
2525
"fmt"
26+
"io"
2627
"net"
2728
"os"
2829
"os/exec"
@@ -36,6 +37,7 @@ import (
3637

3738
"github.com/containerd/containerd/events"
3839
"github.com/containerd/containerd/namespaces"
40+
shimlog "github.com/containerd/containerd/runtime/v1"
3941
"github.com/containerd/containerd/runtime/v1/linux/proc"
4042
"github.com/containerd/containerd/runtime/v1/shim"
4143
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
@@ -92,12 +94,38 @@ func main() {
9294
runtime.GOMAXPROCS(2)
9395
}
9496

97+
stdout, stderr, err := openStdioKeepAlivePipes(workdirFlag)
98+
if err != nil {
99+
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
100+
os.Exit(1)
101+
}
102+
defer func() {
103+
stdout.Close()
104+
stderr.Close()
105+
}()
106+
95107
if err := executeShim(); err != nil {
96108
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
97109
os.Exit(1)
98110
}
99111
}
100112

113+
// If containerd server process dies, we need the shim to keep stdout/err reader
114+
// FDs so that Linux does not SIGPIPE the shim process if it tries to use its end of
115+
// these pipes.
116+
func openStdioKeepAlivePipes(dir string) (io.ReadCloser, io.ReadCloser, error) {
117+
background := context.Background()
118+
keepStdoutAlive, err := shimlog.OpenShimStdoutLog(background, dir)
119+
if err != nil {
120+
return nil, nil, err
121+
}
122+
keepStderrAlive, err := shimlog.OpenShimStderrLog(background, dir)
123+
if err != nil {
124+
return nil, nil, err
125+
}
126+
return keepStdoutAlive, keepStderrAlive, nil
127+
}
128+
101129
func executeShim() error {
102130
// start handling signals as soon as possible so that things are properly reaped
103131
// or if runtime exits before we hit the handler

container_linux_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import (
2424
"fmt"
2525
"io"
2626
"io/ioutil"
27+
"os"
2728
"os/exec"
29+
"path/filepath"
2830
"runtime"
2931
"strings"
3032
"sync"
@@ -258,6 +260,213 @@ func TestDaemonRestart(t *testing.T) {
258260
<-statusC
259261
}
260262

263+
func TestShimDoesNotLeakPipes(t *testing.T) {
264+
containerdPid := ctrd.cmd.Process.Pid
265+
initialPipes, err := numPipes(containerdPid)
266+
if err != nil {
267+
t.Fatal(err)
268+
}
269+
270+
client, err := newClient(t, address)
271+
if err != nil {
272+
t.Fatal(err)
273+
}
274+
defer client.Close()
275+
276+
var (
277+
image Image
278+
ctx, cancel = testContext()
279+
id = t.Name()
280+
)
281+
defer cancel()
282+
283+
image, err = client.GetImage(ctx, testImage)
284+
if err != nil {
285+
t.Fatal(err)
286+
}
287+
288+
container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
289+
if err != nil {
290+
t.Fatal(err)
291+
}
292+
293+
task, err := container.NewTask(ctx, empty())
294+
if err != nil {
295+
t.Fatal(err)
296+
}
297+
298+
exitChannel, err := task.Wait(ctx)
299+
if err != nil {
300+
t.Fatal(err)
301+
}
302+
303+
if err := task.Start(ctx); err != nil {
304+
t.Fatal(err)
305+
}
306+
307+
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
308+
t.Fatal(err)
309+
}
310+
311+
<-exitChannel
312+
313+
if _, err := task.Delete(ctx); err != nil {
314+
t.Fatal(err)
315+
}
316+
317+
if err := container.Delete(ctx, WithSnapshotCleanup); err != nil {
318+
t.Fatal(err)
319+
}
320+
321+
currentPipes, err := numPipes(containerdPid)
322+
if err != nil {
323+
t.Fatal(err)
324+
}
325+
326+
if initialPipes != currentPipes {
327+
t.Errorf("Pipes have leaked after container has been deleted. Initially there were %d pipes, after container deletion there were %d pipes", initialPipes, currentPipes)
328+
}
329+
}
330+
331+
// numPipes returns how many lines of the `lsof -p <pid>` listing contain the
// text "pipe".
//
// A process that has no such lines yields (0, nil). An error is returned only
// when lsof fails without printing anything to stdout (for example when lsof
// is not installed or the PID does not exist); a non-zero lsof exit status
// that still comes with a listing is tolerated and the listing is counted.
func numPipes(pid int) (int, error) {
	// Run lsof directly and filter in Go. Piping through `grep` would turn
	// "no matching lines" into a non-zero exit status and thus an error.
	cmd := exec.Command("lsof", "-p", fmt.Sprint(pid))

	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	if err := cmd.Run(); err != nil && stdout.Len() == 0 {
		return 0, fmt.Errorf("lsof -p %d: %v: %s", pid, err, strings.TrimSpace(stderr.String()))
	}

	count := 0
	for _, line := range strings.Split(stdout.String(), "\n") {
		if strings.Contains(line, "pipe") {
			count++
		}
	}
	return count, nil
}
341+
342+
func TestDaemonReconnectsToShimIOPipesOnRestart(t *testing.T) {
343+
client, err := newClient(t, address)
344+
if err != nil {
345+
t.Fatal(err)
346+
}
347+
defer client.Close()
348+
349+
var (
350+
image Image
351+
ctx, cancel = testContext()
352+
id = t.Name()
353+
)
354+
defer cancel()
355+
356+
image, err = client.GetImage(ctx, testImage)
357+
if err != nil {
358+
t.Fatal(err)
359+
}
360+
361+
container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
362+
if err != nil {
363+
t.Fatal(err)
364+
}
365+
defer container.Delete(ctx, WithSnapshotCleanup)
366+
367+
task, err := container.NewTask(ctx, empty())
368+
if err != nil {
369+
t.Fatal(err)
370+
}
371+
defer task.Delete(ctx)
372+
373+
_, err = task.Wait(ctx)
374+
if err != nil {
375+
t.Fatal(err)
376+
}
377+
378+
if err := task.Start(ctx); err != nil {
379+
t.Fatal(err)
380+
}
381+
382+
if err := ctrd.Restart(nil); err != nil {
383+
t.Fatal(err)
384+
}
385+
386+
waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second)
387+
serving, err := client.IsServing(waitCtx)
388+
waitCancel()
389+
if !serving {
390+
t.Fatalf("containerd did not start within 2s: %v", err)
391+
}
392+
393+
// After we restared containerd we write some messages to the log pipes, simulating shim writing stuff there.
394+
// Then we make sure that these messages are available on the containerd log thus proving that the server reconnected to the log pipes
395+
runtimeVersion := getRuntimeVersion()
396+
logDirPath := getLogDirPath(runtimeVersion, id)
397+
398+
switch runtimeVersion {
399+
case "v1":
400+
writeToFile(t, filepath.Join(logDirPath, "shim.stdout.log"), fmt.Sprintf("%s writing to stdout\n", id))
401+
writeToFile(t, filepath.Join(logDirPath, "shim.stderr.log"), fmt.Sprintf("%s writing to stderr\n", id))
402+
case "v2":
403+
writeToFile(t, filepath.Join(logDirPath, "log"), fmt.Sprintf("%s writing to log\n", id))
404+
}
405+
406+
statusC, err := task.Wait(ctx)
407+
if err != nil {
408+
t.Fatal(err)
409+
}
410+
411+
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
412+
t.Fatal(err)
413+
}
414+
415+
<-statusC
416+
417+
stdioContents, err := ioutil.ReadFile(ctrdStdioFilePath)
418+
if err != nil {
419+
t.Fatal(err)
420+
}
421+
422+
switch runtimeVersion {
423+
case "v1":
424+
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stdout", id)) {
425+
t.Fatal("containerd did not connect to the shim stdout pipe")
426+
}
427+
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stderr", id)) {
428+
t.Fatal("containerd did not connect to the shim stderr pipe")
429+
}
430+
case "v2":
431+
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to log", id)) {
432+
t.Fatal("containerd did not connect to the shim log pipe")
433+
}
434+
}
435+
}
436+
437+
// writeToFile opens the existing file at filePath write-only, writes message
// to it and closes it again. The file is not created, truncated or opened in
// append mode. Any error fails the calling test immediately via t.Fatal.
func writeToFile(t *testing.T, filePath, message string) {
	// Attribute failures to the caller's line rather than to this helper.
	t.Helper()

	writer, err := os.OpenFile(filePath, os.O_WRONLY, 0600)
	if err != nil {
		t.Fatal(err)
	}
	if _, err := writer.WriteString(message); err != nil {
		// Release the handle before aborting so it is not leaked.
		writer.Close()
		t.Fatal(err)
	}
	if err := writer.Close(); err != nil {
		t.Fatal(err)
	}
}
449+
450+
func getLogDirPath(runtimeVersion, id string) string {
451+
switch runtimeVersion {
452+
case "v1":
453+
return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id)
454+
case "v2":
455+
return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
456+
default:
457+
panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion))
458+
}
459+
}
460+
461+
// getRuntimeVersion reports which runtime version label applies to the current
// test run, based on the TEST_RUNTIME environment variable: "v2" when it is
// exactly "io.containerd.runc.v1", and "v1" for every other value, including
// when it is unset.
func getRuntimeVersion() string {
	if os.Getenv("TEST_RUNTIME") == "io.containerd.runc.v1" {
		return "v2"
	}
	return "v1"
}
469+
261470
func TestContainerPTY(t *testing.T) {
262471
t.Parallel()
263472

0 commit comments

Comments
 (0)