Skip to content

Commit 3df5d44

Browse files
committed
Add support for HPC port forwarding
Signed-off-by: Kirtana Ashok <[email protected]>
1 parent ec5222f commit 3df5d44

2 files changed

Lines changed: 160 additions & 2 deletions

File tree

pkg/cri/sbserver/sandbox_portforward_windows.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,33 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24+
"net"
25+
"time"
2426

2527
"k8s.io/utils/exec"
2628

29+
"github.com/containerd/containerd/log"
2730
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
2831
cioutil "github.com/containerd/containerd/pkg/ioutil"
2932
)
3033

3134
func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriter) error {
35+
sandbox, err := c.sandboxStore.Get(id)
36+
if err != nil {
37+
return fmt.Errorf("failed to find sandbox %q in store: %w", id, err)
38+
}
39+
// host process containers
40+
if hostNetwork(sandbox.Config) {
41+
return hpcPortForwarding(ctx, id, port, stream)
42+
}
43+
3244
stdout := cioutil.NewNopWriteCloser(stream)
3345
stderrBuffer := new(bytes.Buffer)
3446
stderr := cioutil.NewNopWriteCloser(stderrBuffer)
3547
// localhost is resolved to 127.0.0.1 in ipv4, and ::1 in ipv6.
3648
// Explicitly using ipv4 IP address in here to avoid flakiness.
3749
cmd := []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}
38-
err := c.execInSandbox(ctx, id, cmd, stream, stdout, stderr)
50+
err = c.execInSandbox(ctx, id, cmd, stream, stdout, stderr)
3951
if err != nil {
4052
return fmt.Errorf("failed to execute port forward in sandbox: %s: %w", stderrBuffer.String(), err)
4153
}
@@ -75,3 +87,70 @@ func (c *criService) execInSandbox(ctx context.Context, sandboxID string, cmd []
7587
Code: int(*exitCode),
7688
}
7789
}
90+
91+
func hpcPortForwarding(ctx context.Context, id string, port int32, stream io.ReadWriter) error {
92+
// Dial to localhost for HPCs since host process containers
93+
// share network namespace of the host
94+
podIP := "localhost"
95+
err := func() error {
96+
conn, err := net.Dial("tcp", net.JoinHostPort(podIP, fmt.Sprintf("%d", port)))
97+
if err != nil {
98+
return fmt.Errorf("failed to connect to %s:%d for pod %q: %v", podIP, port, id, err)
99+
}
100+
log.G(ctx).Debugf("Connection to ip %s and port %d was successful", podIP, port)
101+
102+
defer conn.Close()
103+
104+
// copy stream
105+
errCh := make(chan error, 2)
106+
// Copy from the namespace port connection to the client stream
107+
go func() {
108+
log.G(ctx).Debugf("PortForward copying data from namespace %q port %d to the client stream", id, port)
109+
_, err := io.Copy(stream, conn)
110+
errCh <- err
111+
}()
112+
113+
// Copy from the client stream to the namespace port connection
114+
go func() {
115+
log.G(ctx).Debugf("PortForward copying data from client stream to namespace %q port %d", id, port)
116+
_, err := io.Copy(conn, stream)
117+
errCh <- err
118+
}()
119+
120+
// Wait until the first error is returned by one of the connections
121+
// we use errFwd to store the result of the port forwarding operation
122+
// if the context is cancelled close everything and return
123+
var errFwd error
124+
select {
125+
case errFwd = <-errCh:
126+
log.G(ctx).Debugf("PortForward stop forwarding in one direction in network namespace %q port %d: %v", id, port, errFwd)
127+
case <-ctx.Done():
128+
log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
129+
return ctx.Err()
130+
}
131+
// give a chance to terminate gracefully or timeout
132+
// after 1s
133+
const timeout = time.Second
134+
select {
135+
case e := <-errCh:
136+
if errFwd == nil {
137+
errFwd = e
138+
}
139+
log.G(ctx).Debugf("PortForward stopped forwarding in both directions in network namespace %q port %d: %v", id, port, e)
140+
case <-time.After(timeout):
141+
log.G(ctx).Debugf("PortForward timed out waiting to close the connection in network namespace %q port %d", id, port)
142+
case <-ctx.Done():
143+
log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
144+
errFwd = ctx.Err()
145+
}
146+
147+
return errFwd
148+
}()
149+
150+
if err != nil {
151+
return fmt.Errorf("failed to execute portforward for HPC podId %v, podIp %v, err: %w", id, podIP, err)
152+
}
153+
log.G(ctx).Debugf("Finish port forwarding for windows %q port %d", id, port)
154+
155+
return nil
156+
}

