Skip to content

Commit 480bf65

Browse files
authored
Merge pull request #3010 from dmcgowan/pull-add-handler-wrap
Add image handler wrapper
2 parents 06ff5ef + dc09ed1 commit 480bf65

4 files changed

Lines changed: 225 additions & 162 deletions

File tree

client.go

Lines changed: 21 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ import (
4242
"github.com/containerd/containerd/content"
4343
contentproxy "github.com/containerd/containerd/content/proxy"
4444
"github.com/containerd/containerd/defaults"
45-
"github.com/containerd/containerd/errdefs"
4645
"github.com/containerd/containerd/events"
4746
"github.com/containerd/containerd/images"
4847
"github.com/containerd/containerd/leases"
@@ -53,15 +52,13 @@ import (
5352
"github.com/containerd/containerd/plugin"
5453
"github.com/containerd/containerd/remotes"
5554
"github.com/containerd/containerd/remotes/docker"
56-
"github.com/containerd/containerd/remotes/docker/schema1"
5755
"github.com/containerd/containerd/snapshots"
5856
snproxy "github.com/containerd/containerd/snapshots/proxy"
5957
"github.com/containerd/typeurl"
6058
ptypes "github.com/gogo/protobuf/types"
6159
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
6260
specs "github.com/opencontainers/runtime-spec/specs-go"
6361
"github.com/pkg/errors"
64-
"golang.org/x/sync/semaphore"
6562
"google.golang.org/grpc"
6663
"google.golang.org/grpc/health/grpc_health_v1"
6764
)
@@ -283,6 +280,12 @@ type RemoteContext struct {
283280
// handlers.
284281
BaseHandlers []images.Handler
285282

283+
// HandlerWrapper wraps the handler which gets sent to dispatch.
284+
// Unlike BaseHandlers, this can run before and after the built
285+
// in handlers, allowing operations to run on the descriptor
286+
// after it has completed transferring.
287+
HandlerWrapper func(images.Handler) images.Handler
288+
286289
// ConvertSchema1 is whether to convert Docker registry schema 1
287290
// manifests. If this option is false then any image which resolves
288291
// to schema 1 will return an error since schema 1 is not supported.
@@ -347,161 +350,6 @@ func (c *Client) Fetch(ctx context.Context, ref string, opts ...RemoteOpt) (imag
347350
return c.fetch(ctx, fetchCtx, ref, 0)
348351
}
349352

350-
// Pull downloads the provided content into containerd's content store
351-
// and returns a platform specific image object
352-
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
353-
pullCtx := defaultRemoteContext()
354-
for _, o := range opts {
355-
if err := o(c, pullCtx); err != nil {
356-
return nil, err
357-
}
358-
}
359-
360-
if pullCtx.PlatformMatcher == nil {
361-
if len(pullCtx.Platforms) > 1 {
362-
return nil, errors.New("cannot pull multiplatform image locally, try Fetch")
363-
} else if len(pullCtx.Platforms) == 0 {
364-
pullCtx.PlatformMatcher = platforms.Default()
365-
} else {
366-
p, err := platforms.Parse(pullCtx.Platforms[0])
367-
if err != nil {
368-
return nil, errors.Wrapf(err, "invalid platform %s", pullCtx.Platforms[0])
369-
}
370-
371-
pullCtx.PlatformMatcher = platforms.Only(p)
372-
}
373-
}
374-
375-
ctx, done, err := c.WithLease(ctx)
376-
if err != nil {
377-
return nil, err
378-
}
379-
defer done(ctx)
380-
381-
img, err := c.fetch(ctx, pullCtx, ref, 1)
382-
if err != nil {
383-
return nil, err
384-
}
385-
386-
i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher)
387-
388-
if pullCtx.Unpack {
389-
if err := i.Unpack(ctx, pullCtx.Snapshotter); err != nil {
390-
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
391-
}
392-
}
393-
394-
return i, nil
395-
}
396-
397-
func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) {
398-
store := c.ContentStore()
399-
name, desc, err := rCtx.Resolver.Resolve(ctx, ref)
400-
if err != nil {
401-
return images.Image{}, errors.Wrapf(err, "failed to resolve reference %q", ref)
402-
}
403-
404-
fetcher, err := rCtx.Resolver.Fetcher(ctx, name)
405-
if err != nil {
406-
return images.Image{}, errors.Wrapf(err, "failed to get fetcher for %q", name)
407-
}
408-
409-
var (
410-
handler images.Handler
411-
412-
isConvertible bool
413-
converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
414-
limiter *semaphore.Weighted
415-
)
416-
417-
if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
418-
schema1Converter := schema1.NewConverter(store, fetcher)
419-
420-
handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...)
421-
422-
isConvertible = true
423-
424-
converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) {
425-
return schema1Converter.Convert(ctx)
426-
}
427-
} else {
428-
// Get all the children for a descriptor
429-
childrenHandler := images.ChildrenHandler(store)
430-
// Set any children labels for that content
431-
childrenHandler = images.SetChildrenLabels(store, childrenHandler)
432-
// Filter children by platforms
433-
childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
434-
// Sort and limit manifests if a finite number is needed
435-
if limit > 0 {
436-
childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
437-
}
438-
439-
// set isConvertible to true if there is application/octet-stream media type
440-
convertibleHandler := images.HandlerFunc(
441-
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
442-
if desc.MediaType == docker.LegacyConfigMediaType {
443-
isConvertible = true
444-
}
445-
446-
return []ocispec.Descriptor{}, nil
447-
},
448-
)
449-
450-
handler = images.Handlers(append(rCtx.BaseHandlers,
451-
remotes.FetchHandler(store, fetcher),
452-
convertibleHandler,
453-
childrenHandler,
454-
)...)
455-
456-
converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
457-
return docker.ConvertManifest(ctx, store, desc)
458-
}
459-
}
460-
461-
if rCtx.MaxConcurrentDownloads > 0 {
462-
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
463-
}
464-
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
465-
return images.Image{}, err
466-
}
467-
468-
if isConvertible {
469-
if desc, err = converterFunc(ctx, desc); err != nil {
470-
return images.Image{}, err
471-
}
472-
}
473-
474-
img := images.Image{
475-
Name: name,
476-
Target: desc,
477-
Labels: rCtx.Labels,
478-
}
479-
480-
is := c.ImageService()
481-
for {
482-
if created, err := is.Create(ctx, img); err != nil {
483-
if !errdefs.IsAlreadyExists(err) {
484-
return images.Image{}, err
485-
}
486-
487-
updated, err := is.Update(ctx, img)
488-
if err != nil {
489-
// if image was removed, try create again
490-
if errdefs.IsNotFound(err) {
491-
continue
492-
}
493-
return images.Image{}, err
494-
}
495-
496-
img = updated
497-
} else {
498-
img = created
499-
}
500-
501-
return img, nil
502-
}
503-
}
504-
505353
// Push uploads the provided content to a remote resource
506354
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
507355
pushCtx := defaultRemoteContext()
@@ -531,7 +379,21 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor,
531379
return err
532380
}
533381

