Skip to content

Commit 97a9877

Browse files
committed
Add StreamProcessor for apply
Signed-off-by: Michael Crosby <[email protected]>
1 parent 2ff1ef4 commit 97a9877

3 files changed

Lines changed: 266 additions & 37 deletions

File tree

diff/apply/apply.go

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@ import (
2323
"time"
2424

2525
"github.com/containerd/containerd/archive"
26-
"github.com/containerd/containerd/archive/compression"
2726
"github.com/containerd/containerd/content"
2827
"github.com/containerd/containerd/diff"
29-
"github.com/containerd/containerd/errdefs"
30-
"github.com/containerd/containerd/images"
3128
"github.com/containerd/containerd/log"
3229
"github.com/containerd/containerd/mount"
3330
digest "github.com/opencontainers/go-digest"
@@ -66,54 +63,50 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
6663
}
6764
}()
6865

69-
isCompressed, err := images.IsCompressedDiff(ctx, desc.MediaType)
66+
var config diff.ApplyConfig
67+
for _, o := range opts {
68+
if err := o(&config); err != nil {
69+
return emptyDesc, errors.Wrap(err, "failed to apply config opt")
70+
}
71+
}
72+
73+
ra, err := s.store.ReaderAt(ctx, desc)
7074
if err != nil {
71-
return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType)
75+
return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
7276
}
77+
defer ra.Close()
7378

