Skip to content

Commit e1489f9

Browse files
committed
Use named pipes for windows processors
Signed-off-by: Michael Crosby <[email protected]>
1 parent 134d3c8 commit e1489f9

3 files changed

Lines changed: 256 additions & 91 deletions

File tree

diff/stream.go

Lines changed: 0 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717
package diff
1818

1919
import (
20-
"bytes"
2120
"context"
22-
"fmt"
2321
"io"
2422
"os"
25-
"os/exec"
2623

2724
"github.com/containerd/containerd/archive/compression"
2825
"github.com/containerd/containerd/images"
29-
"github.com/gogo/protobuf/proto"
3026
"github.com/gogo/protobuf/types"
3127
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
3228
"github.com/pkg/errors"
@@ -189,90 +185,3 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string
189185
}
190186

191187
const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"
192-
193-
// NewBinaryProcessor returns a binary processor for use with processing content streams
194-
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
195-
cmd := exec.CommandContext(ctx, name, args...)
196-
197-
var payloadC io.Closer
198-
if payload != nil {
199-
data, err := proto.Marshal(payload)
200-
if err != nil {
201-
return nil, err
202-
}
203-
r, w, err := os.Pipe()
204-
if err != nil {
205-
return nil, err
206-
}
207-
go func() {
208-
io.Copy(w, bytes.NewReader(data))
209-
w.Close()
210-
}()
211-
212-
cmd.ExtraFiles = append(cmd.ExtraFiles, r)
213-
payloadC = r
214-
}
215-
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
216-
var (
217-
stdin io.Reader
218-
closer func() error
219-
err error
220-
)
221-
if f, ok := stream.(RawProcessor); ok {
222-
stdin = f.File()
223-
closer = f.File().Close
224-
} else {
225-
stdin = stream
226-
}
227-
cmd.Stdin = stdin
228-
r, w, err := os.Pipe()
229-
if err != nil {
230-
return nil, err
231-
}
232-
cmd.Stdout = w
233-
234-
if err := cmd.Start(); err != nil {
235-
return nil, err
236-
}
237-
go cmd.Wait()
238-
239-
// close after start and dup
240-
w.Close()
241-
if closer != nil {
242-
closer()
243-
}
244-
if payloadC != nil {
245-
payloadC.Close()
246-
}
247-
return &binaryProcessor{
248-
cmd: cmd,
249-
r: r,
250-
mt: rmt,
251-
}, nil
252-
}
253-
254-
type binaryProcessor struct {
255-
cmd *exec.Cmd
256-
r *os.File
257-
mt string
258-
}
259-
260-
func (c *binaryProcessor) File() *os.File {
261-
return c.r
262-
}
263-
264-
func (c *binaryProcessor) MediaType() string {
265-
return c.mt
266-
}
267-
268-
func (c *binaryProcessor) Read(p []byte) (int, error) {
269-
return c.r.Read(p)
270-
}
271-
272-
func (c *binaryProcessor) Close() error {
273-
err := c.r.Close()
274-
if kerr := c.cmd.Process.Kill(); err == nil {
275-
err = kerr
276-
}
277-
return err
278-
}

diff/stream_unix.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// +build !windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package diff
20+
21+
import (
22+
"bytes"
23+
"context"
24+
"fmt"
25+
"io"
26+
"os"
27+
"os/exec"
28+
29+
"github.com/gogo/protobuf/proto"
30+
"github.com/gogo/protobuf/types"
31+
)
32+
33+
// NewBinaryProcessor returns a binary processor for use with processing content streams
34+
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
35+
cmd := exec.CommandContext(ctx, name, args...)
36+
37+
var payloadC io.Closer
38+
if payload != nil {
39+
data, err := proto.Marshal(payload)
40+
if err != nil {
41+
return nil, err
42+
}
43+
r, w, err := os.Pipe()
44+
if err != nil {
45+
return nil, err
46+
}
47+
go func() {
48+
io.Copy(w, bytes.NewReader(data))
49+
w.Close()
50+
}()
51+
52+
cmd.ExtraFiles = append(cmd.ExtraFiles, r)
53+
payloadC = r
54+
}
55+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
56+
var (
57+
stdin io.Reader
58+
closer func() error
59+
err error
60+
)
61+
if f, ok := stream.(RawProcessor); ok {
62+
stdin = f.File()
63+
closer = f.File().Close
64+
} else {
65+
stdin = stream
66+
}
67+
cmd.Stdin = stdin
68+
r, w, err := os.Pipe()
69+
if err != nil {
70+
return nil, err
71+
}
72+
cmd.Stdout = w
73+
74+
if err := cmd.Start(); err != nil {
75+
return nil, err
76+
}
77+
go cmd.Wait()
78+
79+
// close after start and dup
80+
w.Close()
81+
if closer != nil {
82+
closer()
83+
}
84+
if payloadC != nil {
85+
payloadC.Close()
86+
}
87+
return &binaryProcessor{
88+
cmd: cmd,
89+
r: r,
90+
mt: rmt,
91+
}, nil
92+
}
93+
94+
type binaryProcessor struct {
95+
cmd *exec.Cmd
96+
r *os.File
97+
mt string
98+
}
99+
100+
func (c *binaryProcessor) File() *os.File {
101+
return c.r
102+
}
103+
104+
func (c *binaryProcessor) MediaType() string {
105+
return c.mt
106+
}
107+
108+
func (c *binaryProcessor) Read(p []byte) (int, error) {
109+
return c.r.Read(p)
110+
}
111+
112+
func (c *binaryProcessor) Close() error {
113+
err := c.r.Close()
114+
if kerr := c.cmd.Process.Kill(); err == nil {
115+
err = kerr
116+
}
117+
return err
118+
}

