Skip to content

Commit ddbeb3f

Browse files
committed
Adds Windows shim reconnect logs support
Signed-off-by: Justin Terry (VM) <[email protected]>
1 parent ac01f20 commit ddbeb3f

File tree

2 files changed

+183
-20
lines changed

2 files changed

+183
-20
lines changed

runtime/v2/shim/shim_windows.go

+94-20
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"os"
2828
"os/exec"
2929
"sync"
30+
"syscall"
3031
"unsafe"
3132

3233
winio "github.com/Microsoft/go-winio"
@@ -39,6 +40,10 @@ import (
3940
"golang.org/x/sys/windows"
4041
)
4142

43+
const (
44+
errorConnectionAborted syscall.Errno = 1236
45+
)
46+
4247
// setupSignals creates a new signal handler for all signals
4348
func setupSignals() (chan os.Signal, error) {
4449
signals := make(chan os.Signal, 32)
@@ -119,21 +124,100 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
119124
}
120125
}
121126

127+
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
128+
// by using Windows Named Pipes for logging. When containerd restarts it tries
129+
// to reconnect to any shims. This means that the connection to the logger will
130+
// be severed but when containerd starts up it should reconnect and start
131+
// logging again. We abstract all of this logic behind what looks like a simple
132+
// `io.Writer` that can reconnect in the liftime and buffers logs while
133+
// disconnected.
122134
type deferredShimWriteLogger struct {
135+
mu sync.Mutex
136+
123137
ctx context.Context
124138

125-
wg sync.WaitGroup
139+
connected bool
140+
aborted bool
141+
142+
buffer bytes.Buffer
126143

144+
l net.Listener
127145
c net.Conn
128146
conerr error
129147
}
130148

149+
// beginAccept issues an accept to wait for a connection. Once a conneciton
150+
// occurs drains any outstanding buffer. While draining the buffer any writes
151+
// are blocked. If the buffer fails to fully drain due to a connection drop a
152+
// call to `beginAccept` is re-issued waiting for another connection from
153+
// containerd.
154+
func (dswl *deferredShimWriteLogger) beginAccept() {
155+
dswl.mu.Lock()
156+
if dswl.connected {
157+
return
158+
}
159+
dswl.mu.Unlock()
160+
161+
c, err := dswl.l.Accept()
162+
if err == errorConnectionAborted {
163+
dswl.mu.Lock()
164+
dswl.aborted = true
165+
dswl.l.Close()
166+
dswl.conerr = errors.New("connection closed")
167+
dswl.mu.Unlock()
168+
return
169+
}
170+
dswl.mu.Lock()
171+
dswl.connected = true
172+
dswl.c = c
173+
174+
// Drain the buffer
175+
if dswl.buffer.Len() > 0 {
176+
_, err := dswl.buffer.WriteTo(dswl.c)
177+
if err != nil {
178+
// We lost our connection draining the buffer.
179+
dswl.connected = false
180+
dswl.c.Close()
181+
go dswl.beginAccept()
182+
}
183+
}
184+
dswl.mu.Unlock()
185+
}
186+
131187
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
132-
dswl.wg.Wait()
133-
if dswl.c == nil {
188+
dswl.mu.Lock()
189+
defer dswl.mu.Unlock()
190+
191+
if dswl.aborted {
134192
return 0, dswl.conerr
135193
}
136-
return dswl.c.Write(p)
194+
195+
if dswl.connected {
196+
// We have a connection. beginAccept would of drained the buffer so we just write our data to
197+
// the connection directly.
198+
written, err := dswl.c.Write(p)
199+
if err != nil {
200+
// We lost the connection.
201+
dswl.connected = false
202+
dswl.c.Close()
203+
go dswl.beginAccept()
204+
205+
// We weren't able to write the full `p` bytes. Buffer the rest
206+
if written != len(p) {
207+
w, err := dswl.buffer.Write(p[written:])
208+
if err != nil {
209+
// We failed to buffer. Return this error
210+
return written + w, err
211+
}
212+
written += w
213+
}
214+
}
215+
216+
return written, nil
217+
}
218+
219+
// We are disconnected. Buffer the contents.
220+
return dswl.buffer.Write(p)
137221
}
138222

139223
// openLog on Windows acts as the server of the log pipe. This allows the
@@ -143,26 +227,16 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
143227
if err != nil {
144228
return nil, err
145229
}
230+
231+
dswl := &deferredShimWriteLogger{
232+
ctx: ctx,
233+
}
146234
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
147235
if err != nil {
148236
return nil, err
149237
}
150-
dswl := &deferredShimWriteLogger{
151-
ctx: ctx,
152-
}
153-
// TODO: JTERRY75 - this will not work with restarts. Only the first
154-
// connection will work and all +1 connections will return 'use of closed
155-
// network connection'. Make this reconnect aware.
156-
dswl.wg.Add(1)
157-
go func() {
158-
c, conerr := l.Accept()
159-
if conerr != nil {
160-
l.Close()
161-
dswl.conerr = conerr
162-
}
163-
dswl.c = c
164-
dswl.wg.Done()
165-
}()
238+
dswl.l = l
239+
go dswl.beginAccept()
166240
return dswl, nil
167241
}
168242

runtime/v2/shim/shim_windows_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// +build windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package shim
20+
21+
import (
22+
"bytes"
23+
"context"
24+
"fmt"
25+
"io"
26+
"testing"
27+
28+
winio "github.com/Microsoft/go-winio"
29+
"github.com/containerd/containerd/namespaces"
30+
)
31+
32+
func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) {
33+
expected := []byte(expectedStr)
34+
actual := make([]byte, len(expected))
35+
read, err := rdr.Read(actual)
36+
if err != nil {
37+
t.Fatalf("failed to read with: %v", err)
38+
}
39+
if read != len(expected) {
40+
t.Fatalf("failed to read len %v bytes read: %v", len(expected), actual)
41+
}
42+
if !bytes.Equal(expected, actual) {
43+
t.Fatalf("expected '%v' != actual '%v'", expected, actual)
44+
}
45+
}
46+
47+
func writeValueTo(wr io.Writer, value string, t *testing.T) {
48+
expected := []byte(value)
49+
written, err := wr.Write(expected)
50+
if err != nil {
51+
t.Fatalf("failed to write with: %v", err)
52+
}
53+
if len(expected) != written {
54+
t.Fatalf("failed to write len %v bytes wrote: %v", len(expected), written)
55+
}
56+
}
57+
58+
func runOneTest(ns, id string, writer io.Writer, t *testing.T) {
59+
// Write on closed
60+
go writeValueTo(writer, "Hello World!", t)
61+
62+
// Connect
63+
c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
64+
if err != nil {
65+
t.Fatal("should have successfully connected to log")
66+
}
67+
defer c.Close()
68+
69+
// Read the deferred buffer.
70+
readValueFrom(c, "Hello World!", t)
71+
72+
go writeValueTo(writer, "Hello Next!", t)
73+
readValueFrom(c, "Hello Next!", t)
74+
}
75+
76+
func TestOpenLog(t *testing.T) {
77+
ns := "openlognamespace"
78+
id := "openlogid"
79+
ctx := namespaces.WithNamespace(context.TODO(), ns)
80+
writer, err := openLog(ctx, id)
81+
if err != nil {
82+
t.Fatalf("failed openLog with %v", err)
83+
}
84+
85+
// Do three iterations of write/open/read/write/read/close
86+
for i := 0; i < 3; i++ {
87+
runOneTest(ns, id, writer, t)
88+
}
89+
}

0 commit comments

Comments
 (0)