74-
var ocidesc ocispec.Descriptor
75-
if err := mount.WithTempMount(ctx, mounts, func(root string) error {
76-
ra, err := s.store.ReaderAt(ctx, desc)
77-
if err != nil {
78-
return errors.Wrap(err, "failed to get reader from content store")
79+
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
80+
for {
81+
if processor, err = diff.GetProcessor(ctx, desc.MediaType, processor, config.ProcessorPayloads); err != nil {
82+
return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType)
7983
}
80-
defer ra.Close()
81-
82-
r := content.NewReader(ra)
83-
if isCompressed {
84-
ds, err := compression.DecompressStream(r)
85-
if err != nil {
86-
return err
87-
}
88-
defer ds.Close()
89-
r = ds
90-
}
91-
92-
digester := digest.Canonical.Digester()
93-
rc := &readCounter{
94-
r: io.TeeReader(r, digester.Hash()),
84+
if processor.MediaType() == ocispec.MediaTypeImageLayer {
85+
break
9586
}
87+
}
88+
defer processor.Close()
9689

90+
digester := digest.Canonical.Digester()
91+
rc := &readCounter{
92+
r: io.TeeReader(processor, digester.Hash()),
93+
}
94+
if err := mount.WithTempMount(ctx, mounts, func(root string) error {
9795
if _, err := archive.Apply(ctx, root, rc); err != nil {
9896
return err
9997
}
10098

10199
// Read any trailing data
102-
if _, err := io.Copy(ioutil.Discard, rc); err != nil {
103-
return err
104-
}
105-
106-
ocidesc = ocispec.Descriptor{
107-
MediaType: ocispec.MediaTypeImageLayer,
108-
Size: rc.c,
109-
Digest: digester.Digest(),
110-
}
111-
return nil
112-
100+
_, err := io.Copy(ioutil.Discard, rc)
101+
return err
113102
}); err != nil {
114103
return emptyDesc, err
115104
}
116-
return ocidesc, nil
105+
return ocispec.Descriptor{
106+
MediaType: ocispec.MediaTypeImageLayer,
107+
Size: rc.c,
108+
Digest: digester.Digest(),
109+
}, nil
117110
}
118111

119112
type readCounter struct {

diff/diff.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type Comparer interface {
5353

5454
// ApplyConfig is used to hold parameters needed for a apply operation
5555
type ApplyConfig struct {
56+
// ProcessorPayloads specifies the payload sent to various processors
57+
ProcessorPayloads map[string]interface{}
5658
}
5759

5860
// ApplyOpt is used to configure an Apply operation

diff/stream.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package diff
18+
19+
import (
20+
"context"
21+
"io"
22+
"os"
23+
"os/exec"
24+
25+
"github.com/containerd/containerd/archive/compression"
26+
"github.com/containerd/containerd/images"
27+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
28+
"github.com/pkg/errors"
29+
)
30+
31+
var (
32+
handlers []Handler
33+
34+
// ErrNoProcessor is returned when no stream processor is avaliable for a media-type
35+
ErrNoProcessor = errors.New("no processor for media-type")
36+
)
37+
38+
func init() {
39+
// register the default compression handler
40+
RegisterProcessor(compressedHandler)
41+
}
42+
43+
// RegisterProcessor registers a stream processor for media-types
44+
func RegisterProcessor(handler Handler) {
45+
handlers = append(handlers, handler)
46+
}
47+
48+
// GetProcessor returns the processor for a media-type
49+
func GetProcessor(ctx context.Context, mediaType string, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) {
50+
// reverse this list so that user configured handlers come up first
51+
for i := len(handlers); i >= 0; i-- {
52+
processor, ok := handlers[i](ctx, mediaType)
53+
if ok {
54+
return processor(ctx, stream, payloads)
55+
}
56+
}
57+
return nil, ErrNoProcessor
58+
}
59+
60+
// Handler checks a media-type and initializes the processor
61+
type Handler func(ctx context.Context, mediaType string) (StreamProcessorInit, bool)
62+
63+
// StaticHandler returns the processor init func for a static media-type
64+
func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
65+
return func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
66+
if mediaType == expectedMediaType {
67+
return fn, true
68+
}
69+
return nil, false
70+
}
71+
}
72+
73+
// StreamProcessorInit returns the initialized stream processor
74+
type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error)
75+
76+
// RawProcessor provides access to direct fd for processing
77+
type RawProcessor interface {
78+
// File returns the fd for the read stream of the underlying processor
79+
File() *os.File
80+
}
81+
82+
// StreamProcessor handles processing a content stream and transforming it into a different media-type
83+
type StreamProcessor interface {
84+
io.ReadCloser
85+
86+
// MediaType is the resulting media-type that the processor processes the stream into
87+
MediaType() string
88+
}
89+
90+
func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
91+
compressed, err := images.IsCompressedDiff(ctx, mediaType)
92+
if err != nil {
93+
return nil, false
94+
}
95+
if compressed {
96+
return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) {
97+
ds, err := compression.DecompressStream(stream)
98+
if err != nil {
99+
return nil, err
100+
}
101+
102+
return &compressedProcessor{
103+
rc: ds,
104+
}, nil
105+
}, true
106+
}
107+
return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) {
108+
return &stdProcessor{
109+
rc: stream,
110+
}, nil
111+
}, true
112+
}
113+
114+
// NewProcessorChain initialized the root StreamProcessor
115+
func NewProcessorChain(mt string, r io.Reader) StreamProcessor {
116+
return &processorChain{
117+
mt: mt,
118+
rc: r,
119+
}
120+
}
121+
122+
type processorChain struct {
123+
mt string
124+
rc io.Reader
125+
}
126+
127+
func (c *processorChain) MediaType() string {
128+
return c.mt
129+
}
130+
131+
func (c *processorChain) Read(p []byte) (int, error) {
132+
return c.rc.Read(p)
133+
}
134+
135+
func (c *processorChain) Close() error {
136+
return nil
137+
}
138+
139+
type stdProcessor struct {
140+
rc StreamProcessor
141+
}
142+
143+
func (c *stdProcessor) MediaType() string {
144+
return ocispec.MediaTypeImageLayer
145+
}
146+
147+
func (c *stdProcessor) Read(p []byte) (int, error) {
148+
return c.rc.Read(p)
149+
}
150+
151+
func (c *stdProcessor) Close() error {
152+
return nil
153+
}
154+
155+
type compressedProcessor struct {
156+
rc io.ReadCloser
157+
}
158+
159+
func (c *compressedProcessor) MediaType() string {
160+
return ocispec.MediaTypeImageLayer
161+
}
162+
163+
func (c *compressedProcessor) Read(p []byte) (int, error) {
164+
return c.rc.Read(p)
165+
}
166+
167+
func (c *compressedProcessor) Close() error {
168+
return c.rc.Close()
169+
}
170+
171+
// NewBinaryProcessor returns a binary processor for use with processing content streams
172+
func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, name string, args ...string) (StreamProcessor, error) {
173+
cmd := exec.CommandContext(ctx, name, args...)
174+
var (
175+
stdin *os.File
176+
err error
177+
)
178+
if f, ok := stream.(RawProcessor); ok {
179+
stdin = f.File()
180+
} else {
181+
r, w, err := os.Pipe()
182+
if err != nil {
183+
return nil, err
184+
}
185+
stdin = r
186+
go func() {
187+
io.Copy(w, stream)
188+
}()
189+
}
190+
cmd.Stdin = stdin
191+
r, w, err := os.Pipe()
192+
if err != nil {
193+
return nil, err
194+
}
195+
cmd.Stdout = w
196+
if err := cmd.Start(); err != nil {
197+
return nil, err
198+
}
199+
// close after start and dup
200+
stdin.Close()
201+
w.Close()
202+
203+
return &binaryProcessor{
204+
cmd: cmd,
205+
r: r,
206+
mt: mt,
207+
}, nil
208+
}
209+
210+
type binaryProcessor struct {
211+
cmd *exec.Cmd
212+
r *os.File
213+
mt string
214+
}
215+
216+
func (c *binaryProcessor) File() *os.File {
217+
return c.r
218+
}
219+
220+
func (c *binaryProcessor) MediaType() string {
221+
return c.mt
222+
}
223+
224+
func (c *binaryProcessor) Read(p []byte) (int, error) {
225+
return c.r.Read(p)
226+
}
227+
228+
func (c *binaryProcessor) Close() error {
229+
err := c.r.Close()
230+
if kerr := c.cmd.Process.Kill(); err == nil {
231+
err = kerr
232+
}
233+
return err
234+
}

0 commit comments

Comments
 (0)