Skip to content

Commit c309efd

Browse files
authored
feat(resolver): Send resolved address in ROUTE routing context (#1732)
Sending resolved address in ROUTE routing context helps when connecting to DBMS that has `dbms.routing.default_router=SERVER` mode enabled as returned routing table includes the address that the driver could establish connection with.
1 parent ea2b8c3 commit c309efd

14 files changed

Lines changed: 308 additions & 62 deletions

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import org.neo4j.driver.Bookmark;
2323
import org.neo4j.driver.internal.DatabaseName;
2424
import org.neo4j.driver.internal.spi.Connection;
25+
import org.neo4j.driver.net.ServerAddress;
2526

2627
public interface ClusterCompositionProvider {
2728
CompletionStage<ClusterComposition> getClusterComposition(
28-
Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser);
29+
Connection connection,
30+
ServerAddress address,
31+
DatabaseName databaseName,
32+
Bookmark bookmark,
33+
String impersonatedUser);
2934
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,8 @@ private CompletionStage<ClusterComposition> lookupOnRouter(
281281
.thenApply(address -> addAndReturn(seenServers, address))
282282
.thenCompose(connectionPool::acquire)
283283
.thenApply(connection -> ImpersonationUtil.ensureImpersonationSupport(connection, impersonatedUser))
284-
.thenCompose(connection ->
285-
provider.getClusterComposition(connection, routingTable.database(), bookmark, impersonatedUser))
284+
.thenCompose(connection -> provider.getClusterComposition(
285+
connection, routerAddress, routingTable.database(), bookmark, impersonatedUser))
286286
.handle((response, error) -> {
287287
Throwable cause = Futures.completionExceptionCause(error);
288288
if (cause != null) {

driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.neo4j.driver.internal.handlers.RouteMessageResponseHandler;
4040
import org.neo4j.driver.internal.messaging.request.RouteMessage;
4141
import org.neo4j.driver.internal.spi.Connection;
42+
import org.neo4j.driver.net.ServerAddress;
4243

4344
/**
4445
* This implementation of the {@link RoutingProcedureRunner} access the routing procedure
@@ -61,10 +62,21 @@ protected RouteMessageRoutingProcedureRunner(
6162

6263
@Override
6364
public CompletionStage<RoutingProcedureResponse> run(
64-
Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser) {
65+
Connection connection,
66+
ServerAddress address,
67+
DatabaseName databaseName,
68+
Bookmark bookmark,
69+
String impersonatedUser) {
6570
CompletableFuture<Map<String, Value>> completableFuture = createCompletableFuture.get();
6671

6772
DirectConnection directConnection = toDirectConnection(connection, databaseName, impersonatedUser);
73+
Map<String, Value> routingContext = this.routingContext;
74+
if (routingContext.containsKey(RoutingContext.ROUTING_ADDRESS_KEY)) {
75+
routingContext = new HashMap<>(routingContext);
76+
routingContext.put(
77+
RoutingContext.ROUTING_ADDRESS_KEY,
78+
Values.value(String.format("%s:%d", address.host(), address.port())));
79+
}
6880
directConnection.writeAndFlush(
6981
new RouteMessage(
7082
routingContext, bookmark, databaseName.databaseName().orElse(null), impersonatedUser),

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
public class RoutingContext {
3131
public static final RoutingContext EMPTY = new RoutingContext();
32-
private static final String ROUTING_ADDRESS_KEY = "address";
32+
public static final String ROUTING_ADDRESS_KEY = "address";
3333

3434
private final Map<String, String> context;
3535
private final boolean isServerRoutingEnabled;

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.internal.DatabaseName;
3535
import org.neo4j.driver.internal.spi.Connection;
3636
import org.neo4j.driver.internal.util.Clock;
37+
import org.neo4j.driver.net.ServerAddress;
3738

3839
public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider {
3940
private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to ";
@@ -64,7 +65,11 @@ public RoutingProcedureClusterCompositionProvider(Clock clock, RoutingContext ro
6465

6566
@Override
6667
public CompletionStage<ClusterComposition> getClusterComposition(
67-
Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser) {
68+
Connection connection,
69+
ServerAddress address,
70+
DatabaseName databaseName,
71+
Bookmark bookmark,
72+
String impersonatedUser) {
6873
RoutingProcedureRunner runner;
6974

7075
if (supportsRouteMessage(connection)) {
@@ -75,7 +80,8 @@ public CompletionStage<ClusterComposition> getClusterComposition(
7580
runner = singleDatabaseRoutingProcedureRunner;
7681
}
7782

78-
return runner.run(connection, databaseName, bookmark, impersonatedUser).thenApply(this::processRoutingResponse);
83+
return runner.run(connection, address, databaseName, bookmark, impersonatedUser)
84+
.thenApply(this::processRoutingResponse);
7985
}
8086

8187
private ClusterComposition processRoutingResponse(RoutingProcedureResponse response) {

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.neo4j.driver.Bookmark;
2323
import org.neo4j.driver.internal.DatabaseName;
2424
import org.neo4j.driver.internal.spi.Connection;
25+
import org.neo4j.driver.net.ServerAddress;
2526

2627
/**
2728
* Interface which defines the standard way to get the routing table
@@ -31,11 +32,16 @@ public interface RoutingProcedureRunner {
3132
* Run the calls to the server
3233
*
3334
* @param connection The connection which will be used to call the server
35+
* @param address The router address to include in routing context
3436
* @param databaseName The database name
3537
* @param bookmark The bookmark used to query the routing information
3638
* @param impersonatedUser The impersonated user, should be {@code null} for non-impersonated requests
3739
* @return The routing table
3840
*/
3941
CompletionStage<RoutingProcedureResponse> run(
40-
Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser);
42+
Connection connection,
43+
ServerAddress address,
44+
DatabaseName databaseName,
45+
Bookmark bookmark,
46+
String impersonatedUser);
4147
}

driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.neo4j.driver.internal.spi.Connection;
4141
import org.neo4j.driver.internal.util.Futures;
4242
import org.neo4j.driver.internal.util.ServerVersion;
43+
import org.neo4j.driver.net.ServerAddress;
4344

4445
/**
4546
* This implementation of the {@link RoutingProcedureRunner} works with single database versions of Neo4j calling
@@ -59,7 +60,11 @@ public SingleDatabaseRoutingProcedureRunner(RoutingContext context, Logging logg
5960

6061
@Override
6162
public CompletionStage<RoutingProcedureResponse> run(
62-
Connection connection, DatabaseName databaseName, Bookmark bookmark, String impersonatedUser) {
63+
Connection connection,
64+
ServerAddress address,
65+
DatabaseName databaseName,
66+
Bookmark bookmark,
67+
String impersonatedUser) {
6368
DirectConnection delegate = connection(connection);
6469
Query procedure = procedureQuery(connection.serverVersion(), databaseName);
6570
BookmarkHolder bookmarkHolder = bookmarkHolder(bookmark);

driver/src/test/java/org/neo4j/driver/internal/cluster/AbstractRoutingProcedureRunnerTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.neo4j.driver.Record;
3838
import org.neo4j.driver.exceptions.ClientException;
3939
import org.neo4j.driver.internal.spi.Connection;
40+
import org.neo4j.driver.net.ServerAddress;
4041

4142
abstract class AbstractRoutingProcedureRunnerTest {
4243
@Test
@@ -45,7 +46,8 @@ void shouldReturnFailedResponseOnClientException() {
4546
SingleDatabaseRoutingProcedureRunner runner =
4647
singleDatabaseRoutingProcedureRunner(RoutingContext.EMPTY, failedFuture(error));
4748

48-
RoutingProcedureResponse response = await(runner.run(connection(), defaultDatabase(), empty(), null));
49+
RoutingProcedureResponse response =
50+
await(runner.run(connection(), ServerAddress.of("localhost", 7687), defaultDatabase(), empty(), null));
4951

5052
assertFalse(response.isSuccess());
5153
assertEquals(error, response.error());
@@ -57,8 +59,10 @@ void shouldReturnFailedStageOnError() {
5759
SingleDatabaseRoutingProcedureRunner runner =
5860
singleDatabaseRoutingProcedureRunner(RoutingContext.EMPTY, failedFuture(error));
5961

60-
Exception e =
61-
assertThrows(Exception.class, () -> await(runner.run(connection(), defaultDatabase(), empty(), null)));
62+
Exception e = assertThrows(
63+
Exception.class,
64+
() -> await(runner.run(
65+
connection(), ServerAddress.of("localhost", 7687), defaultDatabase(), empty(), null)));
6266
assertEquals(error, e);
6367
}
6468

@@ -67,7 +71,8 @@ void shouldReleaseConnectionOnSuccess() {
6771
SingleDatabaseRoutingProcedureRunner runner = singleDatabaseRoutingProcedureRunner(RoutingContext.EMPTY);
6872

6973
Connection connection = connection();
70-
RoutingProcedureResponse response = await(runner.run(connection, defaultDatabase(), empty(), null));
74+
RoutingProcedureResponse response =
75+
await(runner.run(connection, ServerAddress.of("localhost", 7687), defaultDatabase(), empty(), null));
7176

7277
assertTrue(response.isSuccess());
7378
verify(connection).release();
@@ -81,7 +86,9 @@ void shouldPropagateReleaseError() {
8186
Connection connection = connection(failedFuture(releaseError));
8287

8388
RuntimeException e = assertThrows(
84-
RuntimeException.class, () -> await(runner.run(connection, defaultDatabase(), empty(), null)));
89+
RuntimeException.class,
90+
() -> await(
91+
runner.run(connection, ServerAddress.of("localhost", 7687), defaultDatabase(), empty(), null)));
8592
assertEquals(releaseError, e);
8693
verify(connection).release();
8794
}

driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@
5151
import org.neo4j.driver.internal.BookmarkHolder;
5252
import org.neo4j.driver.internal.ReadOnlyBookmarkHolder;
5353
import org.neo4j.driver.internal.spi.Connection;
54+
import org.neo4j.driver.net.ServerAddress;
5455

5556
class MultiDatabasesRoutingProcedureRunnerTest extends AbstractRoutingProcedureRunnerTest {
5657
@ParameterizedTest
5758
@ValueSource(strings = {"", SYSTEM_DATABASE_NAME, " this is a db name "})
5859
void shouldCallGetRoutingTableWithEmptyMapOnSystemDatabaseForDatabase(String db) {
5960
TestRoutingProcedureRunner runner = new TestRoutingProcedureRunner(RoutingContext.EMPTY);
60-
RoutingProcedureResponse response = await(runner.run(connection(), database(db), empty(), null));
61+
RoutingProcedureResponse response =
62+
await(runner.run(connection(), ServerAddress.of("localhost", 7687), database(db), empty(), null));
6163

6264
assertTrue(response.isSuccess());
6365
assertEquals(1, response.records().size());
@@ -77,7 +79,8 @@ void shouldCallGetRoutingTableWithParamOnSystemDatabaseForDatabase(String db) {
7779
RoutingContext context = new RoutingContext(uri);
7880

7981
TestRoutingProcedureRunner runner = new TestRoutingProcedureRunner(context);
80-
RoutingProcedureResponse response = await(runner.run(connection(), database(db), empty(), null));
82+
RoutingProcedureResponse response =
83+
await(runner.run(connection(), ServerAddress.of("localhost", 7687), database(db), empty(), null));
8184

8285
assertTrue(response.isSuccess());
8386
assertEquals(1, response.records().size());

driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.neo4j.driver.internal.spi.ConnectionPool;
7878
import org.neo4j.driver.internal.util.FakeClock;
7979
import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor;
80+
import org.neo4j.driver.net.ServerAddress;
8081
import org.neo4j.driver.net.ServerAddressResolver;
8182

8283
class RediscoveryTest {
@@ -552,7 +553,11 @@ private static ClusterCompositionProvider compositionProviderMock(
552553
Map<BoltServerAddress, Object> responsesByAddress) {
553554
ClusterCompositionProvider provider = mock(ClusterCompositionProvider.class);
554555
when(provider.getClusterComposition(
555-
any(Connection.class), any(DatabaseName.class), any(InternalBookmark.class), any()))
556+
any(Connection.class),
557+
any(ServerAddress.class),
558+
any(DatabaseName.class),
559+
any(InternalBookmark.class),
560+
any()))
556561
.then(invocation -> {
557562
Connection connection = invocation.getArgument(0);
558563
BoltServerAddress address = connection.serverAddress();

0 commit comments

Comments
 (0)