Skip to content

Commit 60f73b6

Browse files
committed
Support simultaneous image unpack.
Signed-off-by: Lantao Liu <[email protected]>
1 parent ea13c9f commit 60f73b6

3 files changed

Lines changed: 270 additions & 3 deletions

File tree

client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ type RemoteContext struct {
294294
PlatformMatcher platforms.MatchComparer
295295

296296
// Unpack is done after an image is pulled to extract into a snapshotter.
297+
// It is done simultaneously for schema 2 images when they are pulled.
297298
// If an image is not unpacked on pull, it can be unpacked any time
298299
// afterwards. Unpacking is required to run an image.
299300
Unpack bool

pull.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232

3333
// Pull downloads the provided content into containerd's content store
3434
// and returns a platform specific image object
35-
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
35+
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) {
3636
pullCtx := defaultRemoteContext()
3737
for _, o := range opts {
3838
if err := o(c, pullCtx); err != nil {
@@ -61,6 +61,30 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
6161
}
6262
defer done(ctx)
6363

64+
var unpacks int32
65+
if pullCtx.Unpack {
66+
// unpacker only supports schema 2 image, for schema 1 this is noop.
67+
u, err := c.newUnpacker(ctx, pullCtx)
68+
if err != nil {
69+
return nil, errors.Wrap(err, "create unpacker")
70+
}
71+
unpackWrapper, eg := u.handlerWrapper(ctx, &unpacks)
72+
defer func() {
73+
if err := eg.Wait(); err != nil {
74+
if retErr == nil {
75+
retErr = errors.Wrap(err, "unpack")
76+
}
77+
}
78+
}()
79+
wrapper := pullCtx.HandlerWrapper
80+
pullCtx.HandlerWrapper = func(h images.Handler) images.Handler {
81+
if wrapper == nil {
82+
return unpackWrapper(h)
83+
}
84+
return wrapper(unpackWrapper(h))
85+
}
86+
}
87+
6488
img, err := c.fetch(ctx, pullCtx, ref, 1)
6589
if err != nil {
6690
return nil, err
@@ -69,8 +93,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
6993
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
7094

7195
if pullCtx.Unpack {
72-
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
73-
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
96+
if unpacks == 0 {
97+
// Try to unpack is none is done previously.
98+
// This is at least required for schema 1 image.
99+
if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil {
100+
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
101+
}
74102
}
75103
}
76104

unpacker.go

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package containerd
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
"sync"
24+
"sync/atomic"
25+
26+
"github.com/containerd/containerd/content"
27+
"github.com/containerd/containerd/images"
28+
"github.com/containerd/containerd/log"
29+
"github.com/containerd/containerd/rootfs"
30+
"github.com/opencontainers/go-digest"
31+
"github.com/opencontainers/image-spec/identity"
32+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
33+
"github.com/pkg/errors"
34+
"github.com/sirupsen/logrus"
35+
"golang.org/x/sync/errgroup"
36+
)
37+
38+
type layerState struct {
39+
layer rootfs.Layer
40+
downloaded bool
41+
unpacked bool
42+
}
43+
44+
type unpacker struct {
45+
updateCh chan ocispec.Descriptor
46+
snapshotter string
47+
config UnpackConfig
48+
c *Client
49+
}
50+
51+
func (c *Client) newUnpacker(ctx context.Context, rCtx *RemoteContext) (*unpacker, error) {
52+
snapshotter, err := c.resolveSnapshotterName(ctx, rCtx.Snapshotter)
53+
if err != nil {
54+
return nil, err
55+
}
56+
var config UnpackConfig
57+
for _, o := range rCtx.UnpackOpts {
58+
if err := o(ctx, &config); err != nil {
59+
return nil, err
60+
}
61+
}
62+
return &unpacker{
63+
updateCh: make(chan ocispec.Descriptor, 128),
64+
snapshotter: snapshotter,
65+
config: config,
66+
c: c,
67+
}, nil
68+
}
69+
70+
func (u *unpacker) unpack(ctx context.Context, config ocispec.Descriptor, layers []ocispec.Descriptor) error {
71+
p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
72+
if err != nil {
73+
return err
74+
}
75+
76+
var i ocispec.Image
77+
if err := json.Unmarshal(p, &i); err != nil {
78+
return errors.Wrap(err, "unmarshal image config")
79+
}
80+
diffIDs := i.RootFS.DiffIDs
81+
if len(layers) != len(diffIDs) {
82+
return errors.Errorf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
83+
}
84+
85+
var (
86+
sn = u.c.SnapshotService(u.snapshotter)
87+
a = u.c.DiffService()
88+
cs = u.c.ContentStore()
89+
90+
states []layerState
91+
chain []digest.Digest
92+
)
93+
for i, desc := range layers {
94+
states = append(states, layerState{
95+
layer: rootfs.Layer{
96+
Blob: desc,
97+
Diff: ocispec.Descriptor{
98+
MediaType: ocispec.MediaTypeImageLayer,
99+
Digest: diffIDs[i],
100+
},
101+
},
102+
})
103+
}
104+
for {
105+
var layer ocispec.Descriptor
106+
select {
107+
case layer = <-u.updateCh:
108+
case <-ctx.Done():
109+
return ctx.Err()
110+
}
111+
log.G(ctx).WithField("desc", layer).Debug("layer downloaded")
112+
for i := range states {
113+
if states[i].layer.Blob.Digest != layer.Digest {
114+
continue
115+
}
116+
states[i].downloaded = true
117+
break
118+
}
119+
for i := range states {
120+
if !states[i].downloaded {
121+
break
122+
}
123+
if states[i].unpacked {
124+
continue
125+
}
126+
127+
log.G(ctx).WithFields(logrus.Fields{
128+
"desc": states[i].layer.Blob,
129+
"diff": states[i].layer.Diff,
130+
}).Debug("unpack layer")
131+
132+
unpacked, err := rootfs.ApplyLayerWithOpts(ctx, states[i].layer, chain, sn, a,
133+
u.config.SnapshotOpts, u.config.ApplyOpts)
134+
if err != nil {
135+
return err
136+
}
137+
138+
if unpacked {
139+
// Set the uncompressed label after the uncompressed
140+
// digest has been verified through apply.
141+
cinfo := content.Info{
142+
Digest: states[i].layer.Blob.Digest,
143+
Labels: map[string]string{
144+
"containerd.io/uncompressed": states[i].layer.Diff.Digest.String(),
145+
},
146+
}
147+
if _, err := cs.Update(ctx, cinfo, "labels.containerd.io/uncompressed"); err != nil {
148+
return err
149+
}
150+
}
151+
152+
chain = append(chain, states[i].layer.Diff.Digest)
153+
states[i].unpacked = true
154+
log.G(ctx).WithFields(logrus.Fields{
155+
"desc": states[i].layer.Blob,
156+
"diff": states[i].layer.Diff,
157+
}).Debug("layer unpacked")
158+
}
159+
// Check whether all layers are unpacked.
160+
if states[len(states)-1].unpacked {
161+
break
162+
}
163+
}
164+
165+
chainID := identity.ChainID(chain).String()
166+
cinfo := content.Info{
167+
Digest: config.Digest,
168+
Labels: map[string]string{
169+
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", u.snapshotter): chainID,
170+
},
171+
}
172+
_, err = cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", u.snapshotter))
173+
if err != nil {
174+
return err
175+
}
176+
log.G(ctx).WithFields(logrus.Fields{
177+
"config": config.Digest,
178+
"chainID": chainID,
179+
}).Debug("image unpacked")
180+
return nil
181+
}
182+
183+
func (u *unpacker) handlerWrapper(uctx context.Context, unpacks *int32) (func(images.Handler) images.Handler, *errgroup.Group) {
184+
eg, uctx := errgroup.WithContext(uctx)
185+
return func(f images.Handler) images.Handler {
186+
var (
187+
lock sync.Mutex
188+
layers []ocispec.Descriptor
189+
schema1 bool
190+
)
191+
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
192+
children, err := f.Handle(ctx, desc)
193+
if err != nil {
194+
return children, err
195+
}
196+
197+
// `Pull` only supports one platform, so there is only
198+
// one manifest to handle, and manifest list can be
199+
// safely skipped.
200+
// TODO: support multi-platform unpack.
201+
switch desc.MediaType {
202+
case images.MediaTypeDockerSchema1Manifest:
203+
lock.Lock()
204+
schema1 = true
205+
lock.Unlock()
206+
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
207+
lock.Lock()
208+
for _, child := range children {
209+
if child.MediaType == images.MediaTypeDockerSchema2Config ||
210+
child.MediaType == ocispec.MediaTypeImageConfig {
211+
continue
212+
}
213+
layers = append(layers, child)
214+
}
215+
lock.Unlock()
216+
case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
217+
lock.Lock()
218+
l := append([]ocispec.Descriptor{}, layers...)
219+
lock.Unlock()
220+
if len(l) > 0 {
221+
atomic.AddInt32(unpacks, 1)
222+
eg.Go(func() error {
223+
return u.unpack(uctx, desc, l)
224+
})
225+
}
226+
case images.MediaTypeDockerSchema2LayerGzip, images.MediaTypeDockerSchema2Layer,
227+
ocispec.MediaTypeImageLayerGzip, ocispec.MediaTypeImageLayer:
228+
lock.Lock()
229+
update := !schema1
230+
lock.Unlock()
231+
if update {
232+
u.updateCh <- desc
233+
}
234+
}
235+
return children, nil
236+
})
237+
}, eg
238+
}

0 commit comments

Comments
 (0)