Skip to content

Commit 552a0b1

Browse files
committed
Handle stderr in stream processors
Signed-off-by: Michael Crosby <[email protected]>
1 parent 3fded74 commit 552a0b1

3 files changed

Lines changed: 86 additions & 18 deletions

File tree

diff/apply/apply.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,14 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
7676
}
7777
defer ra.Close()
7878

79+
var processors []diff.StreamProcessor
7980
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
81+
processors = append(processors, processor)
8082
for {
8183
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
8284
return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType)
8385
}
86+
processors = append(processors, processor)
8487
if processor.MediaType() == ocispec.MediaTypeImageLayer {
8588
break
8689
}
@@ -102,6 +105,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
102105
}); err != nil {
103106
return emptyDesc, err
104107
}
108+
109+
for _, p := range processors {
110+
if ep, ok := p.(interface {
111+
Err() error
112+
}); ok {
113+
if err := ep.Err(); err != nil {
114+
return emptyDesc, err
115+
}
116+
}
117+
}
105118
return ocispec.Descriptor{
106119
MediaType: ocispec.MediaTypeImageLayer,
107120
Size: rc.c,

diff/stream_unix.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@ import (
2525
"io"
2626
"os"
2727
"os/exec"
28+
"sync"
2829

2930
"github.com/gogo/protobuf/proto"
3031
"github.com/gogo/protobuf/types"
32+
"github.com/pkg/errors"
3133
)
3234

3335
// NewBinaryProcessor returns a binary processor for use with processing content streams
3436
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
3537
cmd := exec.CommandContext(ctx, name, args...)
38+
cmd.Env = os.Environ()
3639

3740
var payloadC io.Closer
3841
if payload != nil {
@@ -71,10 +74,19 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce
7174
}
7275
cmd.Stdout = w
7376

77+
stderr := bytes.NewBuffer(nil)
78+
cmd.Stderr = stderr
79+
7480
if err := cmd.Start(); err != nil {
7581
return nil, err
7682
}
77-
go cmd.Wait()
83+
p := &binaryProcessor{
84+
cmd: cmd,
85+
r: r,
86+
mt: rmt,
87+
stderr: stderr,
88+
}
89+
go p.wait()
7890

7991
// close after start and dup
8092
w.Close()
@@ -84,17 +96,33 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce
8496
if payloadC != nil {
8597
payloadC.Close()
8698
}
87-
return &binaryProcessor{
88-
cmd: cmd,
89-
r: r,
90-
mt: rmt,
91-
}, nil
99+
return p, nil
92100
}
93101

94102
type binaryProcessor struct {
95-
cmd *exec.Cmd
96-
r *os.File
97-
mt string
103+
cmd *exec.Cmd
104+
r *os.File
105+
mt string
106+
stderr *bytes.Buffer
107+
108+
mu sync.Mutex
109+
err error
110+
}
111+
112+
func (c *binaryProcessor) Err() error {
113+
c.mu.Lock()
114+
defer c.mu.Unlock()
115+
return c.err
116+
}
117+
118+
func (c *binaryProcessor) wait() {
119+
if err := c.cmd.Wait(); err != nil {
120+
if _, ok := err.(*exec.ExitError); ok {
121+
c.mu.Lock()
122+
c.err = errors.New(c.stderr.String())
123+
c.mu.Unlock()
124+
}
125+
}
98126
}
99127

100128
func (c *binaryProcessor) File() *os.File {

diff/stream_windows.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
"os"
2828
"os/exec"
2929
"path/filepath"
30+
"sync"
3031

3132
winio "github.com/Microsoft/go-winio"
3233
"github.com/gogo/protobuf/proto"
3334
"github.com/gogo/protobuf/types"
35+
"github.com/pkg/errors"
3436
"github.com/sirupsen/logrus"
3537
)
3638

@@ -39,6 +41,7 @@ const processorPipe = "STREAM_PROCESSOR_PIPE"
3941
// NewBinaryProcessor returns a binary processor for use with processing content streams
4042
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
4143
cmd := exec.CommandContext(ctx, name, args...)
44+
cmd.Env = os.Environ()
4245

4346
if payload != nil {
4447
data, err := proto.Marshal(payload)
@@ -84,28 +87,52 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce
8487
return nil, err
8588
}
8689
cmd.Stdout = w
90+
stderr := bytes.NewBuffer(nil)
91+
cmd.Stderr = stderr
8792

8893
if err := cmd.Start(); err != nil {
8994
return nil, err
9095
}
91-
go cmd.Wait()
96+
p := &binaryProcessor{
97+
cmd: cmd,
98+
r: r,
99+
mt: rmt,
100+
stderr: stderr,
101+
}
102+
go p.wait()
92103

93104
// close after start and dup
94105
w.Close()
95106
if closer != nil {
96107
closer()
97108
}
98-
return &binaryProcessor{
99-
cmd: cmd,
100-
r: r,
101-
mt: rmt,
102-
}, nil
109+
return p, nil
103110
}
104111

105112
type binaryProcessor struct {
106-
cmd *exec.Cmd
107-
r *os.File
108-
mt string
113+
cmd *exec.Cmd
114+
r *os.File
115+
mt string
116+
stderr *bytes.Buffer
117+
118+
mu sync.Mutex
119+
err error
120+
}
121+
122+
func (c *binaryProcessor) Err() error {
123+
c.mu.Lock()
124+
defer c.mu.Unlock()
125+
return c.err
126+
}
127+
128+
func (c *binaryProcessor) wait() {
129+
if err := c.cmd.Wait(); err != nil {
130+
if _, ok := err.(*exec.ExitError); ok {
131+
c.mu.Lock()
132+
c.err = errors.New(c.stderr.String())
133+
c.mu.Unlock()
134+
}
135+
}
109136
}
110137

111138
func (c *binaryProcessor) File() *os.File {

0 commit comments

Comments
 (0)