Skip to content

Commit de4bb2d

Browse files
authored
Merge pull request #2692 from jterry75/shim_reconnect
Various runhcs shim fixes
2 parents 43acab8 + ab20312 commit de4bb2d

6 files changed

Lines changed: 354 additions & 29 deletions

File tree

runtime/v2/binary.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"io"
2323
"os"
24+
gruntime "runtime"
2425
"strings"
2526

2627
eventstypes "github.com/containerd/containerd/api/events"
@@ -109,7 +110,22 @@ func (b *binary) Start(ctx context.Context) (_ *shim, err error) {
109110

110111
func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
111112
log.G(ctx).Info("cleaning up dead shim")
112-
cmd, err := client.Command(ctx, b.runtime, b.containerdAddress, b.bundle.Path, "-id", b.bundle.ID, "delete")
113+
114+
// Windows cannot delete the current working directory while an
115+
// executable is in use with it. For the cleanup case we invoke with the
116+
// default work dir and forward the bundle path on the cmdline.
117+
var bundlePath string
118+
if gruntime.GOOS != "windows" {
119+
bundlePath = b.bundle.Path
120+
}
121+
122+
cmd, err := client.Command(ctx,
123+
b.runtime,
124+
b.containerdAddress,
125+
bundlePath,
126+
"-id", b.bundle.ID,
127+
"-bundle", b.bundle.Path,
128+
"delete")
113129
if err != nil {
114130
return nil, err
115131
}

runtime/v2/runhcs/service.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,22 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
195195
if err != nil {
196196
return nil, err
197197
}
198-
path, err := os.Getwd()
199-
if err != nil {
200-
return nil, err
198+
// Forcibly shut down any container in this bundle
199+
rhcs := newRunhcs("")
200+
dopts := &runhcs.DeleteOpts{
201+
Force: true,
201202
}
202-
if err := os.RemoveAll(path); err != nil {
203-
return nil, err
203+
if err := rhcs.Delete(ctx, s.id, dopts); err != nil {
204+
log.G(ctx).WithError(err).Debugf("failed to delete container")
204205
}
206+
207+
opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts)
208+
if ok && opts.BundlePath != "" {
209+
if err := os.RemoveAll(opts.BundlePath); err != nil {
210+
return nil, err
211+
}
212+
}
213+
205214
return &taskAPI.DeleteResponse{
206215
ExitedAt: time.Now(),
207216
ExitStatus: 255,

runtime/v2/shim/shim.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,16 @@ type OptsKey struct{}
5858

5959
// Opts are context options associated with the shim invocation.
6060
type Opts struct {
61-
Debug bool
61+
BundlePath string
62+
Debug bool
6263
}
6364

6465
var (
6566
debugFlag bool
6667
idFlag string
6768
namespaceFlag string
6869
socketFlag string
70+
bundlePath string
6971
addressFlag string
7072
containerdBinaryFlag string
7173
action string
@@ -76,6 +78,7 @@ func parseFlags() {
7678
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
7779
flag.StringVar(&idFlag, "id", "", "id of the task")
7880
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
81+
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
7982

8083
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
8184
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
@@ -141,7 +144,7 @@ func run(id string, initFunc Init) error {
141144
return fmt.Errorf("shim namespace cannot be empty")
142145
}
143146
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
144-
ctx = context.WithValue(ctx, OptsKey{}, Opts{Debug: debugFlag})
147+
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
145148
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
146149

147150
service, err := initFunc(ctx, idFlag, publisher)

runtime/v2/shim/shim_windows.go

Lines changed: 145 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"os"
2828
"os/exec"
2929
"sync"
30+
"syscall"
3031
"unsafe"
3132

3233
winio "github.com/Microsoft/go-winio"
@@ -39,6 +40,10 @@ import (
3940
"golang.org/x/sys/windows"
4041
)
4142

43+
const (
44+
errorConnectionAborted syscall.Errno = 1236
45+
)
46+
4247
// setupSignals creates a new signal handler for all signals
4348
func setupSignals() (chan os.Signal, error) {
4449
signals := make(chan os.Signal, 32)
@@ -119,21 +124,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
119124
}
120125
}
121126

127+
var _ = (io.WriterTo)(&blockingBuffer{})
128+
var _ = (io.Writer)(&blockingBuffer{})
129+
130+
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
131+
// `capacity` is reached the calls to `Write` will block until a successful call
132+
// to `WriterTo` frees up the buffer space.
133+
//
134+
// Note: This has the same threadding semantics as bytes.Buffer with no
135+
// additional locking so multithreading is not supported.
136+
type blockingBuffer struct {
137+
c *sync.Cond
138+
139+
capacity int
140+
141+
buffer bytes.Buffer
142+
}
143+
144+
func newBlockingBuffer(capacity int) *blockingBuffer {
145+
return &blockingBuffer{
146+
c: sync.NewCond(&sync.Mutex{}),
147+
capacity: capacity,
148+
}
149+
}
150+
151+
func (bb *blockingBuffer) Len() int {
152+
bb.c.L.Lock()
153+
defer bb.c.L.Unlock()
154+
return bb.buffer.Len()
155+
}
156+
157+
func (bb *blockingBuffer) Write(p []byte) (int, error) {
158+
if len(p) > bb.capacity {
159+
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
160+
}
161+
162+
bb.c.L.Lock()
163+
for bb.buffer.Len()+len(p) > bb.capacity {
164+
bb.c.Wait()
165+
}
166+
defer bb.c.L.Unlock()
167+
return bb.buffer.Write(p)
168+
}
169+
170+
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
171+
bb.c.L.Lock()
172+
defer bb.c.L.Unlock()
173+
defer bb.c.Signal()
174+
return bb.buffer.WriteTo(w)
175+
}
176+
177+
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
178+
// by using Windows Named Pipes for logging. When containerd restarts it tries
179+
// to reconnect to any shims. This means that the connection to the logger will
180+
// be severed but when containerd starts up it should reconnect and start
181+
// logging again. We abstract all of this logic behind what looks like a simple
182+
// `io.Writer` that can reconnect in the lifetime and buffers logs while
183+
// disconnected.
122184
type deferredShimWriteLogger struct {
185+
mu sync.Mutex
186+
123187
ctx context.Context
124188

125-
wg sync.WaitGroup
189+
connected bool
190+
aborted bool
191+
192+
buffer *blockingBuffer
126193

194+
l net.Listener
127195
c net.Conn
128196
conerr error
129197
}
130198

199+
// beginAccept issues an accept to wait for a connection. Once a conneciton
200+
// occurs drains any outstanding buffer. While draining the buffer any writes
201+
// are blocked. If the buffer fails to fully drain due to a connection drop a
202+
// call to `beginAccept` is re-issued waiting for another connection from
203+
// containerd.
204+
func (dswl *deferredShimWriteLogger) beginAccept() {
205+
dswl.mu.Lock()
206+
if dswl.connected {
207+
return
208+
}
209+
dswl.mu.Unlock()
210+
211+
c, err := dswl.l.Accept()
212+
if err == errorConnectionAborted {
213+
dswl.mu.Lock()
214+
dswl.aborted = true
215+
dswl.l.Close()
216+
dswl.conerr = errors.New("connection closed")
217+
dswl.mu.Unlock()
218+
return
219+
}
220+
dswl.mu.Lock()
221+
dswl.connected = true
222+
dswl.c = c
223+
224+
// Drain the buffer
225+
if dswl.buffer.Len() > 0 {
226+
_, err := dswl.buffer.WriteTo(dswl.c)
227+
if err != nil {
228+
// We lost our connection draining the buffer.
229+
dswl.connected = false
230+
dswl.c.Close()
231+
go dswl.beginAccept()
232+
}
233+
}
234+
dswl.mu.Unlock()
235+
}
236+
131237
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
132-
dswl.wg.Wait()
133-
if dswl.c == nil {
238+
dswl.mu.Lock()
239+
defer dswl.mu.Unlock()
240+
241+
if dswl.aborted {
134242
return 0, dswl.conerr
135243
}
136-
return dswl.c.Write(p)
244+
245+
if dswl.connected {
246+
// We have a connection. beginAccept would have drained the buffer so we just write our data to
247+
// the connection directly.
248+
written, err := dswl.c.Write(p)
249+
if err != nil {
250+
// We lost the connection.
251+
dswl.connected = false
252+
dswl.c.Close()
253+
go dswl.beginAccept()
254+
255+
// We weren't able to write the full `p` bytes. Buffer the rest
256+
if written != len(p) {
257+
w, err := dswl.buffer.Write(p[written:])
258+
if err != nil {
259+
// We failed to buffer. Return this error
260+
return written + w, err
261+
}
262+
written += w
263+
}
264+
}
265+
266+
return written, nil
267+
}
268+
269+
// We are disconnected. Buffer the contents.
270+
return dswl.buffer.Write(p)
137271
}
138272

139273
// openLog on Windows acts as the server of the log pipe. This allows the
@@ -143,26 +277,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
143277
if err != nil {
144278
return nil, err
145279
}
280+
281+
dswl := &deferredShimWriteLogger{
282+
ctx: ctx,
283+
buffer: newBlockingBuffer(64 * 1024), // 64KB,
284+
}
146285
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
147286
if err != nil {
148287
return nil, err
149288
}
150-
dswl := &deferredShimWriteLogger{
151-
ctx: ctx,
152-
}
153-
// TODO: JTERRY75 - this will not work with restarts. Only the first
154-
// connection will work and all +1 connections will return 'use of closed
155-
// network connection'. Make this reconnect aware.
156-
dswl.wg.Add(1)
157-
go func() {
158-
c, conerr := l.Accept()
159-
if conerr != nil {
160-
l.Close()
161-
dswl.conerr = conerr
162-
}
163-
dswl.c = c
164-
dswl.wg.Done()
165-
}()
289+
dswl.l = l
290+
go dswl.beginAccept()
166291
return dswl, nil
167292
}
168293

0 commit comments

Comments
 (0)