mcp/streamable: add resumability for the Streamable transport #133
jba left a comment:
First preliminary review. With Rob out, I want to really understand this. That may take a day or two.
This CL implements a retry mechanism that resumes SSE streams in order to recover from network failures.
	return
case s.incoming <- evt.Data:

if !isResumable(resp) {
I think we may also want to try resuming if the error from establishSSE is non-nil. For example, if the network is partitioned, that might manifest as a timeout error instead of an HTTP response. But we can leave that for a later PR.
The way it's written, we will already retry with another attempt if the error is non-nil. Are you saying we need to do something special in that case?
mcp/streamable_test.go
}

// Perform handshake.
initReq := &jsonrpc.Request{ID: jsonrpc2.Int64ID(100), Method: "initialize", Params: mustMarshal(t, &InitializeParams{})}
Why does this have to happen by hand?
I guess what I mean is: can you write this test with a Client instead of at the transport level? If you need to reach into the transport for something, maybe you could add test hooks to the transport code. That may ultimately be cleaner than re-writing the init protocol.
Good point. Updated to test with Client.
mcp/streamable_test.go
if err != nil {
	t.Fatalf("Failed to read notification: %v", err)
}
if req, ok := msg.(*jsonrpc.Request); ok && req.Method == "notifications/progress" {
Should it also be an error if you see a non-notification?
Updated to only hook into progress notifications.
@@ -223,39 +197,37 @@ func TestClientReplayAfterProxyBreak(t *testing.T) {
	go restartedProxy.Serve(listener)
How is this goroutine stopped?
restartedProxy.Close() will terminate it.
"Serve always returns a non-nil error and closes l. After [Server.Shutdown] or [Server.Close], the returned error is [ErrServerClosed]."
mcp/streamable_test.go
 // that is killed and restarted to simulate a recoverable network outage.
-func TestClientReplayAfterProxyBreak(t *testing.T) {
+func TestClientReplay(t *testing.T) {
 	notifications := make(chan string, 10)
Document how the channel buffer size affects correctness. (e.g. must be at least as large as the number of notifications sent in the tool handler on L125?)
Looks like buffer size has no effect since we collect them all. Removed the size of 10.
For #10