@@ -27,6 +27,7 @@ import (
2727 "os"
2828 "os/exec"
2929 "sync"
30+ "syscall"
3031 "unsafe"
3132
3233 winio "github.com/Microsoft/go-winio"
@@ -39,6 +40,10 @@ import (
3940 "golang.org/x/sys/windows"
4041)
4142
43+ const (
44+ errorConnectionAborted syscall.Errno = 1236
45+ )
46+
4247// setupSignals creates a new signal handler for all signals
4348func setupSignals () (chan os.Signal , error ) {
4449 signals := make (chan os.Signal , 32 )
@@ -119,21 +124,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
119124 }
120125}
121126
127+ var _ = (io .WriterTo )(& blockingBuffer {})
128+ var _ = (io .Writer )(& blockingBuffer {})
129+
130+ // blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
131+ // `capacity` is reached the calls to `Write` will block until a successful call
132+ // to `WriterTo` frees up the buffer space.
133+ //
134+ // Note: This has the same threadding semantics as bytes.Buffer with no
135+ // additional locking so multithreading is not supported.
136+ type blockingBuffer struct {
137+ c * sync.Cond
138+
139+ capacity int
140+
141+ buffer bytes.Buffer
142+ }
143+
144+ func newBlockingBuffer (capacity int ) * blockingBuffer {
145+ return & blockingBuffer {
146+ c : sync .NewCond (& sync.Mutex {}),
147+ capacity : capacity ,
148+ }
149+ }
150+
151+ func (bb * blockingBuffer ) Len () int {
152+ bb .c .L .Lock ()
153+ defer bb .c .L .Unlock ()
154+ return bb .buffer .Len ()
155+ }
156+
157+ func (bb * blockingBuffer ) Write (p []byte ) (int , error ) {
158+ if len (p ) > bb .capacity {
159+ return 0 , errors .Errorf ("len(p) (%d) too large for capacity (%d)" , len (p ), bb .capacity )
160+ }
161+
162+ bb .c .L .Lock ()
163+ for bb .buffer .Len ()+ len (p ) > bb .capacity {
164+ bb .c .Wait ()
165+ }
166+ defer bb .c .L .Unlock ()
167+ return bb .buffer .Write (p )
168+ }
169+
170+ func (bb * blockingBuffer ) WriteTo (w io.Writer ) (int64 , error ) {
171+ bb .c .L .Lock ()
172+ defer bb .c .L .Unlock ()
173+ defer bb .c .Signal ()
174+ return bb .buffer .WriteTo (w )
175+ }
176+
177+ // deferredShimWriteLogger exists to solve the upstream loggin issue presented
178+ // by using Windows Named Pipes for logging. When containerd restarts it tries
179+ // to reconnect to any shims. This means that the connection to the logger will
180+ // be severed but when containerd starts up it should reconnect and start
181+ // logging again. We abstract all of this logic behind what looks like a simple
182+ // `io.Writer` that can reconnect in the lifetime and buffers logs while
183+ // disconnected.
122184type deferredShimWriteLogger struct {
185+ mu sync.Mutex
186+
123187 ctx context.Context
124188
125- wg sync.WaitGroup
189+ connected bool
190+ aborted bool
191+
192+ buffer * blockingBuffer
126193
194+ l net.Listener
127195 c net.Conn
128196 conerr error
129197}
130198
199+ // beginAccept issues an accept to wait for a connection. Once a conneciton
200+ // occurs drains any outstanding buffer. While draining the buffer any writes
201+ // are blocked. If the buffer fails to fully drain due to a connection drop a
202+ // call to `beginAccept` is re-issued waiting for another connection from
203+ // containerd.
204+ func (dswl * deferredShimWriteLogger ) beginAccept () {
205+ dswl .mu .Lock ()
206+ if dswl .connected {
207+ return
208+ }
209+ dswl .mu .Unlock ()
210+
211+ c , err := dswl .l .Accept ()
212+ if err == errorConnectionAborted {
213+ dswl .mu .Lock ()
214+ dswl .aborted = true
215+ dswl .l .Close ()
216+ dswl .conerr = errors .New ("connection closed" )
217+ dswl .mu .Unlock ()
218+ return
219+ }
220+ dswl .mu .Lock ()
221+ dswl .connected = true
222+ dswl .c = c
223+
224+ // Drain the buffer
225+ if dswl .buffer .Len () > 0 {
226+ _ , err := dswl .buffer .WriteTo (dswl .c )
227+ if err != nil {
228+ // We lost our connection draining the buffer.
229+ dswl .connected = false
230+ dswl .c .Close ()
231+ go dswl .beginAccept ()
232+ }
233+ }
234+ dswl .mu .Unlock ()
235+ }
236+
131237func (dswl * deferredShimWriteLogger ) Write (p []byte ) (int , error ) {
132- dswl .wg .Wait ()
133- if dswl .c == nil {
238+ dswl .mu .Lock ()
239+ defer dswl .mu .Unlock ()
240+
241+ if dswl .aborted {
134242 return 0 , dswl .conerr
135243 }
136- return dswl .c .Write (p )
244+
245+ if dswl .connected {
246+ // We have a connection. beginAccept would have drained the buffer so we just write our data to
247+ // the connection directly.
248+ written , err := dswl .c .Write (p )
249+ if err != nil {
250+ // We lost the connection.
251+ dswl .connected = false
252+ dswl .c .Close ()
253+ go dswl .beginAccept ()
254+
255+ // We weren't able to write the full `p` bytes. Buffer the rest
256+ if written != len (p ) {
257+ w , err := dswl .buffer .Write (p [written :])
258+ if err != nil {
259+ // We failed to buffer. Return this error
260+ return written + w , err
261+ }
262+ written += w
263+ }
264+ }
265+
266+ return written , nil
267+ }
268+
269+ // We are disconnected. Buffer the contents.
270+ return dswl .buffer .Write (p )
137271}
138272
139273// openLog on Windows acts as the server of the log pipe. This allows the
@@ -143,26 +277,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
143277 if err != nil {
144278 return nil , err
145279 }
280+
281+ dswl := & deferredShimWriteLogger {
282+ ctx : ctx ,
283+ buffer : newBlockingBuffer (64 * 1024 ), // 64KB,
284+ }
146285 l , err := winio .ListenPipe (fmt .Sprintf ("\\ \\ .\\ pipe\\ containerd-shim-%s-%s-log" , ns , id ), nil )
147286 if err != nil {
148287 return nil , err
149288 }
150- dswl := & deferredShimWriteLogger {
151- ctx : ctx ,
152- }
153- // TODO: JTERRY75 - this will not work with restarts. Only the first
154- // connection will work and all +1 connections will return 'use of closed
155- // network connection'. Make this reconnect aware.
156- dswl .wg .Add (1 )
157- go func () {
158- c , conerr := l .Accept ()
159- if conerr != nil {
160- l .Close ()
161- dswl .conerr = conerr
162- }
163- dswl .c = c
164- dswl .wg .Done ()
165- }()
289+ dswl .l = l
290+ go dswl .beginAccept ()
166291 return dswl , nil
167292}
168293
0 commit comments