Skip to content

Commit dfc9991

Browse files
committed
Add content ingests to lease and gc
Allow content ingests to be cleaned up during gc. Use a default expiration on content ingests or make use of the lease expiration when provided. Signed-off-by: Derek McGowan <[email protected]>
1 parent 92d147e commit dfc9991

8 files changed

Lines changed: 404 additions & 23 deletions

File tree

content/local/store.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,40 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Statu
322322
return active, nil
323323
}
324324

325+
// WalkStatusRefs is used to walk all status references
326+
// Failed status reads will be logged and ignored, if
327+
// this function is called while references are being altered,
328+
// these error messages may be produced.
329+
func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error {
330+
fp, err := os.Open(filepath.Join(s.root, "ingest"))
331+
if err != nil {
332+
return err
333+
}
334+
335+
defer fp.Close()
336+
337+
fis, err := fp.Readdir(-1)
338+
if err != nil {
339+
return err
340+
}
341+
342+
for _, fi := range fis {
343+
rf := filepath.Join(s.root, "ingest", fi.Name(), "ref")
344+
345+
ref, err := readFileString(rf)
346+
if err != nil {
347+
log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref")
348+
continue
349+
}
350+
351+
if err := fn(ref); err != nil {
352+
return err
353+
}
354+
}
355+
356+
return nil
357+
}
358+
325359
// status works like stat above except uses the path to the ingest.
326360
func (s *store) status(ingestPath string) (content.Status, error) {
327361
dp := filepath.Join(ingestPath, "data")

metadata/buckets.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ var (
7272
bucketKeyCreatedAt = []byte("createdat")
7373
bucketKeyExpected = []byte("expected")
7474
bucketKeyRef = []byte("ref")
75+
bucketKeyExpireAt = []byte("expireat")
7576

7677
deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2
7778
)

metadata/content.go

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,10 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
328328
return err
329329
}
330330

331+
if err := removeIngestLease(ctx, tx, ref); err != nil {
332+
return err
333+
}
334+
331335
// if not shared content, delete active ingest on backend
332336
if expected == "" {
333337
return cs.Store.Abort(ctx, bref)
@@ -395,6 +399,11 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (
395399
return err
396400
}
397401

402+
leased, err := addIngestLease(ctx, tx, wOpts.Ref)
403+
if err != nil {
404+
return err
405+
}
406+
398407
brefb := bkt.Get(bucketKeyRef)
399408
if brefb == nil {
400409
sid, err := bkt.NextSequence()
@@ -409,6 +418,18 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (
409418
} else {
410419
bref = string(brefb)
411420
}
421+
if !leased {
422+
// Add timestamp to allow aborting once stale
423+
// When lease is set the ingest shoudl be aborted
424+
// after lease it belonged to is deleted.
425+
// Expiration can be configurable in the future to
426+
// give more control to the daemon, however leases
427+
// already give users more control of expiration.
428+
expireAt := time.Now().UTC().Add(24 * 3600 * time.Second)
429+
if err := writeExpireAt(expireAt, bkt); err != nil {
430+
return err
431+
}
432+
}
412433

413434
if shared {
414435
if err := bkt.Put(bucketKeyExpected, []byte(wOpts.Desc.Digest)); err != nil {
@@ -543,6 +564,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
543564
if err != nil {
544565
return err
545566
}
567+
if err := removeIngestLease(ctx, tx, nw.ref); err != nil {
568+
return err
569+
}
546570
return addContentLease(ctx, tx, dgst)
547571
})
548572
}
@@ -697,6 +721,30 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
697721
return bkt.Put(bucketKeySize, sizeEncoded)
698722
}
699723

724+
func readExpireAt(bkt *bolt.Bucket) (*time.Time, error) {
725+
v := bkt.Get(bucketKeyExpireAt)
726+
if v == nil {
727+
return nil, nil
728+
}
729+
t := &time.Time{}
730+
if err := t.UnmarshalBinary(v); err != nil {
731+
return nil, err
732+
}
733+
return t, nil
734+
}
735+
736+
func writeExpireAt(expire time.Time, bkt *bolt.Bucket) error {
737+
expireAt, err := expire.MarshalBinary()
738+
if err != nil {
739+
return err
740+
}
741+
if err := bkt.Put(bucketKeyExpireAt, expireAt); err != nil {
742+
return err
743+
}
744+
745+
return nil
746+
}
747+
700748
func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, err error) {
701749
cs.l.Lock()
702750
t1 := time.Now()
@@ -707,7 +755,8 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
707755
cs.l.Unlock()
708756
}()
709757

