Skip to content

Commit ab20312

Browse files
committed
Add blocking buffered writes to shim
Signed-off-by: Justin Terry (VM) <[email protected]>
1 parent beb1f43 commit ab20312

2 files changed

Lines changed: 132 additions & 8 deletions

File tree

runtime/v2/shim/shim_windows.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,56 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
124124
}
125125
}
126126

127+
var _ = (io.WriterTo)(&blockingBuffer{})
128+
var _ = (io.Writer)(&blockingBuffer{})
129+
130+
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
131+
// `capacity` is reached the calls to `Write` will block until a successful call
132+
// to `WriterTo` frees up the buffer space.
133+
//
134+
// Note: This has the same threadding semantics as bytes.Buffer with no
135+
// additional locking so multithreading is not supported.
136+
type blockingBuffer struct {
137+
c *sync.Cond
138+
139+
capacity int
140+
141+
buffer bytes.Buffer
142+
}
143+
144+
func newBlockingBuffer(capacity int) *blockingBuffer {
145+
return &blockingBuffer{
146+
c: sync.NewCond(&sync.Mutex{}),
147+
capacity: capacity,
148+
}
149+
}
150+
151+
func (bb *blockingBuffer) Len() int {
152+
bb.c.L.Lock()
153+
defer bb.c.L.Unlock()
154+
return bb.buffer.Len()
155+
}
156+
157+
func (bb *blockingBuffer) Write(p []byte) (int, error) {
158+
if len(p) > bb.capacity {
159+
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
160+
}
161+
162+
bb.c.L.Lock()
163+
for bb.buffer.Len()+len(p) > bb.capacity {
164+
bb.c.Wait()
165+
}
166+
defer bb.c.L.Unlock()
167+
return bb.buffer.Write(p)
168+
}
169+
170+
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
171+
bb.c.L.Lock()
172+
defer bb.c.L.Unlock()
173+
defer bb.c.Signal()
174+
return bb.buffer.WriteTo(w)
175+
}
176+
127177
// deferredShimWriteLogger exists to solve the upstream loggin issue presented
128178
// by using Windows Named Pipes for logging. When containerd restarts it tries
129179
// to reconnect to any shims. This means that the connection to the logger will
@@ -139,7 +189,7 @@ type deferredShimWriteLogger struct {
139189
connected bool
140190
aborted bool
141191

142-
buffer bytes.Buffer
192+
buffer *blockingBuffer
143193

144194
l net.Listener
145195
c net.Conn
@@ -229,7 +279,8 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
229279
}
230280

231281
dswl := &deferredShimWriteLogger{
232-
ctx: ctx,
282+
ctx: ctx,
283+
buffer: newBlockingBuffer(64 * 1024), // 64KB,
233284
}
234285
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
235286
if err != nil {

runtime/v2/shim/shim_windows_test.go

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"io"
2626
"testing"
27+
"time"
2728

2829
winio "github.com/Microsoft/go-winio"
2930
"github.com/containerd/containerd/namespaces"
@@ -37,27 +38,27 @@ func readValueFrom(rdr io.Reader, expectedStr string, t *testing.T) {
3738
t.Fatalf("failed to read with: %v", err)
3839
}
3940
if read != len(expected) {
40-
t.Fatalf("failed to read len %v bytes read: %v", len(expected), actual)
41+
t.Fatalf("failed to read len %v bytes read: %v", len(expected), read)
4142
}
4243
if !bytes.Equal(expected, actual) {
4344
t.Fatalf("expected '%v' != actual '%v'", expected, actual)
4445
}
4546
}
4647