534-
return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, pushCtx.BaseHandlers...)
382+
var wrapper func(images.Handler) images.Handler
383+
384+
if len(pushCtx.BaseHandlers) > 0 {
385+
wrapper = func(h images.Handler) images.Handler {
386+
h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
387+
if pushCtx.HandlerWrapper != nil {
388+
h = pushCtx.HandlerWrapper(h)
389+
}
390+
return h
391+
}
392+
} else if pushCtx.HandlerWrapper != nil {
393+
wrapper = pushCtx.HandlerWrapper
394+
}
395+
396+
return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), pushCtx.PlatformMatcher, wrapper)
535397
}
536398

537399
// GetImage returns an existing image

client_opts.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,14 @@ func WithImageHandler(h images.Handler) RemoteOpt {
179179
}
180180
}
181181

182+
// WithImageHandlerWrapper wraps the handlers to be called on dispatch.
183+
func WithImageHandlerWrapper(w func(images.Handler) images.Handler) RemoteOpt {
184+
return func(client *Client, c *RemoteContext) error {
185+
c.HandlerWrapper = w
186+
return nil
187+
}
188+
}
189+
182190
// WithMaxConcurrentDownloads sets max concurrent download limit.
183191
func WithMaxConcurrentDownloads(max int) RemoteOpt {
184192
return func(client *Client, c *RemoteContext) error {

0 commit comments

Comments
 (0)