Skip to content

Commit 3668237

Browse files
committed
Add server config for stream processors
Signed-off-by: Michael Crosby <[email protected]>
1 parent 97a9877 commit 3668237

5 files changed

Lines changed: 76 additions & 22 deletions

File tree

diff/apply/apply.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
7878

7979
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
8080
for {
81-
if processor, err = diff.GetProcessor(ctx, desc.MediaType, processor, config.ProcessorPayloads); err != nil {
81+
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
8282
return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType)
8383
}
8484
if processor.MediaType() == ocispec.MediaTypeImageLayer {

diff/diff.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121

2222
"github.com/containerd/containerd/mount"
23+
"github.com/gogo/protobuf/types"
2324
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2425
)
2526

@@ -54,7 +55,7 @@ type Comparer interface {
5455
// ApplyConfig is used to hold parameters needed for a apply operation
5556
type ApplyConfig struct {
5657
// ProcessorPayloads specifies the payload sent to various processors
57-
ProcessorPayloads map[string]interface{}
58+
ProcessorPayloads map[string]*types.Any
5859
}
5960

6061
// ApplyOpt is used to configure an Apply operation

diff/stream.go

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,23 @@ package diff
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"io"
2223
"os"
2324
"os/exec"
2425

2526
"github.com/containerd/containerd/archive/compression"
2627
"github.com/containerd/containerd/images"
28+
"github.com/gogo/protobuf/proto"
29+
"github.com/gogo/protobuf/types"
2730
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2831
"github.com/pkg/errors"
2932
)
3033

3134
var (
3235
handlers []Handler
3336

34-
// ErrNoProcessor is returned when no stream processor is avaliable for a media-type
37+
// ErrNoProcessor is returned when no stream processor is available for a media-type
3538
ErrNoProcessor = errors.New("no processor for media-type")
3639
)
3740

@@ -46,10 +49,10 @@ func RegisterProcessor(handler Handler) {
4649
}
4750

4851
// GetProcessor returns the processor for a media-type
49-
func GetProcessor(ctx context.Context, mediaType string, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) {
52+
func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
5053
// reverse this list so that user configured handlers come up first
51-
for i := len(handlers); i >= 0; i-- {
52-
processor, ok := handlers[i](ctx, mediaType)
54+
for i := len(handlers) - 1; i >= 0; i-- {
55+
processor, ok := handlers[i](ctx, stream.MediaType())
5356
if ok {
5457
return processor(ctx, stream, payloads)
5558
}
@@ -71,7 +74,7 @@ func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
7174
}
7275

7376
// StreamProcessorInit returns the initialized stream processor
74-
type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error)
77+
type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error)
7578

7679
// RawProcessor provides access to direct fd for processing
7780
type RawProcessor interface {
@@ -93,7 +96,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn
9396
return nil, false
9497
}
9598
if compressed {
96-
return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) {
99+
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
97100
ds, err := compression.DecompressStream(stream)
98101
if err != nil {
99102
return nil, err
@@ -104,7 +107,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn
104107
}, nil
105108
}, true
106109
}
107-
return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) {
110+
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
108111
return &stdProcessor{
109112
rc: stream,
110113
}, nil
@@ -168,42 +171,71 @@ func (c *compressedProcessor) Close() error {
168171
return c.rc.Close()
169172
}
170173

174+
func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args []string) Handler {
175+
set := make(map[string]struct{}, len(mediaTypes))
176+
for _, m := range mediaTypes {
177+
set[m] = struct{}{}
178+
}
179+
return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) {
180+
if _, ok := set[mediaType]; ok {
181+
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
182+
payload := payloads[id]
183+
return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, payload)
184+
}, true
185+
}
186+
return nil, false
187+
}
188+
}
189+
190+
const (
191+
payloadEnvVar = "STREAM_PROCESSOR_PAYLOAD"
192+
mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE"
193+
)
194+
171195
// NewBinaryProcessor returns a binary processor for use with processing content streams
172-
func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, name string, args ...string) (StreamProcessor, error) {
196+
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
173197
cmd := exec.CommandContext(ctx, name, args...)
198+
if payload != nil {
199+
data, err := proto.Marshal(payload)
200+
if err != nil {
201+
return nil, err
202+
}
203+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", payloadEnvVar, data))
204+
}
205+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
174206
var (
175-
stdin *os.File
176-
err error
207+
stdin io.Reader
208+
closer func() error
209+
err error
177210
)
178211
if f, ok := stream.(RawProcessor); ok {
179212
stdin = f.File()
213+
closer = f.File().Close
180214
} else {
181-
r, w, err := os.Pipe()
182-
if err != nil {
183-
return nil, err
184-
}
185-
stdin = r
186-
go func() {
187-
io.Copy(w, stream)
188-
}()
215+
stdin = stream
189216
}
190217
cmd.Stdin = stdin
191218
r, w, err := os.Pipe()
192219
if err != nil {
193220
return nil, err
194221
}
195222
cmd.Stdout = w
223+
196224
if err := cmd.Start(); err != nil {
197225
return nil, err
198226
}
227+
go cmd.Wait()
228+
199229
// close after start and dup
200-
stdin.Close()
201230
w.Close()
231+
if closer != nil {
232+
closer()
233+
}
202234

203235
return &binaryProcessor{
204236
cmd: cmd,
205237
r: r,
206-
mt: mt,
238+
mt: rmt,
207239
}, nil
208240
}
209241

services/server/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,25 @@ type Config struct {
5656
// ProxyPlugins configures plugins which are communicated to over GRPC
5757
ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"`
5858

59+
StreamProcessors []StreamProcessor `toml:"stream_processors"`
60+
5961
md toml.MetaData
6062
}
6163

64+
// StreamProcessor provides configuration for diff content processors
65+
type StreamProcessor struct {
66+
// ID of the processor, also used to fetch the specific payload
67+
ID string `toml:"id"`
68+
// Accepts specific media-types
69+
Accepts []string `toml:"accepts"`
70+
// Returns the media-type
71+
Returns string `toml:"returns"`
72+
// Path or name of the binary
73+
Path string `toml:"path"`
74+
// Args to the binary
75+
Args []string `toml:"args"`
76+
}
77+
6278
// GetVersion returns the config file's version
6379
func (c *Config) GetVersion() int {
6480
if c.Version == 0 {

services/server/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/containerd/containerd/content/local"
3636
csproxy "github.com/containerd/containerd/content/proxy"
3737
"github.com/containerd/containerd/defaults"
38+
"github.com/containerd/containerd/diff"
3839
"github.com/containerd/containerd/events/exchange"
3940
"github.com/containerd/containerd/log"
4041
"github.com/containerd/containerd/metadata"
@@ -80,6 +81,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
8081
if err != nil {
8182
return nil, err
8283
}
84+
for _, p := range config.StreamProcessors {
85+
diff.RegisterProcessor(diff.BinaryHandler(p.ID, p.Returns, p.Accepts, p.Path, p.Args))
86+
}
87+
8388
serverOpts := []grpc.ServerOption{
8489
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
8590
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),

0 commit comments

Comments
 (0)