47-
func writeValueTo(wr io.Writer, value string, t *testing.T) {
48+
func writeValueTo(wr io.Writer, value string) {
4849
expected := []byte(value)
4950
written, err := wr.Write(expected)
5051
if err != nil {
51-
t.Fatalf("failed to write with: %v", err)
52+
panic(fmt.Sprintf("failed to write with: %v", err))
5253
}
5354
if len(expected) != written {
54-
t.Fatalf("failed to write len %v bytes wrote: %v", len(expected), written)
55+
panic(fmt.Sprintf("failed to write len %v bytes wrote: %v", len(expected), written))
5556
}
5657
}
5758

5859
func runOneTest(ns, id string, writer io.Writer, t *testing.T) {
5960
// Write on closed
60-
go writeValueTo(writer, "Hello World!", t)
61+
go writeValueTo(writer, "Hello World!")
6162

6263
// Connect
6364
c, err := winio.DialPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
@@ -69,7 +70,7 @@ func runOneTest(ns, id string, writer io.Writer, t *testing.T) {
6970
// Read the deferred buffer.
7071
readValueFrom(c, "Hello World!", t)
7172

72-
go writeValueTo(writer, "Hello Next!", t)
73+
go writeValueTo(writer, "Hello Next!")
7374
readValueFrom(c, "Hello Next!", t)
7475
}
7576

@@ -87,3 +88,75 @@ func TestOpenLog(t *testing.T) {
8788
runOneTest(ns, id, writer, t)
8889
}
8990
}
91+
92+
func TestBlockingBufferWriteNotEnoughCapacity(t *testing.T) {
93+
bb := newBlockingBuffer(5)
94+
val := make([]byte, 10)
95+
w, err := bb.Write(val)
96+
if err == nil {
97+
t.Fatal("write should of failed capacity check")
98+
}
99+
if w != 0 {
100+
t.Fatal("write should of not written any bytes on failed capacity check")
101+
}
102+
}
103+
104+
func TestBlockingBufferLoop(t *testing.T) {
105+
nameBytes := []byte(t.Name())
106+
nameBytesLen := len(nameBytes)
107+
bb := newBlockingBuffer(nameBytesLen)
108+
for i := 0; i < 3; i++ {
109+
writeValueTo(bb, t.Name())
110+
if bb.Len() != nameBytesLen {
111+
t.Fatalf("invalid buffer bytes len after write: (%d)", bb.buffer.Len())
112+
}
113+
buf := &bytes.Buffer{}
114+
w, err := bb.WriteTo(buf)
115+
if err != nil {
116+
t.Fatalf("should not have failed WriteTo: (%v)", err)
117+
}
118+
if w != int64(nameBytesLen) {
119+
t.Fatalf("should have written all bytes, wrote (%d)", w)
120+
}
121+
readValueFrom(buf, t.Name(), t)
122+
if bb.Len() != 0 {
123+
t.Fatalf("invalid buffer bytes len after read: (%d)", bb.buffer.Len())
124+
}
125+
}
126+
}
127+
func TestBlockingBuffer(t *testing.T) {
128+
nameBytes := []byte(t.Name())
129+
nameBytesLen := len(nameBytes)
130+
bb := newBlockingBuffer(nameBytesLen)
131+
132+
// Write the first value
133+
writeValueTo(bb, t.Name())
134+
if bb.Len() != nameBytesLen {
135+
t.Fatalf("buffer len != %d", nameBytesLen)
136+
}
137+
138+
// We should now have filled capacity the next write should block
139+
done := make(chan struct{})
140+
go func() {
141+
writeValueTo(bb, t.Name())
142+
close(done)
143+
}()
144+
select {
145+
case <-done:
146+
t.Fatal("third write should of blocked")
147+
case <-time.After(10 * time.Millisecond):
148+
buff := &bytes.Buffer{}
149+
_, err := bb.WriteTo(buff)
150+
if err != nil {
151+
t.Fatalf("failed to drain buffer with: %v", err)
152+
}
153+
if bb.Len() != 0 {
154+
t.Fatalf("buffer len != %d", 0)
155+
}
156+
readValueFrom(buff, t.Name(), t)
157+
}
158+
<-done
159+
if bb.Len() != nameBytesLen {
160+
t.Fatalf("buffer len != %d", nameBytesLen)
161+
}
162+
}

0 commit comments

Comments
 (0)