Skip to content

Commit 0f75649

Browse files
committed
Fix writer deadlock in local store
The local store could end up in a state where the writer is closed but the reference is locked after a commit on an existing object. Cleans up Commit logic to always close the writer even after an error occurs, guaranteeing the reference is unlocked after commit. Adds a test to the content test suite to verify this behavior. Updates the content store interface definitions to clarify the behavior. Signed-off-by: Derek McGowan <[email protected]>
1 parent 15f19d7 commit 0f75649

File tree

6 files changed

+240
-42
lines changed

6 files changed

+240
-42
lines changed

content/content.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,18 @@ type IngestManager interface {
110110

111111
// Writer handles the write of content into a content store
112112
type Writer interface {
113-
// Close is expected to be called after Commit() when commission is needed.
114-
// Closing a writer without commit allows resuming or aborting.
113+
// Close closes the writer, if the writer has not been
114+
// committed this allows resuming or aborting.
115+
// Calling Close on a closed writer will not error.
115116
io.WriteCloser
116117

117118
// Digest may return empty digest or panics until committed.
118119
Digest() digest.Digest
119120

120121
// Commit commits the blob (but no roll-back is guaranteed on an error).
121122
// size and expected can be zero-value when unknown.
123+
// Commit always closes the writer, even on error.
124+
// ErrAlreadyExists aborts the writer.
122125
Commit(ctx context.Context, size int64, expected digest.Digest, opts ...Opt) error
123126

124127
// Status returns the current state of write

content/local/store.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -524,12 +524,11 @@ func (s *store) writer(ctx context.Context, ref string, total int64, expected di
524524
if err != nil {
525525
return nil, err
526526
}
527-
defer fp.Close()
528527

529528
p := bufPool.Get().(*[]byte)
530-
defer bufPool.Put(p)
531-
532529
offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
530+
bufPool.Put(p)
531+
fp.Close()
533532
if err != nil {
534533
return nil, err
535534
}

content/local/writer.go

+43-28
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/containerd/containerd/content"
2828
"github.com/containerd/containerd/errdefs"
29+
"github.com/containerd/containerd/log"
2930
"github.com/opencontainers/go-digest"
3031
"github.com/pkg/errors"
3132
)
@@ -80,43 +81,36 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest,
8081
}
8182
}
8283