diff/stream_windows.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// +build windows
2+
3+
/*
4+
Copyright The containerd Authors.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package diff
20+
21+
import (
22+
"bytes"
23+
"context"
24+
"fmt"
25+
"io"
26+
"io/ioutil"
27+
"os"
28+
"os/exec"
29+
"path/filepath"
30+
31+
winio "github.com/Microsoft/go-winio"
32+
"github.com/gogo/protobuf/proto"
33+
"github.com/gogo/protobuf/types"
34+
"github.com/sirupsen/logrus"
35+
)
36+
37+
const processorPipe = "STREAM_PROCESSOR_PIPE"
38+
39+
// NewBinaryProcessor returns a binary processor for use with processing content streams
40+
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
41+
cmd := exec.CommandContext(ctx, name, args...)
42+
43+
if payload != nil {
44+
data, err := proto.Marshal(payload)
45+
if err != nil {
46+
return nil, err
47+
}
48+
up, err := getUiqPath()
49+
if err != nil {
50+
return nil, err
51+
}
52+
path := fmt.Sprintf("\\\\.\\pipe\\containerd-processor-%s-pipe", up)
53+
l, err := winio.ListenPipe(path, nil)
54+
if err != nil {
55+
return nil, err
56+
}
57+
go func() {
58+
defer l.Close()
59+
conn, err := l.Accept()
60+
if err != nil {
61+
logrus.WithError(err).Error("accept npipe connection")
62+
return
63+
}
64+
io.Copy(conn, bytes.NewReader(data))
65+
conn.Close()
66+
}()
67+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", processorPipe, path))
68+
}
69+
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
70+
var (
71+
stdin io.Reader
72+
closer func() error
73+
err error
74+
)
75+
if f, ok := stream.(RawProcessor); ok {
76+
stdin = f.File()
77+
closer = f.File().Close
78+
} else {
79+
stdin = stream
80+
}
81+
cmd.Stdin = stdin
82+
r, w, err := os.Pipe()
83+
if err != nil {
84+
return nil, err
85+
}
86+
cmd.Stdout = w
87+
88+
if err := cmd.Start(); err != nil {
89+
return nil, err
90+
}
91+
go cmd.Wait()
92+
93+
// close after start and dup
94+
w.Close()
95+
if closer != nil {
96+
closer()
97+
}
98+
return &binaryProcessor{
99+
cmd: cmd,
100+
r: r,
101+
mt: rmt,
102+
}, nil
103+
}
104+
105+
type binaryProcessor struct {
106+
cmd *exec.Cmd
107+
r *os.File
108+
mt string
109+
}
110+
111+
func (c *binaryProcessor) File() *os.File {
112+
return c.r
113+
}
114+
115+
func (c *binaryProcessor) MediaType() string {
116+
return c.mt
117+
}
118+
119+
func (c *binaryProcessor) Read(p []byte) (int, error) {
120+
return c.r.Read(p)
121+
}
122+
123+
func (c *binaryProcessor) Close() error {
124+
err := c.r.Close()
125+
if kerr := c.cmd.Process.Kill(); err == nil {
126+
err = kerr
127+
}
128+
return err
129+
}
130+
131+
func getUiqPath() (string, error) {
132+
dir, err := ioutil.TempDir("", "")
133+
if err != nil {
134+
return "", err
135+
}
136+
os.Remove(dir)
137+
return filepath.Base(dir), nil
138+
}

0 commit comments

Comments
 (0)