@@ -21,21 +21,33 @@ import (
2121 "context"
2222 "fmt"
2323 "io"
24+ "net"
25+ "time"
2426
2527 "k8s.io/utils/exec"
2628
29+ "github.com/containerd/containerd/log"
2730 sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
2831 cioutil "github.com/containerd/containerd/pkg/ioutil"
2932)
3033
3134func (c * criService ) portForward (ctx context.Context , id string , port int32 , stream io.ReadWriter ) error {
35+ sandbox , err := c .sandboxStore .Get (id )
36+ if err != nil {
37+ return fmt .Errorf ("failed to find sandbox %q in store: %w" , id , err )
38+ }
39+ // host process containers
40+ if hostNetwork (sandbox .Config ) {
41+ return hpcPortForwarding (ctx , id , port , stream )
42+ }
43+
3244 stdout := cioutil .NewNopWriteCloser (stream )
3345 stderrBuffer := new (bytes.Buffer )
3446 stderr := cioutil .NewNopWriteCloser (stderrBuffer )
3547 // localhost is resolved to 127.0.0.1 in ipv4, and ::1 in ipv6.
3648 // Explicitly using ipv4 IP address in here to avoid flakiness.
3749 cmd := []string {"wincat.exe" , "127.0.0.1" , fmt .Sprint (port )}
38- err : = c .execInSandbox (ctx , id , cmd , stream , stdout , stderr )
50+ err = c .execInSandbox (ctx , id , cmd , stream , stdout , stderr )
3951 if err != nil {
4052 return fmt .Errorf ("failed to execute port forward in sandbox: %s: %w" , stderrBuffer .String (), err )
4153 }
@@ -75,3 +87,70 @@ func (c *criService) execInSandbox(ctx context.Context, sandboxID string, cmd []
7587 Code : int (* exitCode ),
7688 }
7789}
90+
91+ func hpcPortForwarding (ctx context.Context , id string , port int32 , stream io.ReadWriter ) error {
92+ // Dial to localhost for HPCs since host process containers
93+ // share network namespace of the host
94+ podIP := "localhost"
95+ err := func () error {
96+ conn , err := net .Dial ("tcp" , net .JoinHostPort (podIP , fmt .Sprintf ("%d" , port )))
97+ if err != nil {
98+ return fmt .Errorf ("failed to connect to %s:%d for pod %q: %v" , podIP , port , id , err )
99+ }
100+ log .G (ctx ).Debugf ("Connection to ip %s and port %d was successful" , podIP , port )
101+
102+ defer conn .Close ()
103+
104+ // copy stream
105+ errCh := make (chan error , 2 )
106+ // Copy from the namespace port connection to the client stream
107+ go func () {
108+ log .G (ctx ).Debugf ("PortForward copying data from namespace %q port %d to the client stream" , id , port )
109+ _ , err := io .Copy (stream , conn )
110+ errCh <- err
111+ }()
112+
113+ // Copy from the client stream to the namespace port connection
114+ go func () {
115+ log .G (ctx ).Debugf ("PortForward copying data from client stream to namespace %q port %d" , id , port )
116+ _ , err := io .Copy (conn , stream )
117+ errCh <- err
118+ }()
119+
120+ // Wait until the first error is returned by one of the connections
121+ // we use errFwd to store the result of the port forwarding operation
122+ // if the context is cancelled close everything and return
123+ var errFwd error
124+ select {
125+ case errFwd = <- errCh :
126+ log .G (ctx ).Debugf ("PortForward stop forwarding in one direction in network namespace %q port %d: %v" , id , port , errFwd )
127+ case <- ctx .Done ():
128+ log .G (ctx ).Debugf ("PortForward cancelled in network namespace %q port %d: %v" , id , port , ctx .Err ())
129+ return ctx .Err ()
130+ }
131+ // give a chance to terminate gracefully or timeout
132+ // after 1s
133+ const timeout = time .Second
134+ select {
135+ case e := <- errCh :
136+ if errFwd == nil {
137+ errFwd = e
138+ }
139+ log .G (ctx ).Debugf ("PortForward stopped forwarding in both directions in network namespace %q port %d: %v" , id , port , e )
140+ case <- time .After (timeout ):
141+ log .G (ctx ).Debugf ("PortForward timed out waiting to close the connection in network namespace %q port %d" , id , port )
142+ case <- ctx .Done ():
143+ log .G (ctx ).Debugf ("PortForward cancelled in network namespace %q port %d: %v" , id , port , ctx .Err ())
144+ errFwd = ctx .Err ()
145+ }
146+
147+ return errFwd
148+ }()
149+
150+ if err != nil {
151+ return fmt .Errorf ("failed to execute portforward for HPC podId %v, podIp %v, err: %w" , id , podIP , err )
152+ }
153+ log .G (ctx ).Debugf ("Finish port forwarding for windows %q port %d" , id , port )
154+
155+ return nil
156+ }
0 commit comments