Skip to content

Commit e63768e

Browse files
authored
Merge pull request #2331 from dmcgowan/fix-image-remove-race
Fix image pull and remove race
2 parents 257d74f + f0b3d5a commit e63768e

4 files changed

Lines changed: 129 additions & 71 deletions

File tree

client.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -338,38 +338,43 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
338338
}
339339
}
340340

341-
imgrec := images.Image{
342-
Name: name,
343-
Target: desc,
344-
Labels: pullCtx.Labels,
345-
}
346-
347-
is := c.ImageService()
348-
if created, err := is.Create(ctx, imgrec); err != nil {
349-
if !errdefs.IsAlreadyExists(err) {
350-
return nil, err
351-
}
352-
353-
updated, err := is.Update(ctx, imgrec)
354-
if err != nil {
355-
return nil, err
356-
}
357-
358-
imgrec = updated
359-
} else {
360-
imgrec = created
361-
}
362-
363341
img := &image{
364342
client: c,
365-
i: imgrec,
343+
i: images.Image{
344+
Name: name,
345+
Target: desc,
346+
Labels: pullCtx.Labels,
347+
},
366348
}
349+
367350
if pullCtx.Unpack {
368351
if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil {
369352
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
370353
}
371354
}
372-
return img, nil
355+
356+
is := c.ImageService()
357+
for {
358+
if created, err := is.Create(ctx, img.i); err != nil {
359+
if !errdefs.IsAlreadyExists(err) {
360+
return nil, err
361+
}
362+
363+
updated, err := is.Update(ctx, img.i)
364+
if err != nil {
365+
// if image was removed, try create again
366+
if errdefs.IsNotFound(err) {
367+
continue
368+
}
369+
return nil, err
370+
}
371+
372+
img.i = updated
373+
} else {
374+
img.i = created
375+
}
376+
return img, nil
377+
}
373378
}
374379

375380
// Push uploads the provided content to a remote resource

