@@ -328,6 +328,10 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
328328 return err
329329 }
330330
331+ if err := removeIngestLease (ctx , tx , ref ); err != nil {
332+ return err
333+ }
334+
331335 // if not shared content, delete active ingest on backend
332336 if expected == "" {
333337 return cs .Store .Abort (ctx , bref )
@@ -395,6 +399,11 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (
395399 return err
396400 }
397401
402+ leased , err := addIngestLease (ctx , tx , wOpts .Ref )
403+ if err != nil {
404+ return err
405+ }
406+
398407 brefb := bkt .Get (bucketKeyRef )
399408 if brefb == nil {
400409 sid , err := bkt .NextSequence ()
@@ -409,6 +418,18 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (
409418 } else {
410419 bref = string (brefb )
411420 }
421+ if ! leased {
422+ // Add timestamp to allow aborting once stale
423+ // When lease is set the ingest shoudl be aborted
424+ // after lease it belonged to is deleted.
425+ // Expiration can be configurable in the future to
426+ // give more control to the daemon, however leases
427+ // already give users more control of expiration.
428+ expireAt := time .Now ().UTC ().Add (24 * 3600 * time .Second )
429+ if err := writeExpireAt (expireAt , bkt ); err != nil {
430+ return err
431+ }
432+ }
412433
413434 if shared {
414435 if err := bkt .Put (bucketKeyExpected , []byte (wOpts .Desc .Digest )); err != nil {
@@ -543,6 +564,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
543564 if err != nil {
544565 return err
545566 }
567+ if err := removeIngestLease (ctx , tx , nw .ref ); err != nil {
568+ return err
569+ }
546570 return addContentLease (ctx , tx , dgst )
547571 })
548572}
@@ -697,6 +721,30 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
697721 return bkt .Put (bucketKeySize , sizeEncoded )
698722}
699723
724+ func readExpireAt (bkt * bolt.Bucket ) (* time.Time , error ) {
725+ v := bkt .Get (bucketKeyExpireAt )
726+ if v == nil {
727+ return nil , nil
728+ }
729+ t := & time.Time {}
730+ if err := t .UnmarshalBinary (v ); err != nil {
731+ return nil , err
732+ }
733+ return t , nil
734+ }
735+
736+ func writeExpireAt (expire time.Time , bkt * bolt.Bucket ) error {
737+ expireAt , err := expire .MarshalBinary ()
738+ if err != nil {
739+ return err
740+ }
741+ if err := bkt .Put (bucketKeyExpireAt , expireAt ); err != nil {
742+ return err
743+ }
744+
745+ return nil
746+ }
747+
700748func (cs * contentStore ) garbageCollect (ctx context.Context ) (d time.Duration , err error ) {
701749 cs .l .Lock ()
702750 t1 := time .Now ()
@@ -707,7 +755,8 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
707755 cs .l .Unlock ()
708756 }()
709757
710- seen := map [string ]struct {}{}
758+ contentSeen := map [string ]struct {}{}
759+ ingestSeen := map [string ]struct {}{}
711760 if err := cs .db .View (func (tx * bolt.Tx ) error {
712761 v1bkt := tx .Bucket (bucketKeyVersion )
713762 if v1bkt == nil {
@@ -730,7 +779,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
730779 if bbkt != nil {
731780 if err := bbkt .ForEach (func (ck , cv []byte ) error {
732781 if cv == nil {
733- seen [string (ck )] = struct {}{}
782+ contentSeen [string (ck )] = struct {}{}
734783 }
735784 return nil
736785 }); err != nil {
@@ -742,9 +791,17 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
742791 if ibkt != nil {
743792 if err := ibkt .ForEach (func (ref , v []byte ) error {
744793 if v == nil {
745- expected := ibkt .Bucket (ref ).Get (bucketKeyExpected )
794+ bkt := ibkt .Bucket (ref )
795+ // expected here may be from a different namespace
796+ // so much be explicitly retained from the ingest
797+ // in case it was removed from the other namespace
798+ expected := bkt .Get (bucketKeyExpected )
746799 if len (expected ) > 0 {
747- seen [string (expected )] = struct {}{}
800+ contentSeen [string (expected )] = struct {}{}
801+ }
802+ bref := bkt .Get (bucketKeyRef )
803+ if len (bref ) > 0 {
804+ ingestSeen [string (bref )] = struct {}{}
748805 }
749806 }
750807 return nil
@@ -760,13 +817,48 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
760817 }
761818
762819 err = cs .Store .Walk (ctx , func (info content.Info ) error {
763- if _ , ok := seen [info .Digest .String ()]; ! ok {
820+ if _ , ok := contentSeen [info .Digest .String ()]; ! ok {
764821 if err := cs .Store .Delete (ctx , info .Digest ); err != nil {
765822 return err
766823 }
767824 log .G (ctx ).WithField ("digest" , info .Digest ).Debug ("removed content" )
768825 }
769826 return nil
770827 })
828+ if err != nil {
829+ return
830+ }
831+
832+ // If the content store has implemented a more efficient walk function
833+ // then use that else fallback to reading all statuses which may
834+ // cause reading of unneeded metadata.
835+ type statusWalker interface {
836+ WalkStatusRefs (context.Context , func (string ) error ) error
837+ }
838+ if w , ok := cs .Store .(statusWalker ); ok {
839+ err = w .WalkStatusRefs (ctx , func (ref string ) error {
840+ if _ , ok := ingestSeen [ref ]; ! ok {
841+ if err := cs .Store .Abort (ctx , ref ); err != nil {
842+ return err
843+ }
844+ log .G (ctx ).WithField ("ref" , ref ).Debug ("cleanup aborting ingest" )
845+ }
846+ return nil
847+ })
848+ } else {
849+ var statuses []content.Status
850+ statuses , err = cs .Store .ListStatuses (ctx )
851+ if err != nil {
852+ return 0 , err
853+ }
854+ for _ , status := range statuses {
855+ if _ , ok := ingestSeen [status .Ref ]; ! ok {
856+ if err = cs .Store .Abort (ctx , status .Ref ); err != nil {
857+ return
858+ }
859+ log .G (ctx ).WithField ("ref" , status .Ref ).Debug ("cleanup aborting ingest" )
860+ }
861+ }
862+ }
771863 return
772864}
0 commit comments