Skip to content

Commit dc09ed1

Browse files
committed
Add image handler wrapper
Gives clients more control of the pull process, allowing the client to operate on a descriptor after it has been pulled. This could be useful for filtering output or tracking children before they dispatched to. This can also be used to call custom unpackers to have visibility into a pulled config in parallel to the downloads. Signed-off-by: Derek McGowan <[email protected]>
1 parent f5b0fa2 commit dc09ed1

4 files changed

Lines changed: 225 additions & 162 deletions

File tree

client.go

Lines changed: 21 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"github.com/containerd/containerd/content"
4343
contentproxy "github.com/containerd/containerd/content/proxy"
4444
"github.com/containerd/containerd/defaults"
45-
"github.com/containerd/containerd/errdefs"
4645
"github.com/containerd/containerd/events"
4746
"github.com/containerd/containerd/images"
4847
"github.com/containerd/containerd/leases"
@@ -53,15 +52,13 @@ import (
5352
"github.com/containerd/containerd/plugin"
5453
"github.com/containerd/containerd/remotes"
5554
"github.com/containerd/containerd/remotes/docker"
56-
"github.com/containerd/containerd/remotes/docker/schema1"
5755
"github.com/containerd/containerd/snapshots"
5856
snproxy "github.com/containerd/containerd/snapshots/proxy"
5957
"github.com/containerd/typeurl"
6058
ptypes "github.com/gogo/protobuf/types"
6159
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
6260
specs "github.com/opencontainers/runtime-spec/specs-go"
6361
"github.com/pkg/errors"
64-
"golang.org/x/sync/semaphore"
6562
"google.golang.org/grpc"
6663
"google.golang.org/grpc/health/grpc_health_v1"
6764
)
@@ -283,6 +280,12 @@ type RemoteContext struct {
283280
// handlers.
284281
BaseHandlers []images.Handler
285282

283+
// HandlerWrapper wraps the handler which gets sent to dispatch.
284+
// Unlike BaseHandlers, this can run before and after the built
285+
// in handlers, allowing operations to run on the descriptor
286+
// after it has completed transferring.
287+
HandlerWrapper func(images.Handler) images.Handler
288+
286289
// ConvertSchema1 is whether to convert Docker registry schema 1
287290
// manifests. If this option is false then any image which resolves
288291
// to schema 1 will return an error since schema 1 is not supported.
@@ -347,161 +350,6 @@ func (c *Client) Fetch(ctx context.Context, ref string, opts ...RemoteOpt) (imag
347350
return c.fetch(ctx, fetchCtx, ref, 0)
348351
}
349352

350-
// Pull downloads the provided content into containerd's content store
351-
// and returns a platform specific image object
352-
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
353-
pullCtx := defaultRemoteContext()
354-
for _, o := range opts {
355-
if err := o(c, pullCtx); err != nil {
356-
return nil, err
357-
}
358-
}
359-
360-
if pullCtx.PlatformMatcher == nil {
361-
if len(pullCtx.Platforms) > 1 {
362-
return nil, errors.New("cannot pull multiplatform image locally, try Fetch")
363-
} else if len(pullCtx.Platforms) == 0 {
364-
pullCtx.PlatformMatcher = platforms.Default()
365-
} else {
366-
p, err := platforms.Parse(pullCtx.Platforms[0])
367-
if err != nil {
368-
return nil, errors.Wrapf(err, "invalid platform %s", pullCtx.Platforms[0])
369-
}
370-
371-
pullCtx.PlatformMatcher = platforms.Only(p)
372-
}
373-
}
374-
375-
ctx, done, err := c.WithLease(ctx)
376-
if err != nil {
377-
return nil, err
378-
}
379-
defer done(ctx)
380-
381-
img, err := c.fetch(ctx, pullCtx, ref, 1)
382-
if err != nil {
383-
return nil, err
384-
}
385-
386-
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
387-
388-
if pullCtx.Unpack {
389-
if err := i.Unpack(ctx, pullCtx.Snapshotter); err != nil {
390-
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
391-
}
392-
}
393-
394-
return i, nil
395-
}
396-
397-
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
398-
store := c.ContentStore()
399-
name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
400-
if err != nil {
401-
return images.Image{}, errors.Wrapf(err, "failed to resolve reference %q", ref)
402-
}
403-
404-
fetcher, err := rCtx.Resolver.Fetcher(ctx, name)
405-
if err != nil {
406-
return images.Image{}, errors.Wrapf(err, "failed to get fetcher for %q", name)
407-
}
408-
409-
var (
410-
handler images.Handler
411-
412-
isConvertible bool
413-
converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
414-
limiter *semaphore.Weighted
415-
)
416-
417-
if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
418-
schema1Converter := schema1.NewConverter(store, fetcher)
419-
420-
handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...)
421-
422-
isConvertible = true
423-
424-
converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) {
425-
return schema1Converter.Convert(ctx)
426-
}
427-
} else {
428-
// Get all the children for a descriptor
429-
childrenHandler := images.ChildrenHandler(store)
430-
// Set any children labels for that content
431-
childrenHandler = images.SetChildrenLabels(store, childrenHandler)
432-
// Filter children by platforms
433-
childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
434-
// Sort and limit manifests if a finite number is needed
435-
if limit > 0 {
436-
childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
437-
}
438-
439-
// set isConvertible to true if there is application/octet-stream media type
440-
convertibleHandler := images.HandlerFunc(
441-
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
442-
if desc.MediaType == docker.LegacyConfigMediaType {
443-
isConvertible = true
444-
}
445-
446-
return []ocispec.Descriptor{}, nil
447-
},
448-
)
449-
450-
handler = images.Handlers(append(rCtx.BaseHandlers,
451-
remotes.FetchHandler(store, fetcher),
452-
convertibleHandler,
453-
childrenHandler,
454-
)...)
455-
456-
converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
457-
return docker.ConvertManifest(ctx, store, desc)
458-
}
459-
}
460-
461-
if rCtx.MaxConcurrentDownloads > 0 {
462-
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
463-
}
464-
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
465-
return images.Image{}, err
466-
}
467-
468-
if isConvertible {
469-
if desc, err = converterFunc(ctx, desc); err != nil {
470-
return images.Image{}, err
471-
}
472-
}
473-
474-
img := images.Image{
475-
Name: name,
476-
Target: desc,
477-
Labels: rCtx.Labels,
478-
}
479-
480-
is := c.ImageService()
481-
for {
482-
if created, err := is.Create(ctx, img); err != nil {
483-
if !errdefs.IsAlreadyExists(err) {
484-
return images.Image{}, err
485-
}
486-
487-
updated, err := is.Update(ctx, img)
488-
if err != nil {
489-
// if image was removed, try create again
490-
if errdefs.IsNotFound(err) {
491-
continue
492-
}
493-
return images.Image{}, err
494-
}
495-
496-
img = updated
497-
} else {
498-
img = created
499-
}
500-
501-
return img, nil
502-
}
503-
}
504-
505353
// Push uploads the provided content to a remote resource
506354
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
507355
pushCtx := defaultRemoteContext()
@@ -531,7 +379,21 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor,
531379
return err
532380
}
533381

