Currently, LWT routing relies on Metadata.getReplicasList:
|
} else if (statement.isLWT()) { |
|
this.queryPlan = |
|
new QueryPlan( |
|
getReplicas( |
|
manager.poolsState.keyspace, |
|
statement, |
|
manager |
|
.loadBalancingPolicy() |
|
.newQueryPlan(manager.poolsState.keyspace, statement))); |
|
private Iterator<Host> getReplicas( |
|
String loggedKeyspace, Statement statement, Iterator<Host> fallback) { |
|
ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion(); |
|
CodecRegistry codecRegistry = manager.cluster.manager.configuration.getCodecRegistry(); |
|
ByteBuffer partitionKey = statement.getRoutingKey(protocolVersion, codecRegistry); |
|
String keyspace = statement.getKeyspace(); |
|
if (keyspace == null) { |
|
keyspace = loggedKeyspace; |
|
} |
|
|
|
if (partitionKey == null || keyspace == null) { |
|
return fallback; |
|
} |
|
|
|
Token.Factory partitioner = statement.getPartitioner(); |
|
String tableName = null; |
|
ColumnDefinitions defs = null; |
|
if (statement instanceof BoundStatement) { |
|
defs = ((BoundStatement) statement).preparedStatement().getVariables(); |
|
} else if (statement instanceof PreparedStatement) { |
|
defs = ((PreparedStatement) statement).getVariables(); |
|
} |
|
if (defs != null && defs.size() > 0) { |
|
tableName = defs.getTable(0); |
|
} |
|
|
|
final List<Host> replicas = |
|
manager |
|
.cluster |
|
.getMetadata() |
|
.getReplicasList(Metadata.quote(keyspace), tableName, partitioner, partitionKey); |
|
|
|
// replicas are stored in the right order starting with the primary replica |
|
return replicas.iterator(); |
|
} |
|
public List<Host> getReplicasList( |
|
String keyspace, String table, Token.Factory partitioner, ByteBuffer partitionKey) { |
|
keyspace = handleId(keyspace); |
|
table = handleId(table); |
|
TokenMap current = tokenMap; |
|
if (partitioner == null && current != null) { |
|
partitioner = current.factory; |
|
} |
|
if (partitioner == null) { |
|
return EMPTY_LIST; |
|
} |
|
Token token = partitioner.hash(partitionKey); |
|
|
|
// Tablets: |
|
KeyspaceMetadata ksMetadata = getKeyspace(keyspace); |
|
if (ksMetadata != null && ksMetadata.usesTablets()) { |
|
if (keyspace != null && table != null) { |
|
assert (token instanceof Token.TokenLong64); |
|
return tabletMap.getReplicas(keyspace, table, (long) token.getValue()); |
|
} else { |
|
return EMPTY_LIST; |
|
} |
|
} |
|
|
|
// TokenMap: |
|
if (current == null) return EMPTY_LIST; |
|
return current.getReplicas(keyspace, token); |
|
} |
This does not respect rack or DC prioritization. We need to fix it to prioritize nodes the same way it is done for regular queries.
The best solution would be to move all of this logic into the LoadBalancing policy.
Migrated to Jira: DRIVER-147
Currently, LWT routing relies on
Metadata.getReplicasList:
java-driver/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
Lines 146 to 154 in 6a8678f
java-driver/driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java
Lines 98 to 132 in 6a8678f
java-driver/driver-core/src/main/java/com/datastax/driver/core/Metadata.java
Lines 591 to 618 in 6a8678f
This does not respect rack or DC prioritization. We need to fix it to prioritize nodes the same way it is done for regular queries.
The best solution would be to move all of this logic into the LoadBalancing policy.
Migrated to Jira: DRIVER-147