pkg/cri/server/sandbox_portforward_windows.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,33 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24+
"net"
25+
"time"
2426

2527
"k8s.io/utils/exec"
2628

29+
"github.com/containerd/containerd/log"
2730
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
2831
cioutil "github.com/containerd/containerd/pkg/ioutil"
2932
)
3033

3134
func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriter) error {
35+
sandbox, err := c.sandboxStore.Get(id)
36+
if err != nil {
37+
return fmt.Errorf("failed to find sandbox %q in store: %w", id, err)
38+
}
39+
// host process containers
40+
if hostNetwork(sandbox.Config) {
41+
return hpcPortForwarding(ctx, id, port, stream)
42+
}
43+
3244
stdout := cioutil.NewNopWriteCloser(stream)
3345
stderrBuffer := new(bytes.Buffer)
3446
stderr := cioutil.NewNopWriteCloser(stderrBuffer)
3547
// localhost is resolved to 127.0.0.1 in ipv4, and ::1 in ipv6.
3648
// Explicitly using ipv4 IP address in here to avoid flakiness.
3749
cmd := []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}
38-
err := c.execInSandbox(ctx, id, cmd, stream, stdout, stderr)
50+
err = c.execInSandbox(ctx, id, cmd, stream, stdout, stderr)
3951
if err != nil {
4052
return fmt.Errorf("failed to execute port forward in sandbox: %s: %w", stderrBuffer.String(), err)
4153
}
@@ -75,3 +87,70 @@ func (c *criService) execInSandbox(ctx context.Context, sandboxID string, cmd []
7587
Code: int(*exitCode),
7688
}
7789
}
90+
91+
func hpcPortForwarding(ctx context.Context, id string, port int32, stream io.ReadWriter) error {
92+
// Dial to localhost for HPCs since host process containers
93+
// share network namespace of the host
94+
podIP := "localhost"
95+
err := func() error {
96+
conn, err := net.Dial("tcp", net.JoinHostPort(podIP, fmt.Sprintf("%d", port)))
97+
if err != nil {
98+
return fmt.Errorf("failed to connect to %s:%d for pod %q: %v", podIP, port, id, err)
99+
}
100+
log.G(ctx).Debugf("Connection to ip %s and port %d was successful", podIP, port)
101+
102+
defer conn.Close()
103+
104+
// copy stream
105+
errCh := make(chan error, 2)
106+
// Copy from the namespace port connection to the client stream
107+
go func() {
108+
log.G(ctx).Debugf("PortForward copying data from namespace %q port %d to the client stream", id, port)
109+
_, err := io.Copy(stream, conn)
110+
errCh <- err
111+
}()
112+
113+
// Copy from the client stream to the namespace port connection
114+
go func() {
115+
log.G(ctx).Debugf("PortForward copying data from client stream to namespace %q port %d", id, port)
116+
_, err := io.Copy(conn, stream)
117+
errCh <- err
118+
}()
119+
120+
// Wait until the first error is returned by one of the connections
121+
// we use errFwd to store the result of the port forwarding operation
122+
// if the context is cancelled close everything and return
123+
var errFwd error
124+
select {
125+
case errFwd = <-errCh:
126+
log.G(ctx).Debugf("PortForward stop forwarding in one direction in network namespace %q port %d: %v", id, port, errFwd)
127+
case <-ctx.Done():
128+
log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
129+
return ctx.Err()
130+
}
131+
// give a chance to terminate gracefully or timeout
132+
// after 1s
133+
const timeout = time.Second
134+
select {
135+
case e := <-errCh:
136+
if errFwd == nil {
137+
errFwd = e
138+
}
139+
log.G(ctx).Debugf("PortForward stopped forwarding in both directions in network namespace %q port %d: %v", id, port, e)
140+
case <-time.After(timeout):
141+
log.G(ctx).Debugf("PortForward timed out waiting to close the connection in network namespace %q port %d", id, port)
142+
case <-ctx.Done():
143+
log.G(ctx).Debugf("PortForward cancelled in network namespace %q port %d: %v", id, port, ctx.Err())
144+
errFwd = ctx.Err()
145+
}
146+
147+
return errFwd
148+
}()
149+
150+
if err != nil {
151+
return fmt.Errorf("failed to execute portforward for HPC podId %v, podIp %v, err: %w", id, podIP, err)
152+
}
153+
log.G(ctx).Debugf("Finish port forwarding for windows %q port %d", id, port)
154+
155+
return nil
156+
}

0 commit comments

Comments
 (0)