Skip to content

Commit cbea9be

Browse files
committed
storage: store the diffID and use for validation
when copying a partial image, store the expected diffID so that it can be later used to validate the obtained layer stream. Signed-off-by: Giuseppe Scrivano <[email protected]>
1 parent 4be7a1e commit cbea9be

File tree

2 files changed

+98
-26
lines changed

2 files changed

+98
-26
lines changed

storage/storage_dest.go

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
541541
}
542542
s.lock.Unlock()
543543
// Note: commitLayer locks on-demand.
544-
if err := s.commitLayer(index, info, -1); err != nil {
544+
if stopQueue, err := s.commitLayer(index, info, -1); stopQueue || err != nil {
545545
return err
546546
}
547547
s.lock.Lock()
@@ -570,15 +570,17 @@ func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest
570570
// commitLayer commits the specified layer with the given index to the storage.
571571
// size can usually be -1; it can be provided if the layer is not known to be already present in uncompressedOrTocDigest.
572572
//
573+
// If the layer cannot be committed yet, the function returns (true, nil).
574+
//
573575
// Note that the previous layer is expected to already be committed.
574576
//
575577
// Caution: this function must be called without holding `s.lock`. Callers
576578
// must guarantee that, at any given time, at most one goroutine may execute
577579
// `commitLayer()`.
578-
func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) error {
580+
func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) (bool, error) {
579581
// Already committed? Return early.
580582
if _, alreadyCommitted := s.indexToStorageID[index]; alreadyCommitted {
581-
return nil
583+
return false, nil
582584
}
583585

584586
// Start with an empty string or the previous layer ID. Note that
@@ -592,7 +594,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
592594
// Carry over the previous ID for empty non-base layers.
593595
if info.emptyLayer {
594596
s.indexToStorageID[index] = &lastLayer
595-
return nil
597+
return false, nil
596598
}
597599

598600
// Check if there's already a layer with the ID that we'd give to the result of applying
@@ -613,14 +615,14 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
613615
CanSubstitute: false,
614616
})
615617
if err != nil {
616-
return fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err)
618+
return false, fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err)
617619
}
618620
if !has {
619-
return fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String())
621+
return false, fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String())
620622
}
621623
diffIDOrTOCDigest, haveDiffIDOrTOCDigest = s.getDiffIDOrTOCDigest(info.digest)
622624
if !haveDiffIDOrTOCDigest {
623-
return fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String())
625+
return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String())
624626
}
625627
}
626628
id := diffIDOrTOCDigest.Hex()
@@ -631,28 +633,57 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
631633
// There's already a layer that should have the right contents, just reuse it.
632634
lastLayer = layer.ID
633635
s.indexToStorageID[index] = &lastLayer
634-
return nil
636+
return false, nil
635637
}
636638

637639
s.lock.Lock()
638640
diffOutput, ok := s.diffOutputs[info.digest]
639641
s.lock.Unlock()
640642
if ok {
643+
if s.manifest == nil {
644+
logrus.Debugf("Skipping commit for TOC=%q, manifest not yet available", id)
645+
return true, nil
646+
}
647+
648+
man, err := manifest.FromBlob(s.manifest, manifest.GuessMIMEType(s.manifest))
649+
if err != nil {
650+
return false, fmt.Errorf("parsing manifest: %w", err)
651+
}
652+
653+
cb, err := s.getConfigBlob(man.ConfigInfo())
654+
if err != nil {
655+
return false, err
656+
}
657+
658+
// retrieve the expected uncompressed digest from the config blob.
659+
configOCI := &imgspecv1.Image{}
660+
if err := json.Unmarshal(cb, configOCI); err != nil {
661+
return false, err
662+
}
663+
if index >= len(configOCI.RootFS.DiffIDs) {
664+
return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index)
665+
}
666+
641667
layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
642668
if err != nil {
643-
return err
669+
return false, err
644670
}
645671

