Skip to content

Commit cf787bd

Browse files
committed
DelayedClientTransport and fix TransportSet.shutdown() semantics.
Always return a completed future from `TransportSet`. If a (real) transport has not been created (e.g., in reconnect back-off), a `DelayedClientTransport` will be returned. Eventually we will get rid of the transport futures everywhere, and have streams always __owned__ by some transports. DelayedClientTransport ---------------------- After we get rid of the transport future, this is what `ClientCallImpl` and `LoadBalancer` get when a real transport has not been created yet. It buffers new streams and pings until `setTransport()` is called, after which point all buffered and future streams/pings are transferred to the real transport. If a buffered stream is cancelled, `DelayedClientTransport` will remove it from the buffer list, thus #1342 will be resolved after the larger refactoring is complete. This PR only makes `TransportSet` use `DelayedClientTransport`. Follow-up changes will be made to allow `LoadBalancer.pickTransport()` to return null, in which case `ManagedChannelImpl` will give `ClientCallImpl` a `DelayedClientTransport`. Changes to ClientTransport shutdown semantics --------------------------------------------- Previously when shutdown() is called, `ClientTransport` should not accept newStream(), and when all existing streams have been closed, `ClientTransport` is terminated. Only when a transport is terminated would a transport owner (e.g., `TransportSet`) remove the reference to it. `DelayedClientTransport` brings about a new case: when `setTransport()` is called, we switch to the real transport and no longer need the delayed transport. This is achieved by calling `shutdown()` on the delayed transport and letting it terminate. However, as the delayed transport has already been handed out to users, we would like `newStream()` to keep working for them, even though the delayed transport is already shut down and terminated. In order to make it easy to manage the life-cycle of `DelayedClientTransport`, we redefine the shutdown semantics of transport: - A transport can own a stream. Typically the transport owns the streams it creates, but there may be exceptions. `DelayedClientTransport` DOES NOT OWN the streams it returns from `newStream()` after `setTransport()` has been called. Instead, the ownership would be transferred to the real transport. - After `shutdown()` has been called, the transport stops owning new streams, and `newStream()` may still succeed. With this idea, `DelayedClientTransport`, even when terminated, will continue passing `newStream()` to the real transport. - When a transport is in shutdown state, and it doesn't own any stream, it then can enter terminated state. ManagedClientTransport / ClientTransport ---------------------------------------- Remove life-cycle interfaces from `ClientTransport`, and put them in its subclass - `ManagedClientTransport`, with the same idea that we have `Channel` and `ManagedChannel`. Only the one who creates the transport will get `ManagedClientTransport` thus is able to start and shutdown the transport. The users of transport, e.g., `LoadBalancer`, can only get `ClientTransport` thus are not alter its state. This change clarifies the responsibility of transport life-cycle management. Fix TransportSet shutdown semantics ----------------------------------- Currently, if `TransportSet.shutdown()` has been called, it no longer create new transports, which is wrong. The correct semantics of `TransportSet.shutdown()` should be: - Shutdown all transports, thus stop new streams being created on them - Stop `obtainActiveTransport()` from returning transports - Streams that already created, including those buffered in delayed transport, should continue. That means if delayed transport has buffered streams, we should let the existing reconnect task continue.
1 parent 544cd3a commit cf787bd

22 files changed

Lines changed: 929 additions & 257 deletions

core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
import io.grpc.ExperimentalApi;
3737
import io.grpc.internal.AbstractManagedChannelImplBuilder;
3838
import io.grpc.internal.AbstractReferenceCounted;
39-
import io.grpc.internal.ClientTransport;
4039
import io.grpc.internal.ClientTransportFactory;
40+
import io.grpc.internal.ManagedClientTransport;
4141

4242
import java.net.SocketAddress;
4343

@@ -89,7 +89,7 @@ private InProcessClientTransportFactory(String name) {
8989
}
9090

9191
@Override
92-
public ClientTransport newClientTransport(SocketAddress addr, String authority) {
92+
public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) {
9393
return new InProcessTransport(name);
9494
}
9595

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import io.grpc.Status;
4141
import io.grpc.internal.ClientStream;
4242
import io.grpc.internal.ClientStreamListener;
43-
import io.grpc.internal.ClientTransport;
43+
import io.grpc.internal.ManagedClientTransport;
4444
import io.grpc.internal.NoopClientStream;
4545
import io.grpc.internal.ServerStream;
4646
import io.grpc.internal.ServerStreamListener;
@@ -59,12 +59,12 @@
5959
import javax.annotation.concurrent.ThreadSafe;
6060