cmd/ctr/commands/content/fetch.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,19 @@ Most of this is experimental and there are few leaps to make this work.`,
6161
var (
6262
ref = clicontext.Args().First()
6363
)
64-
_, err := Fetch(ref, clicontext)
64+
client, ctx, cancel, err := commands.NewClient(clicontext)
65+
if err != nil {
66+
return err
67+
}
68+
defer cancel()
69+
70+
_, err = Fetch(ctx, client, ref, clicontext)
6571
return err
6672
},
6773
}
6874

6975
// Fetch loads all resources into the content store and returns the image
70-
func Fetch(ref string, cliContext *cli.Context) (containerd.Image, error) {
71-
client, ctx, cancel, err := commands.NewClient(cliContext)
72-
if err != nil {
73-
return nil, err
74-
}
75-
defer cancel()
76-
76+
func Fetch(ctx context.Context, client *containerd.Client, ref string, cliContext *cli.Context) (containerd.Image, error) {
7777
resolver, err := commands.GetResolver(ctx, cliContext)
7878
if err != nil {
7979
return nil, err

cmd/ctr/commands/images/pull.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,20 @@ command. As part of this process, we do the following:
5757
if ref == "" {
5858
return fmt.Errorf("please provide an image reference to pull")
5959
}
60-
ctx, cancel := commands.AppContext(context)
60+
61+
client, ctx, cancel, err := commands.NewClient(context)
62+
if err != nil {
63+
return err
64+
}
6165
defer cancel()
6266

63-
img, err := content.Fetch(ref, context)
67+
ctx, done, err := client.WithLease(ctx)
68+
if err != nil {
69+
return err
70+
}
71+
defer done(ctx)
72+
73+
img, err := content.Fetch(ctx, client, ref, context)
6474
if err != nil {
6575
return err
6676
}

rootfs/apply.go

Lines changed: 80 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/containerd/containerd/diff"
2727
"github.com/containerd/containerd/errdefs"
2828
"github.com/containerd/containerd/log"
29+
"github.com/containerd/containerd/mount"
2930
"github.com/containerd/containerd/snapshots"
3031
"github.com/opencontainers/go-digest"
3132
"github.com/opencontainers/image-spec/identity"
@@ -47,76 +48,118 @@ type Layer struct {
4748
// Layers are applied in order they are given, making the first layer the
4849
// bottom-most layer in the layer chain.
4950
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier) (digest.Digest, error) {
50-
var chain []digest.Digest
51-
for _, layer := range layers {
52-
if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil {
53-
// TODO: possibly wait and retry if extraction of same chain id was in progress
54-
return "", err
51+
chain := make([]digest.Digest, len(layers))
52+
for i, layer := range layers {
53+
chain[i] = layer.Diff.Digest
54+
}
55+
chainID := identity.ChainID(chain)
56+
57+
// Just stat top layer, remaining layers will have their existence checked
58+
// on prepare. Calling prepare on upper layers first guarantees that upper
59+
// layers are not removed while calling stat on lower layers
60+
_, err := sn.Stat(ctx, chainID.String())
61+
if err != nil {
62+
if !errdefs.IsNotFound(err) {
63+
return "", errors.Wrapf(err, "failed to stat snapshot %s", chainID)
5564
}
5665

57-
chain = append(chain, layer.Diff.Digest)
66+
if err := applyLayers(ctx, layers, chain, sn, a); err != nil && !errdefs.IsAlreadyExists(err) {
67+
return "", err
68+
}
5869
}
59-
return identity.ChainID(chain), nil
70+
71+
return chainID, nil
6072
}
6173

6274
// ApplyLayer applies a single layer on top of the given provided layer chain,
6375
// using the provided snapshotter and applier. If the layer was unpacked true
6476
// is returned, if the layer already exists false is returned.
6577
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) (bool, error) {
6678
var (
67-
parent = identity.ChainID(chain)
68-
chainID = identity.ChainID(append(chain, layer.Diff.Digest))
69-
diff ocispec.Descriptor
79+
chainID = identity.ChainID(append(chain, layer.Diff.Digest)).String()
80+
applied bool
7081
)
82+
if _, err := sn.Stat(ctx, chainID); err != nil {
83+
if !errdefs.IsNotFound(err) {
84+
return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID)
85+
}
7186

72-
_, err := sn.Stat(ctx, chainID.String())
73-
if err == nil {
74-
log.G(ctx).Debugf("Extraction not needed, layer snapshot %s exists", chainID)
75-
return false, nil
76-
} else if !errdefs.IsNotFound(err) {
77-
return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID)
87+
if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts...); err != nil {
88+
if !errdefs.IsAlreadyExists(err) {
89+
return false, err
90+
}
91+
} else {
92+
applied = true
93+
}
7894
}
95+
return applied, nil
96+
}
7997

80-
key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
98+
func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) error {
99+
var (
100+
parent = identity.ChainID(chain[:len(chain)-1])
101+
chainID = identity.ChainID(chain)
102+
layer = layers[len(layers)-1]
103+
diff ocispec.Descriptor
104+
key string
105+
mounts []mount.Mount
106+
err error
107+
)
81108

82-
// Prepare snapshot with from parent, label as root
83-
mounts, err := sn.Prepare(ctx, key, parent.String(), opts...)
84-
if err != nil {
85-
//TODO: If is snapshot exists error, retry
86-
return false, errors.Wrapf(err, "failed to prepare extraction snapshot %q", key)
109+
for {
110+
key = fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
111+
112+
// Prepare snapshot with from parent, label as root
113+
mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
114+
if err != nil {
115+
if errdefs.IsNotFound(err) && len(layers) > 1 {
116+
if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a); err != nil {
117+
if !errdefs.IsAlreadyExists(err) {
118+
return err
119+
}
120+
}
121+
// Do no try applying layers again
122+
layers = nil
123+
continue
124+
} else if errdefs.IsAlreadyExists(err) {
125+
// Try a different key
126+
continue
127+
}
128+
129+
// Already exists should have the caller retry
130+
return errors.Wrapf(err, "failed to prepare extraction snapshot %q", key)
131+
132+
}
133+
break
87134
}
88135
defer func() {
89136
if err != nil {
90-
log.G(ctx).WithError(err).WithField("key", key).Infof("Apply failure, attempting cleanup")
137+
if !errdefs.IsAlreadyExists(err) {
138+
log.G(ctx).WithError(err).WithField("key", key).Infof("apply failure, attempting cleanup")
139+
}
140+
91141
if rerr := sn.Remove(ctx, key); rerr != nil {
92-
log.G(ctx).WithError(rerr).Warnf("Extraction snapshot %q removal failed", key)
142+
log.G(ctx).WithError(rerr).WithField("key", key).Warnf("extraction snapshot removal failed")
93143
}
94144
}
95145
}()
96146

97147
diff, err = a.Apply(ctx, layer.Blob, mounts)
98148
if err != nil {
99-
return false, errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
149+
err = errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
150+
return err
100151
}
101152
if diff.Digest != layer.Diff.Digest {
102153
err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest)
103-
return false, err
154+
return err
104155
}
105156

106157
if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil {
107-
if !errdefs.IsAlreadyExists(err) {
108-
return false, errors.Wrapf(err, "failed to commit snapshot %s", key)
109-
}
110-
111-
// Destination already exists, cleanup key and return without error
112-
err = nil
113-
if err := sn.Remove(ctx, key); err != nil {
114-
return false, errors.Wrapf(err, "failed to cleanup aborted apply %s", key)
115-
}
116-
return false, nil
158+
err = errors.Wrapf(err, "failed to commit snapshot %s", key)
159+
return err
117160
}
118161

119-
return true, nil
162+
return nil
120163
}
121164

122165
func uniquePart() string {

0 commit comments

Comments
 (0)