646-
// FIXME: what to do with the uncompressed digest?
647-
diffOutput.UncompressedDigest = info.digest
672+
// let the storage layer know what was the original uncompressed layer.
673+
flags := make(map[string]interface{})
674+
flags[expectedLayerDiffIDFlag] = configOCI.RootFS.DiffIDs[index]
675+
logrus.Debugf("Setting uncompressed digest to %q for layer %q", configOCI.RootFS.DiffIDs[index], id)
676+
options := &graphdriver.ApplyDiffWithDifferOpts{
677+
Flags: flags,
678+
}
648679

649-
if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil {
680+
if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, options); err != nil {
650681
_ = s.imageRef.transport.store.Delete(layer.ID)
651-
return err
682+
return false, err
652683
}
653684

654685
s.indexToStorageID[index] = &layer.ID
655-
return nil
686+
return false, nil
656687
}
657688

658689
s.lock.Lock()
@@ -661,11 +692,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
661692
if ok {
662693
layer, err := al.PutAs(id, lastLayer, nil)
663694
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
664-
return fmt.Errorf("failed to put layer from digest and labels: %w", err)
695+
return false, fmt.Errorf("failed to put layer from digest and labels: %w", err)
665696
}
666697
lastLayer = layer.ID
667698
s.indexToStorageID[index] = &lastLayer
668-
return nil
699+
return false, nil
669700
}
670701

671702
// Check if we previously cached a file with that blob's contents. If we didn't,
@@ -686,7 +717,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
686717
}
687718
}
688719
if layer == "" {
689-
return fmt.Errorf("locating layer for blob %q: %w", info.digest, err2)
720+
return false, fmt.Errorf("locating layer for blob %q: %w", info.digest, err2)
690721
}
691722
// Read the layer's contents.
692723
noCompression := archive.Uncompressed
@@ -695,7 +726,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
695726
}
696727
diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions)
697728
if err2 != nil {
698-
return fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2)
729+
return false, fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2)
699730
}
700731
// Copy the layer diff to a file. Diff() takes a lock that it holds
701732
// until the ReadCloser that it returns is closed, and PutLayer() wants
@@ -705,7 +736,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
705736
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0o600)
706737
if err != nil {
707738
diff.Close()
708-
return fmt.Errorf("creating temporary file %q: %w", filename, err)
739+
return false, fmt.Errorf("creating temporary file %q: %w", filename, err)
709740
}
710741
// Copy the data to the file.
711742
// TODO: This can take quite some time, and should ideally be cancellable using
@@ -714,7 +745,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
714745
diff.Close()
715746
file.Close()
716747
if err != nil {
717-
return fmt.Errorf("storing blob to file %q: %w", filename, err)
748+
return false, fmt.Errorf("storing blob to file %q: %w", filename, err)
718749
}
719750
// Make sure that we can find this file later, should we need the layer's
720751
// contents again.
@@ -725,7 +756,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
725756
// Read the cached blob and use it as a diff.
726757
file, err := os.Open(filename)
727758
if err != nil {
728-
return fmt.Errorf("opening file %q: %w", filename, err)
759+
return false, fmt.Errorf("opening file %q: %w", filename, err)
729760
}
730761
defer file.Close()
731762
// Build the new layer using the diff, regardless of where it came from.
@@ -735,11 +766,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
735766
UncompressedDigest: diffIDOrTOCDigest,
736767
}, file)
737768
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
738-
return fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
769+
return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
739770
}
740771

741772
s.indexToStorageID[index] = &layer.ID
742-
return nil
773+
return false, nil
743774
}
744775

745776
// Commit marks the process of storing the image as successful and asks for the image to be persisted.
@@ -786,11 +817,13 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
786817

