@@ -18,20 +18,23 @@ package diff
1818
1919import (
2020 "context"
21+ "fmt"
2122 "io"
2223 "os"
2324 "os/exec"
2425
2526 "github.com/containerd/containerd/archive/compression"
2627 "github.com/containerd/containerd/images"
28+ "github.com/gogo/protobuf/proto"
29+ "github.com/gogo/protobuf/types"
2730 ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2831 "github.com/pkg/errors"
2932)
3033
3134var (
3235 handlers []Handler
3336
34- // ErrNoProcessor is returned when no stream processor is avaliable for a media-type
37+ // ErrNoProcessor is returned when no stream processor is available for a media-type
3538 ErrNoProcessor = errors .New ("no processor for media-type" )
3639)
3740
@@ -46,10 +49,10 @@ func RegisterProcessor(handler Handler) {
4649}
4750
4851// GetProcessor returns the processor for a media-type
49- func GetProcessor (ctx context.Context , mediaType string , stream StreamProcessor , payloads map [string ]interface {} ) (StreamProcessor , error ) {
52+ func GetProcessor (ctx context.Context , stream StreamProcessor , payloads map [string ]* types. Any ) (StreamProcessor , error ) {
5053 // reverse this list so that user configured handlers come up first
51- for i := len (handlers ); i >= 0 ; i -- {
52- processor , ok := handlers [i ](ctx , mediaType )
54+ for i := len (handlers ) - 1 ; i >= 0 ; i -- {
55+ processor , ok := handlers [i ](ctx , stream . MediaType () )
5356 if ok {
5457 return processor (ctx , stream , payloads )
5558 }
@@ -71,7 +74,7 @@ func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
7174}
7275
7376// StreamProcessorInit returns the initialized stream processor
74- type StreamProcessorInit func (ctx context.Context , stream StreamProcessor , payloads map [string ]interface {} ) (StreamProcessor , error )
77+ type StreamProcessorInit func (ctx context.Context , stream StreamProcessor , payloads map [string ]* types. Any ) (StreamProcessor , error )
7578
7679// RawProcessor provides access to direct fd for processing
7780type RawProcessor interface {
@@ -93,7 +96,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn
9396 return nil , false
9497 }
9598 if compressed {
96- return func (ctx context.Context , stream StreamProcessor , payloads map [string ]interface {} ) (StreamProcessor , error ) {
99+ return func (ctx context.Context , stream StreamProcessor , payloads map [string ]* types. Any ) (StreamProcessor , error ) {
97100 ds , err := compression .DecompressStream (stream )
98101 if err != nil {
99102 return nil , err
@@ -104,7 +107,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn
104107 }, nil
105108 }, true
106109 }
107- return func (ctx context.Context , stream StreamProcessor , payloads map [string ]interface {} ) (StreamProcessor , error ) {
110+ return func (ctx context.Context , stream StreamProcessor , payloads map [string ]* types. Any ) (StreamProcessor , error ) {
108111 return & stdProcessor {
109112 rc : stream ,
110113 }, nil
@@ -168,42 +171,71 @@ func (c *compressedProcessor) Close() error {
168171 return c .rc .Close ()
169172}
170173
174+ func BinaryHandler (id , returnsMediaType string , mediaTypes []string , path string , args []string ) Handler {
175+ set := make (map [string ]struct {}, len (mediaTypes ))
176+ for _ , m := range mediaTypes {
177+ set [m ] = struct {}{}
178+ }
179+ return func (_ context.Context , mediaType string ) (StreamProcessorInit , bool ) {
180+ if _ , ok := set [mediaType ]; ok {
181+ return func (ctx context.Context , stream StreamProcessor , payloads map [string ]* types.Any ) (StreamProcessor , error ) {
182+ payload := payloads [id ]
183+ return NewBinaryProcessor (ctx , mediaType , returnsMediaType , stream , path , args , payload )
184+ }, true
185+ }
186+ return nil , false
187+ }
188+ }
189+
190+ const (
191+ payloadEnvVar = "STREAM_PROCESSOR_PAYLOAD"
192+ mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE"
193+ )
194+
171195// NewBinaryProcessor returns a binary processor for use with processing content streams
172- func NewBinaryProcessor (ctx context.Context , mt string , stream StreamProcessor , name string , args ... string ) (StreamProcessor , error ) {
196+ func NewBinaryProcessor (ctx context.Context , imt , rmt string , stream StreamProcessor , name string , args [] string , payload * types. Any ) (StreamProcessor , error ) {
173197 cmd := exec .CommandContext (ctx , name , args ... )
198+ if payload != nil {
199+ data , err := proto .Marshal (payload )
200+ if err != nil {
201+ return nil , err
202+ }
203+ cmd .Env = append (cmd .Env , fmt .Sprintf ("%s=%s" , payloadEnvVar , data ))
204+ }
205+ cmd .Env = append (cmd .Env , fmt .Sprintf ("%s=%s" , mediaTypeEnvVar , imt ))
174206 var (
175- stdin * os.File
176- err error
207+ stdin io.Reader
208+ closer func () error
209+ err error
177210 )
178211 if f , ok := stream .(RawProcessor ); ok {
179212 stdin = f .File ()
213+ closer = f .File ().Close
180214 } else {
181- r , w , err := os .Pipe ()
182- if err != nil {
183- return nil , err
184- }
185- stdin = r
186- go func () {
187- io .Copy (w , stream )
188- }()
215+ stdin = stream
189216 }
190217 cmd .Stdin = stdin
191218 r , w , err := os .Pipe ()
192219 if err != nil {
193220 return nil , err
194221 }
195222 cmd .Stdout = w
223+
196224 if err := cmd .Start (); err != nil {
197225 return nil , err
198226 }
227+ go cmd .Wait ()
228+
199229 // close after start and dup
200- stdin .Close ()
201230 w .Close ()
231+ if closer != nil {
232+ closer ()
233+ }
202234
203235 return & binaryProcessor {
204236 cmd : cmd ,
205237 r : r ,
206- mt : mt ,
238+ mt : rmt ,
207239 }, nil
208240}
209241
0 commit comments