6161
@ThreadSafe
62-
class InProcessTransport implements ServerTransport, ClientTransport {
62+
class InProcessTransport implements ServerTransport, ManagedClientTransport {
6363
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
6464

6565
private final String name;
6666
private ServerTransportListener serverTransportListener;
67-
private ClientTransport.Listener clientTransportListener;
67+
private ManagedClientTransport.Listener clientTransportListener;
6868
@GuardedBy("this")
6969
private boolean shutdown;
7070
@GuardedBy("this")
@@ -79,7 +79,7 @@ public InProcessTransport(String name) {
7979
}
8080

8181
@Override
82-
public synchronized void start(ClientTransport.Listener listener) {
82+
public synchronized void start(ManagedClientTransport.Listener listener) {
8383
this.clientTransportListener = listener;
8484
InProcessServer server = InProcessServer.findServer(name);
8585
if (server != null) {
@@ -152,7 +152,7 @@ public void run() {
152152

153153
@Override
154154
public synchronized void shutdown() {
155-
// Can be called multiple times: once for ClientTransport, once for ServerTransport.
155+
// Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
156156
if (shutdown) {
157157
return;
158158
}

core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,8 @@ private static class AuthorityOverridingTransportFactory implements ClientTransp
230230
}
231231

232232
@Override
233-
public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
233+
public ManagedClientTransport newClientTransport(SocketAddress serverAddress,
234+
String authority) {
234235
return factory.newClientTransport(
235236
serverAddress, authorityOverride != null ? authorityOverride : authority);
236237
}

core/src/main/java/io/grpc/internal/ClientTransport.java

Lines changed: 11 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014, Google Inc. All rights reserved.
2+
* Copyright 2016, Google Inc. All rights reserved.
33
*
44
* Redistribution and use in source and binary forms, with or without
55
* modification, are permitted provided that the following conditions are
@@ -33,25 +33,21 @@
3333

3434
import io.grpc.Metadata;
3535
import io.grpc.MethodDescriptor;
36-
import io.grpc.Status;
3736

3837
import java.util.concurrent.Executor;
39-
4038
import javax.annotation.concurrent.ThreadSafe;
4139

4240
/**
43-
* The client-side transport encapsulating a single connection to a remote server. Allows creation
44-
* of new {@link Stream} instances for communication with the server. All methods on the transport
45-
* and its listener are expected to execute quickly.
46-
*
47-
* <p>{@link #start} must be the first method call to this interface and return before calling other
48-
* methods.
41+
* The client-side transport typically encapsulating a single connection to a remote
42+
* server. However, streams created before the client has discovered any server address may
43+
* eventually be issued on different connections. All methods on the transport and its callbacks
44+
* are expected to execute quickly.
4945
*/
5046
@ThreadSafe
5147
public interface ClientTransport {
5248

5349
/**
54-
* Creates a new stream for sending messages to the remote end-point.
50+
* Creates a new stream for sending messages to a remote end-point.
5551
*
5652
* <p>
5753
* This method returns immediately and does not wait for any validation of the request. If
@@ -67,64 +63,18 @@ public interface ClientTransport {
6763
ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers);
6864

6965
/**
70-
* Starts transport. This method may only be called once.
71-
*
72-
* <p>Implementations must not call {@code listener} from within {@link #start}; implementations
73-
* are expected to notify listener on a separate thread. This method should not throw any
74-
* exceptions.
66+
* Pings a remote endpoint. When an acknowledgement is received, the given callback will be
67+
* invoked using the given executor.
7568
*
76-
* @param listener non-{@code null} listener of transport events
77-
*/
78-
void start(Listener listener);
79-
80-
/**
81-
* Pings the remote endpoint to verify that the transport is still active. When an acknowledgement
82-
* is received, the given callback will be invoked using the given executor.
69+
* <p>Pings are not necessarily sent to the same endpont, thus a successful ping only means at
70+
* least one endpoint responded, but doesn't imply the availability of other endpoints (if there
71+
* is any).
8372
*
8473
* <p>This is an optional method. Transports that do not have any mechanism by which to ping the
8574
* remote endpoint may throw {@link UnsupportedOperationException}.
8675
*/
8776
void ping(PingCallback callback, Executor executor);
8877

89-
/**
90-
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
91-
* fail (once {@link Listener#transportShutdown} callback called). This method may only be called
92-
* once.
93-
*/
94-
void shutdown();
95-
96-
/**
97-
* Receives notifications for the transport life-cycle events. Implementation does not need to be
98-
* thread-safe, so notifications must be properly sychronized externally.
99-
*/
100-
interface Listener {
101-
/**
102-
* The transport is shutting down. No new streams will be processed, but existing streams may
103-
* continue. Shutdown could have been caused by an error or normal operation. It is possible
104-
* that this method is called without {@link #shutdown} being called. If the argument to this
105-
* function is {@link Status#isOk}, it is safe to immediately reconnect.
106-
*
107-
* <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
108-
*
109-
* @param s the reason for the shutdown.
110-
*/
111-
void transportShutdown(Status s);
112-
113-
/**
114-
* The transport completed shutting down. All resources have been released.
115-
*
116-
* <p>This is called exactly once, and must be called after {@link #transportShutdown} has been
117-
* called.
118-
*/
119-
void transportTerminated();
120-
121-
/**
122-
* The transport is ready to accept traffic, because the connection is established. This is
123-
* called at most once.
124-
*/
125-
void transportReady();
126-
}
127-
12878
/**
12979
* A callback that is invoked when the acknowledgement to a {@link #ping} is received. Exactly one
13080
* of the two methods should be called per {@link #ping}.

core/src/main/java/io/grpc/internal/ClientTransportFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@
3333

3434
import java.net.SocketAddress;
3535

36-
/** Pre-configured factory for creating {@link ClientTransport} instances. */
36+
/** Pre-configured factory for creating {@link ManagedClientTransport} instances. */
3737
public interface ClientTransportFactory extends ReferenceCounted {
3838
/**
3939
* Creates an unstarted transport for exclusive use.
4040
*
4141
* @param serverAddress the address that the transport is connected to
4242
* @param authority the HTTP/2 authority of the server
4343
*/
44-
ClientTransport newClientTransport(SocketAddress serverAddress, String authority);
44+
ManagedClientTransport newClientTransport(SocketAddress serverAddress, String authority);
4545
}

0 commit comments

Comments
 (0)