534-
return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, pushCtx.BaseHandlers...)
382+
var wrapper func(images.Handler) images.Handler
383+
384+
if len(pushCtx.BaseHandlers) > 0 {
385+
wrapper = func(h images.Handler) images.Handler {
386+
h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
387+
if pushCtx.HandlerWrapper != nil {
388+
h = pushCtx.HandlerWrapper(h)
389+
}
390+
return h
391+
}
392+
} else if pushCtx.HandlerWrapper != nil {
393+
wrapper = pushCtx.HandlerWrapper
394+
}
395+
396+
return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, wrapper)
535397
}
536398

537399
// GetImage returns an existing image

client_opts.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,14 @@ func WithImageHandler(h images.Handler) RemoteOpt {
179179
}
180180
}
181181

182+
// WithImageHandlerWrapper wraps the handlers to be called on dispatch.
183+
func WithImageHandlerWrapper(w func(images.Handler) images.Handler) RemoteOpt {
184+
return func(client *Client, c *RemoteContext) error {
185+
c.HandlerWrapper = w
186+
return nil
187+
}
188+
}
189+
182190
// WithMaxConcurrentDownloads sets max concurrent download limit.
183191
func WithMaxConcurrentDownloads(max int) RemoteOpt {
184192
return func(client *Client, c *RemoteContext) error {

0 commit comments

Comments
 (0)