787818
// Extract, commit, or find the layers.
788819
for i, blob := range layerBlobs {
789-
if err := s.commitLayer(i, addedLayerInfo{
820+
if stopQueue, err := s.commitLayer(i, addedLayerInfo{
790821
digest: blob.Digest,
791822
emptyLayer: blob.EmptyLayer,
792823
}, blob.Size); err != nil {
793824
return err
825+
} else if stopQueue {
826+
return fmt.Errorf("Internal error: storageImageDestination.Commit(): commitLayer() not ready to commit for layer %q", blob.Digest)
794827
}
795828
}
796829
var lastLayer string

storage/storage_src.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,14 @@ type storageImageSource struct {
4242
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions
4343
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
4444
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice
45+
46+
// fromDigestToTOC converts from a layer diffID returned by LayerInfosForCopy to the TOC digest. The TOC digest
47+
// must be used for lookups in the underyling storage.
48+
fromDigestToTOC map[digest.Digest]digest.Digest
4549
}
4650

51+
const expectedLayerDiffIDFlag = "expected-layer-diffid"
52+
4753
// newImageSource sets up an image for reading.
4854
func newImageSource(sys *types.SystemContext, imageRef storageReference) (*storageImageSource, error) {
4955
// First, locate the image.
@@ -65,6 +71,7 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora
6571
layerPosition: make(map[digest.Digest]int),
6672
SignatureSizes: []int{},
6773
SignaturesSizes: make(map[digest.Digest][]int),
74+
fromDigestToTOC: make(map[digest.Digest]digest.Digest),
6875
}
6976
image.Compat = impl.AddCompat(image)
7077
if img.Metadata != "" {
@@ -91,6 +98,18 @@ func (s *storageImageSource) Close() error {
9198
func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (rc io.ReadCloser, n int64, err error) {
9299
// We need a valid digest value.
93100
digest := info.Digest
101+
102+
digestIsTOC := false
103+
104+
// If the digest was overriden by LayerInfosForCopy, then we need to use the TOC digest
105+
// to retrieve it from the storage.
106+
s.getBlobMutex.Lock()
107+
if v, ok := s.fromDigestToTOC[digest]; ok {
108+
digest = v
109+
digestIsTOC = true
110+
}
111+
s.getBlobMutex.Unlock()
112+
94113
err = digest.Validate()
95114
if err != nil {
96115
return nil, 0, err
@@ -103,7 +122,12 @@ func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, c
103122
// Check if the blob corresponds to a diff that was used to initialize any layers. Our
104123
// callers should try to retrieve layers using their uncompressed digests, so no need to
105124
// check if they're using one of the compressed digests, which we can't reproduce anyway.
106-
layers, _ := s.imageRef.transport.store.LayersByUncompressedDigest(digest)
125+
var layers []storage.Layer
126+
if digestIsTOC {
127+
layers, _ = s.imageRef.transport.store.LayersByTOCDigest(digest)
128+
} else {
129+
layers, _ = s.imageRef.transport.store.LayersByUncompressedDigest(digest)
130+
}
107131

108132
// If it's not a layer, then it must be a data item.
109133
if len(layers) == 0 {
@@ -273,8 +297,23 @@ func (s *storageImageSource) LayerInfosForCopy(ctx context.Context, instanceDige
273297
if layer.UncompressedSize < 0 {
274298
return nil, fmt.Errorf("uncompressed size for layer %q is unknown", layerID)
275299
}
300+
301+
blobDigest := layer.UncompressedDigest
302+
303+
if layer.Flags != nil {
304+
if v, ok := layer.Flags[expectedLayerDiffIDFlag]; ok {
305+
if expectedDigest, ok := v.(string); ok {
306+
// if the layer is stored by its TOC, report the expected diffID as the layer Digest
307+
// but store the TOC digest so we can later retrieve it from the storage.
308+
blobDigest = digest.Digest(expectedDigest)
309+
s.getBlobMutex.Lock()
310+
s.fromDigestToTOC[blobDigest] = layer.TOCDigest
311+
s.getBlobMutex.Unlock()
312+
}
313+
}
314+
}
276315
blobInfo := types.BlobInfo{
277-
Digest: layer.UncompressedDigest,
316+
Digest: blobDigest,
278317
Size: layer.UncompressedSize,
279318
MediaType: uncompressedLayerType,
280319
}

0 commit comments

Comments
 (0)