83-
if w.fp == nil {
84+
// Ensure even on error the writer is fully closed
85+
defer unlock(w.ref)
86+
fp := w.fp
87+
w.fp = nil
88+
89+
if fp == nil {
8490
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot commit on closed writer")
8591
}
8692

87-
if err := w.fp.Sync(); err != nil {
93+
if err := fp.Sync(); err != nil {
94+
fp.Close()
8895
return errors.Wrap(err, "sync failed")
8996
}
9097

91-
fi, err := w.fp.Stat()
98+
fi, err := fp.Stat()
99+
closeErr := fp.Close()
92100
if err != nil {
93101
return errors.Wrap(err, "stat on ingest file failed")
94102
}
95-
96-
// change to readonly, more important for read, but provides _some_
97-
// protection from this point on. We use the existing perms with a mask
98-
// only allowing reads honoring the umask on creation.
99-
//
100-
// This removes write and exec, only allowing read per the creation umask.
101-
//
102-
// NOTE: Windows does not support this operation
103-
if runtime.GOOS != "windows" {
104-
if err := w.fp.Chmod((fi.Mode() & os.ModePerm) &^ 0333); err != nil {
105-
return errors.Wrap(err, "failed to change ingest file permissions")
106-
}
103+
if closeErr != nil {
104+
return errors.Wrap(err, "failed to close ingest file")
107105
}
108106

109107
if size > 0 && size != fi.Size() {
110-
return errors.Errorf("unexpected commit size %d, expected %d", fi.Size(), size)
111-
}
112-
113-
if err := w.fp.Close(); err != nil {
114-
return errors.Wrap(err, "failed closing ingest")
108+
return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit size %d, expected %d", fi.Size(), size)
115109
}
116110

117111
dgst := w.digester.Digest()
118112
if expected != "" && expected != dgst {
119-
return errors.Errorf("unexpected commit digest %s, expected %s", dgst, expected)
113+
return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit digest %s, expected %s", dgst, expected)
120114
}
121115

122116
var (
@@ -129,27 +123,48 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest,
129123
return err
130124
}
131125

132-
// clean up!!
133-
defer os.RemoveAll(w.path)
134-
135126
if _, err := os.Stat(target); err == nil {
136127
// collision with the target file!
128+
if err := os.RemoveAll(w.path); err != nil {
129+
log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Errorf("failed to remove ingest directory")
130+
}
137131
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", dgst)
138132
}
133+
139134
if err := os.Rename(ingest, target); err != nil {
140135
return err
141136
}
137+
138+
// Ingest has now been made available in the content store, attempt to complete
139+
// setting metadata but errors should only be logged and not returned since
140+
// the content store cannot be cleanly rolled back.
141+
142142
commitTime := time.Now()
143143
if err := os.Chtimes(target, commitTime, commitTime); err != nil {
144-
return err
144+
log.G(ctx).WithField("digest", dgst).Errorf("failed to change file time to commit time")
145145
}
146146

147-
w.fp = nil
148-
unlock(w.ref)
147+
// clean up!!
148+
if err := os.RemoveAll(w.path); err != nil {
149+
log.G(ctx).WithField("ref", w.ref).WithField("path", w.path).Errorf("failed to remove ingest directory")
150+
}
149151

150152
if w.s.ls != nil && base.Labels != nil {
151153
if err := w.s.ls.Set(dgst, base.Labels); err != nil {
152-
return err
154+
log.G(ctx).WithField("digest", dgst).Errorf("failed to set labels")
155+
}
156+
}
157+
158+
// change to readonly, more important for read, but provides _some_
159+
// protection from this point on. We use the existing perms with a mask
160+
// only allowing reads honoring the umask on creation.
161+
//
162+
// This removes write and exec, only allowing read per the creation umask.
163+
//
164+
// NOTE: Windows does not support this operation
165+
if runtime.GOOS != "windows" {
166+
if err := os.Chmod(target, (fi.Mode()&os.ModePerm)&^0333); err != nil {
167+
log.G(ctx).WithField("ref", w.ref).Errorf("failed to make readonly")
153168
}
154169
}
155170

content/testsuite/testsuite.go

+170
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ func ContentSuite(t *testing.T, name string, storeFn func(ctx context.Context, r
5454

5555
t.Run("CrossNamespaceAppend", makeTest(t, name, storeFn, checkCrossNSAppend))
5656
t.Run("CrossNamespaceShare", makeTest(t, name, storeFn, checkCrossNSShare))
57+
58+
t.Run("CommitErrorState", makeTest(t, name, storeFn, checkCommitErrorState))
5759
}
5860

5961
// ContextWrapper is used to decorate new context used inside the test
@@ -312,7 +314,175 @@ func checkCommitExists(ctx context.Context, t *testing.T, cs content.Store) {
312314
} else if !errdefs.IsAlreadyExists(err) {
313315
t.Fatalf("(%d) Unexpected error: %+v", i, err)
314316
}
317+
}
318+
}
319+
320+
func checkRefNotAvailable(ctx context.Context, t *testing.T, cs content.Store, ref string) {
321+
t.Helper()
322+
323+
w, err := cs.Writer(ctx, content.WithRef(ref))
324+
if err == nil {
325+
w.Close()
326+
t.Fatal("writer created with ref, expected to be in use")
327+
}
328+
if !errdefs.IsUnavailable(err) {
329+
t.Fatalf("Expected unavailable error, got %+v", err)
330+
}
331+
}
332+
333+
func checkCommitErrorState(ctx context.Context, t *testing.T, cs content.Store) {
334+
c1, d1 := createContent(256)
335+
_, d2 := createContent(256)
336+
if err := content.WriteBlob(ctx, cs, "c1", bytes.NewReader(c1), ocispec.Descriptor{Digest: d1}); err != nil {
337+
t.Fatal(err)
338+
}
339+
340+
ref := "c1-commiterror-state"
341+
w, err := cs.Writer(ctx, content.WithRef(ref))
342+
if err != nil {
343+
t.Fatal(err)
344+
}
345+
if _, err := w.Write(c1); err != nil {
346+
if err := w.Close(); err != nil {
347+
t.Errorf("Close error: %+v", err)
348+
}
349+
t.Fatal(err)
350+
}
351+
352+
checkRefNotAvailable(ctx, t, cs, ref)
353+
354+
// Check exists
355+
err = w.Commit(ctx, int64(len(c1)), d1)
356+
if err == nil {
357+
t.Fatalf("Expected already exists error")
358+
} else if !errdefs.IsAlreadyExists(err) {
359+
if err := w.Close(); err != nil {
360+
t.Errorf("Close error: %+v", err)
361+
}
362+
t.Fatalf("Unexpected error: %+v", err)
363+
}
364+
365+
w, err = cs.Writer(ctx, content.WithRef(ref))
366+
if err != nil {
367+
t.Fatal(err)
368+
}
369+
370+
checkRefNotAvailable(ctx, t, cs, ref)
371+
372+
if _, err := w.Write(c1); err != nil {
373+
if err := w.Close(); err != nil {
374+
t.Errorf("close error: %+v", err)
375+
}
376+
t.Fatal(err)
377+
}
378+
379+
// Check exists without providing digest
380+
err = w.Commit(ctx, int64(len(c1)), "")
381+
if err == nil {
382+
t.Fatalf("Expected already exists error")
383+
} else if !errdefs.IsAlreadyExists(err) {
384+
if err := w.Close(); err != nil {
385+
t.Errorf("Close error: %+v", err)
386+
}
387+
t.Fatalf("Unexpected error: %+v", err)
388+
}
389+
390+
w, err = cs.Writer(ctx, content.WithRef(ref))
391+
if err != nil {
392+
t.Fatal(err)
393+
}
394+
395+
checkRefNotAvailable(ctx, t, cs, ref)
396+
397+
if _, err := w.Write(append(c1, []byte("more")...)); err != nil {
398+
if err := w.Close(); err != nil {
399+
t.Errorf("close error: %+v", err)
400+
}
401+
t.Fatal(err)
402+
}
403+
404+
// Commit with the wrong digest should produce an error
405+
err = w.Commit(ctx, int64(len(c1))+4, d2)
406+
if err == nil {
407+
t.Fatalf("Expected error from wrong digest")
408+
} else if !errdefs.IsFailedPrecondition(err) {
409+
t.Errorf("Unexpected error: %+v", err)
410+
}
411+
412+
w, err = cs.Writer(ctx, content.WithRef(ref))
413+
if err != nil {
414+
t.Fatal(err)
415+
}
416+
417+
checkRefNotAvailable(ctx, t, cs, ref)
418+
419+
// Commit with wrong size should also produce an error
420+
err = w.Commit(ctx, int64(len(c1)), "")
421+
if err == nil {
422+
t.Fatalf("Expected error from wrong size")
423+
} else if !errdefs.IsFailedPrecondition(err) {
424+
t.Errorf("Unexpected error: %+v", err)
425+
}
426+
427+
w, err = cs.Writer(ctx, content.WithRef(ref))
428+
if err != nil {
429+
t.Fatal(err)
430+
}
431+
432+
checkRefNotAvailable(ctx, t, cs, ref)
433+
434+
// Now expect commit to succeed
435+
if err := w.Commit(ctx, int64(len(c1))+4, ""); err != nil {
436+
if err := w.Close(); err != nil {
437+
t.Errorf("close error: %+v", err)
438+
}
439+
t.Fatalf("Failed to commit: %+v", err)
440+
}
315441

442+
// Create another writer with same reference
443+
w, err = cs.Writer(ctx, content.WithRef(ref))
444+
if err != nil {
445+
t.Fatalf("Failed to open writer: %+v", err)
446+
}
447+
448+
if _, err := w.Write(c1); err != nil {
449+
if err := w.Close(); err != nil {
450+
t.Errorf("close error: %+v", err)
451+
}
452+
t.Fatal(err)
453+
}
454+
455+
checkRefNotAvailable(ctx, t, cs, ref)
456+
457+
// Commit should fail due to already exists
458+
err = w.Commit(ctx, int64(len(c1)), d1)
459+
if err == nil {
460+
t.Fatalf("Expected already exists error")
461+
} else if !errdefs.IsAlreadyExists(err) {
462+
if err := w.Close(); err != nil {
463+
t.Errorf("close error: %+v", err)
464+
}
465+
t.Fatalf("Unexpected error: %+v", err)
466+
}
467+
468+
w, err = cs.Writer(ctx, content.WithRef(ref))
469+
if err != nil {
470+
t.Fatal(err)
471+
}
472+
473+
checkRefNotAvailable(ctx, t, cs, ref)
474+
475+
if err := w.Close(); err != nil {
476+
t.Fatalf("Close failed: %+v", err)
477+
}
478+
479+
// Create another writer with same reference to check available
480+
w, err = cs.Writer(ctx, content.WithRef(ref))
481+
if err != nil {
482+
t.Fatalf("Failed to open writer: %+v", err)
483+
}
484+
if err := w.Close(); err != nil {
485+
t.Fatalf("Close failed: %+v", err)
316486
}
317487
}
318488

0 commit comments

Comments
 (0)