710-
seen := map[string]struct{}{}
758+
contentSeen := map[string]struct{}{}
759+
ingestSeen := map[string]struct{}{}
711760
if err := cs.db.View(func(tx *bolt.Tx) error {
712761
v1bkt := tx.Bucket(bucketKeyVersion)
713762
if v1bkt == nil {
@@ -730,7 +779,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
730779
if bbkt != nil {
731780
if err := bbkt.ForEach(func(ck, cv []byte) error {
732781
if cv == nil {
733-
seen[string(ck)] = struct{}{}
782+
contentSeen[string(ck)] = struct{}{}
734783
}
735784
return nil
736785
}); err != nil {
@@ -742,9 +791,17 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
742791
if ibkt != nil {
743792
if err := ibkt.ForEach(func(ref, v []byte) error {
744793
if v == nil {
745-
expected := ibkt.Bucket(ref).Get(bucketKeyExpected)
794+
bkt := ibkt.Bucket(ref)
795+
// expected here may be from a different namespace
796+
// so much be explicitly retained from the ingest
797+
// in case it was removed from the other namespace
798+
expected := bkt.Get(bucketKeyExpected)
746799
if len(expected) > 0 {
747-
seen[string(expected)] = struct{}{}
800+
contentSeen[string(expected)] = struct{}{}
801+
}
802+
bref := bkt.Get(bucketKeyRef)
803+
if len(bref) > 0 {
804+
ingestSeen[string(bref)] = struct{}{}
748805
}
749806
}
750807
return nil
@@ -760,13 +817,48 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
760817
}
761818

762819
err = cs.Store.Walk(ctx, func(info content.Info) error {
763-
if _, ok := seen[info.Digest.String()]; !ok {
820+
if _, ok := contentSeen[info.Digest.String()]; !ok {
764821
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
765822
return err
766823
}
767824
log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
768825
}
769826
return nil
770827
})
828+
if err != nil {
829+
return
830+
}
831+
832+
// If the content store has implemented a more efficient walk function
833+
// then use that else fallback to reading all statuses which may
834+
// cause reading of unneeded metadata.
835+
type statusWalker interface {
836+
WalkStatusRefs(context.Context, func(string) error) error
837+
}
838+
if w, ok := cs.Store.(statusWalker); ok {
839+
err = w.WalkStatusRefs(ctx, func(ref string) error {
840+
if _, ok := ingestSeen[ref]; !ok {
841+
if err := cs.Store.Abort(ctx, ref); err != nil {
842+
return err
843+
}
844+
log.G(ctx).WithField("ref", ref).Debug("cleanup aborting ingest")
845+
}
846+
return nil
847+
})
848+
} else {
849+
var statuses []content.Status
850+
statuses, err = cs.Store.ListStatuses(ctx)
851+
if err != nil {
852+
return 0, err
853+
}
854+
for _, status := range statuses {
855+
if _, ok := ingestSeen[status.Ref]; !ok {
856+
if err = cs.Store.Abort(ctx, status.Ref); err != nil {
857+
return
858+
}
859+
log.G(ctx).WithField("ref", status.Ref).Debug("cleanup aborting ingest")
860+
}
861+
}
862+
}
771863
return
772864
}

metadata/content_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ func TestContentLeased(t *testing.T) {
8989
if err := checkContentLeased(lctx, db, expected); err != nil {
9090
t.Fatal("lease checked failed:", err)
9191
}
92+
if err := checkIngestLeased(lctx, db, "test-1"); err == nil {
93+
t.Fatal("test-1 should not be leased after write")
94+
} else if !errdefs.IsNotFound(err) {
95+
t.Fatal("lease checked failed:", err)
96+
}
9297

9398
lctx, _, err = createLease(ctx, db, "lease-2")
9499
if err != nil {
@@ -105,6 +110,48 @@ func TestContentLeased(t *testing.T) {
105110
if err := checkContentLeased(lctx, db, expected); err != nil {
106111
t.Fatal("lease checked failed:", err)
107112
}
113+
if err := checkIngestLeased(lctx, db, "test-2"); err == nil {
114+
t.Fatal("test-2 should not be leased")
115+
} else if !errdefs.IsNotFound(err) {
116+
t.Fatal("lease checked failed:", err)
117+
}
118+
}
119+
120+
func TestIngestLeased(t *testing.T) {
121+
ctx, db, cancel := testDB(t)
122+
defer cancel()
123+
124+
cs := db.ContentStore()
125+
126+
blob := []byte("any content")
127+
expected := digest.FromBytes(blob)
128+
129+
lctx, _, err := createLease(ctx, db, "lease-1")
130+
if err != nil {
131+
t.Fatal(err)
132+
}
133+
134+
w, err := cs.Writer(lctx,
135+
content.WithRef("test-1"),
136+
content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected}))
137+
if err != nil {
138+
t.Fatal(err)
139+
}
140+
err = checkIngestLeased(lctx, db, "test-1")
141+
w.Close()
142+
if err != nil {
143+
t.Fatal("lease checked failed:", err)
144+
}
145+
146+
if err := cs.Abort(lctx, "test-1"); err != nil {
147+
t.Fatal(err)
148+
}
149+
150+
if err := checkIngestLeased(lctx, db, "test-1"); err == nil {
151+
t.Fatal("test-1 should not be leased after write")
152+
} else if !errdefs.IsNotFound(err) {
153+
t.Fatal("lease checked failed:", err)
154+
}
108155
}
109156

110157
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
@@ -146,3 +193,27 @@ func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
146193
return nil
147194
})
148195
}
196+
197+
func checkIngestLeased(ctx context.Context, db *DB, ref string) error {
198+
ns, ok := namespaces.Namespace(ctx)
199+
if !ok {
200+
return errors.New("no namespace in context")
201+
}
202+
lease, ok := leases.FromContext(ctx)
203+
if !ok {
204+
return errors.New("no lease in context")
205+
}
206+
207+
return db.View(func(tx *bolt.Tx) error {
208+
bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectIngests)
209+
if bkt == nil {
210+
return errors.Wrapf(errdefs.ErrNotFound, "bucket not found %s", lease)
211+
}
212+
v := bkt.Get([]byte(ref))
213+
if v == nil {
214+
return errors.Wrap(errdefs.ErrNotFound, "object not leased")
215+
}
216+
217+
return nil
218+
})
219+
}

metadata/db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
275275
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
276276
m.dirtySS[n.Key[:idx]] = struct{}{}
277277
}
278-
} else if n.Type == ResourceContent {
278+
} else if n.Type == ResourceContent || n.Type == ResourceIngest {
279279
m.dirtyCS = true
280280
}
281281
return remove(ctx, tx, n)

0 commit comments

Comments
 (0)