Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions staging/src/k8s.io/client-go/tools/portforward/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package portforward

import (
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -357,13 +358,42 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {

errorChan := make(chan error)
go func() {
type portForwardErrResponse struct {
// server side's suggestions
// guide us whether to close the connection
CloseConnection bool `json:"closeConnection,omitempty"`
// error detail
Message string `json:"message,omitempty"`
}
message, err := io.ReadAll(errorStream)
switch {
case err != nil:
if err != nil {
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
case len(message) > 0:
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
close(errorChan)
return
}

// happy path, we have successfully completed forwarding task
if len(message) == 0 {
close(errorChan)
return
}

resp := portForwardErrResponse{}
if err := json.Unmarshal(message, &resp); err != nil {
errorChan <- fmt.Errorf("decode portforward error response for port %d -> %d: %w", port.Local, port.Remote, err)
close(errorChan)
return
}

err = fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, resp.Message)
if !resp.CloseConnection {
// logging it for debug
runtime.HandleError(err)
close(errorChan)
return
}
// We got an unexpected error and the server instructed us to close the connection.
errorChan <- err
close(errorChan)
}()

Expand Down
85 changes: 85 additions & 0 deletions staging/src/k8s.io/client-go/tools/portforward/portforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package portforward

import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"reflect"
"sort"
"strings"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -631,3 +633,86 @@ func TestForwardPortsReturnsNilWhenStopChanIsClosed(t *testing.T) {
t.Fatalf("unexpected error from pf.ForwardPorts(): %s", err)
}
}

func TestHandleRemoteSendError(t *testing.T) {
type portForwardErrResponse struct {
// server side's suggestions
// guide us whether to close the connection
CloseConnection bool `json:"closeConnection,omitempty"`
// error detail
Message string `json:"message,omitempty"`
}

testCases := []struct {
resp portForwardErrResponse
}{
{
resp: portForwardErrResponse{
CloseConnection: false,
},
},
{
resp: portForwardErrResponse{
CloseConnection: true,
},
},
{
resp: portForwardErrResponse{
CloseConnection: false,
Message: syscall.EPIPE.Error(),
},
},
{
resp: portForwardErrResponse{
CloseConnection: false,
Message: syscall.ECONNRESET.Error(),
},
},
{
resp: portForwardErrResponse{
CloseConnection: true,
Message: "got unexpect err",
},
},
}

for _, testCase := range testCases {
t.Run("", func(t *testing.T) {
out := bytes.NewBufferString("")
errOut := bytes.NewBufferString("")

errStream := &bytes.Buffer{}
if err := json.NewEncoder(errStream).Encode(testCase.resp); err != nil {
t.Fatalf("encode the err response: %v", err)
}

pf, err := New(&fakeDialer{}, []string{":2222"}, nil, nil, out, errOut)
if err != nil {
t.Fatalf("error while calling New: %s", err)
}

// Setup fake local connection
localConnection := &fakeConn{
sendBuffer: bytes.NewBufferString(""),
receiveBuffer: bytes.NewBufferString(""),
}

// Setup fake remote connection to return an error message on the error stream
remoteDataToSend := bytes.NewBufferString("")
remoteDataReceived := bytes.NewBufferString("")
remoteConnection := newFakeConnection()
remoteConnection.dataStream.readFunc = remoteDataToSend.Read
remoteConnection.dataStream.writeFunc = remoteDataReceived.Write
remoteConnection.errorStream.readFunc = errStream.Read
pf.streamConn = remoteConnection

// Test handleConnection, using go-routine because it needs to be able to write to unbuffered pf.errorChan
pf.handleConnection(localConnection, ForwardedPort{Local: 1111, Remote: 2222})

underlayConn := pf.streamConn.(*fakeConnection)
if testCase.resp.CloseConnection != underlayConn.closed {
t.Fatalf("want %v but got %v", testCase.resp.CloseConnection, underlayConn.closed)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ Loop:
// function for the given stream pair.
func (h *httpStreamHandler) portForward(p *httpStreamPair) {
ctx := context.Background()
defer p.dataStream.Close()
defer p.errorStream.Close()
defer p.dataStream.Close() //nolint: errcheck
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we have the same problem as in here and we should Reset() instead of Close()

#126718 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see , discussed here #117493 (comment)

defer p.errorStream.Close() //nolint: errcheck

portString := p.dataStream.Headers().Get(api.PortHeader)
port, _ := strconv.ParseInt(portString, 10, 32)
Expand All @@ -252,11 +252,7 @@ func (h *httpStreamHandler) portForward(p *httpStreamPair) {
err := h.forwarder.PortForward(ctx, h.pod, h.uid, int32(port), p.dataStream)
klog.V(5).InfoS("Connection request done invoking forwarder.PortForward for port", "connection", h.conn, "request", p.requestID, "port", portString)

if err != nil {
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
utilruntime.HandleError(msg)
fmt.Fprint(p.errorStream, msg.Error())
}
handleStreamPortForwardErr(err, h.pod, int32(port), h.uid, p.requestID, h.conn, p.errorStream)
}

// httpStreamPair represents the error and data streams for a port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,32 @@ package portforward

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"syscall"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
"k8s.io/apimachinery/pkg/util/runtime"

"k8s.io/klog/v2"
)

// portForwardErrResponse
// It will be sent to the client to instruct the client whether to close the Connection.
// it should be kept in sync with those defined in staging/src/k8s.io/client-go/tools/portforward/portforward.go:361
type portForwardErrResponse struct {
// unexpected error was encountered and the connection needs to be closed.
CloseConnection bool `json:"closeConnection,omitempty"`
// error details
// regardless of whether we encounter an expected error, we still need to set it up.
Message string `json:"message,omitempty"`
}

// PortForwarder knows how to forward content from a data stream to/from a port
// in a pod.
type PortForwarder interface {
Expand All @@ -52,3 +69,41 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po
return
}
}

// handleStreamPortForwardErr
// Whether it is httpStream or WebSocket, we use the same method to handle errors.
func handleStreamPortForwardErr(err error, pod string, port int32, uid types.UID, requestID string, conn any, errorStream io.WriteCloser) {
// happy path, we have successfully completed forwarding task
if err == nil {
return
}

errResp := portForwardErrResponse{
CloseConnection: false,
}
if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
// During the forwarding process, if these types of errors are encountered,
// we should continue to provide port forwarding services.
//
// These two errors can occur in the following scenarios:
// ECONNRESET: the target process reset connection between CRI and itself.
// see: https://github.com/kubernetes/kubernetes/issues/111825 for detail
//
// EPIPE: the target process did not read the received data, causing the
// buffer in the kernel to be full, resulting in the occurrence of Zero Window,
// then closing the connection (FIN, RESET)
// see: https://github.com/kubernetes/kubernetes/issues/74551 for detail
klog.ErrorS(err, "forwarding port", "conn", conn, "request", requestID, "port", port)
} else {
errResp.CloseConnection = true
}

// send error messages to the client to let our users know what happened.
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %w", port, pod, uid, err)
errResp.Message = msg.Error()
runtime.HandleError(msg)
err = json.NewEncoder(errorStream).Encode(errResp)
if err != nil {
klog.ErrorS(err, "encode the resp", "conn", conn, "request", requestID, "port", port)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
)

Expand Down Expand Up @@ -184,16 +183,12 @@ func (h *websocketStreamHandler) run() {

func (h *websocketStreamHandler) portForward(p *websocketStreamPair) {
ctx := context.Background()
defer p.dataStream.Close()
defer p.errorStream.Close()
defer p.dataStream.Close() //nolint: errcheck
defer p.errorStream.Close() //nolint: errcheck

klog.V(5).InfoS("Connection invoking forwarder.PortForward for port", "connection", h.conn, "port", p.port)
err := h.forwarder.PortForward(ctx, h.pod, h.uid, p.port, p.dataStream)
klog.V(5).InfoS("Connection done invoking forwarder.PortForward for port", "connection", h.conn, "port", p.port)

if err != nil {
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", p.port, h.pod, h.uid, err)
runtime.HandleError(msg)
fmt.Fprint(p.errorStream, msg.Error())
}
handleStreamPortForwardErr(err, h.pod, p.port, h.uid, "", h.conn, p.errorStream)
}
21 changes: 21 additions & 0 deletions test/e2e/kubectl/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,27 @@ func curl(url string) (string, error) {
return curlTransport(url, utilnet.SetTransportDefaults(&http.Transport{}))
}

func post(url string, reader io.Reader, transport *http.Transport) (string, error) {
if transport == nil {
transport = utilnet.SetTransportDefaults(&http.Transport{})
}
client := &http.Client{Transport: transport}
req, err := http.NewRequest(http.MethodPost, url, reader)
if err != nil {
return "", err
}
resp, err := client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close() //nolint: errcheck
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
return string(body), nil
}

func validateGuestbookApp(ctx context.Context, c clientset.Interface, ns string) {
framework.Logf("Waiting for all frontend pods to be Running.")
label := labels.SelectorFromSet(labels.Set(map[string]string{"tier": "frontend", "app": "guestbook"}))
Expand Down
Loading