Skip to content

Commit bc00e21

Browse files
authored
---
yaml --- r: 35327 b: refs/heads/pubsub-ordering-keys c: 7d47fce h: refs/heads/master i: 35325: 6cb6ea6 35323: 2dfa93a 35319: 82381d5 35311: 091b814 35295: c530395 35263: f921105 35199: 6d5004e 35071: 4e8f339 34815: adcded2
1 parent 938a79d commit bc00e21

19 files changed

Lines changed: 2744 additions & 154 deletions

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ refs/tags/v0.72.0: a7703f2593ba312c0b2dde6fdfd4f5c764bb55ac
155155
refs/tags/v0.73.0: 21241ea8be9439cc5764c4944cdce21d34ce4f9e
156156
refs/tags/v0.74.0: 9d1f733dbbf790de7b494418523b69c4a9a57638
157157
refs/heads/ignoretest: 23c412ae07af3d0ab1caa2d44d5bc5c0ccb8b31d
158-
refs/heads/pubsub-ordering-keys: d8a3ff89047b1e0abe2c7d2f29e136cb2bc43541
158+
refs/heads/pubsub-ordering-keys: 7d47fce81df96e1a0e0270cfe264fcecbfb0bb22
159159
refs/tags/v0.75.0: c3673089ae09a897c1b4cf7dfe167fe4f8ab32fb
160160
refs/tags/v0.76.0: 395b016826d3ddf9cb8b34919636df15a4dbd032
161161
refs/tags/v0.77.0: 28a85a77883ccf5d48f297fd0ef3b3dca6ce01f0

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-spanner/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
</systemPropertyVariables>
6262
<groups>com.google.cloud.spanner.IntegrationTest</groups>
6363
<excludedGroups>com.google.cloud.spanner.FlakyTest</excludedGroups>
64+
<forkedProcessTimeoutInSeconds>2400</forkedProcessTimeoutInSeconds>
6465
</configuration>
6566
<dependencies>
6667
<dependency>

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class BatchClientImpl implements BatchClient {
4646

4747
@Override
4848
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
49-
SessionImpl session = (SessionImpl) spanner.createSession(db);
49+
SessionImpl session = spanner.createSession(db);
5050
return new BatchReadOnlyTransactionImpl(spanner, session, checkNotNull(bound));
5151
}
5252

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package com.google.cloud.spanner;
1818

1919
import com.google.cloud.Timestamp;
20+
import com.google.cloud.spanner.SessionPool.PooledSession;
21+
import com.google.common.annotations.VisibleForTesting;
22+
import com.google.common.base.Function;
2023
import com.google.common.util.concurrent.ListenableFuture;
2124
import io.opencensus.common.Scope;
2225
import io.opencensus.trace.Span;
@@ -33,17 +36,33 @@ class DatabaseClientImpl implements DatabaseClient {
3336
TraceUtil.exportSpans(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION, PARTITION_DML_TRANSACTION);
3437
}
3538

36-
private final SessionPool pool;
39+
@VisibleForTesting final SessionPool pool;
3740

3841
DatabaseClientImpl(SessionPool pool) {
3942
this.pool = pool;
4043
}
4144

45+
@VisibleForTesting
46+
PooledSession getReadSession() {
47+
return pool.getReadSession();
48+
}
49+
50+
@VisibleForTesting
51+
PooledSession getReadWriteSession() {
52+
return pool.getReadWriteSession();
53+
}
54+
4255
@Override
43-
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
56+
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
4457
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
4558
try (Scope s = tracer.withSpan(span)) {
46-
return pool.getReadWriteSession().write(mutations);
59+
return runWithSessionRetry(
60+
new Function<Session, Timestamp>() {
61+
@Override
62+
public Timestamp apply(Session session) {
63+
return session.write(mutations);
64+
}
65+
});
4766
} catch (RuntimeException e) {
4867
TraceUtil.endSpanWithFailure(span, e);
4968
throw e;
@@ -53,10 +72,16 @@ public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
5372
}
5473

5574
@Override
56-
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
75+
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
5776
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
5877
try (Scope s = tracer.withSpan(span)) {
59-
return pool.getReadWriteSession().writeAtLeastOnce(mutations);
78+
return runWithSessionRetry(
79+
new Function<Session, Timestamp>() {
80+
@Override
81+
public Timestamp apply(Session session) {
82+
return session.writeAtLeastOnce(mutations);
83+
}
84+
});
6085
} catch (RuntimeException e) {
6186
TraceUtil.endSpanWithFailure(span, e);
6287
throw e;
@@ -69,7 +94,7 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
6994
public ReadContext singleUse() {
7095
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
7196
try (Scope s = tracer.withSpan(span)) {
72-
return pool.getReadSession().singleUse();
97+
return getReadSession().singleUse();
7398
} catch (RuntimeException e) {
7499
TraceUtil.endSpanWithFailure(span, e);
75100
throw e;
@@ -80,7 +105,7 @@ public ReadContext singleUse() {
80105
public ReadContext singleUse(TimestampBound bound) {
81106
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
82107
try (Scope s = tracer.withSpan(span)) {
83-
return pool.getReadSession().singleUse(bound);
108+
return getReadSession().singleUse(bound);
84109
} catch (RuntimeException e) {
85110
TraceUtil.endSpanWithFailure(span, e);
86111
throw e;
@@ -91,7 +116,7 @@ public ReadContext singleUse(TimestampBound bound) {
91116
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
92117
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
93118
try (Scope s = tracer.withSpan(span)) {
94-
return pool.getReadSession().singleUseReadOnlyTransaction();
119+
return getReadSession().singleUseReadOnlyTransaction();
95120
} catch (RuntimeException e) {
96121
TraceUtil.endSpanWithFailure(span, e);
97122
throw e;
@@ -102,7 +127,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
102127
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
103128
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
104129
try (Scope s = tracer.withSpan(span)) {
105-
return pool.getReadSession().singleUseReadOnlyTransaction(bound);
130+
return getReadSession().singleUseReadOnlyTransaction(bound);
106131
} catch (RuntimeException e) {
107132
TraceUtil.endSpanWithFailure(span, e);
108133
throw e;
@@ -113,7 +138,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
113138
public ReadOnlyTransaction readOnlyTransaction() {
114139
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
115140
try (Scope s = tracer.withSpan(span)) {
116-
return pool.getReadSession().readOnlyTransaction();
141+
return getReadSession().readOnlyTransaction();
117142
} catch (RuntimeException e) {
118143
TraceUtil.endSpanWithFailure(span, e);
119144
throw e;
@@ -124,7 +149,7 @@ public ReadOnlyTransaction readOnlyTransaction() {
124149
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
125150
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
126151
try (Scope s = tracer.withSpan(span)) {
127-
return pool.getReadSession().readOnlyTransaction(bound);
152+
return getReadSession().readOnlyTransaction(bound);
128153
} catch (RuntimeException e) {
129154
TraceUtil.endSpanWithFailure(span, e);
130155
throw e;
@@ -135,7 +160,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
135160
public TransactionRunner readWriteTransaction() {
136161
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
137162
try (Scope s = tracer.withSpan(span)) {
138-
return pool.getReadWriteSession().readWriteTransaction();
163+
return getReadWriteSession().readWriteTransaction();
139164
} catch (RuntimeException e) {
140165
TraceUtil.endSpanWithFailure(span, e);
141166
throw e;
@@ -146,24 +171,41 @@ public TransactionRunner readWriteTransaction() {
146171
public TransactionManager transactionManager() {
147172
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
148173
try (Scope s = tracer.withSpan(span)) {
149-
return pool.getReadWriteSession().transactionManager();
174+
return getReadWriteSession().transactionManager();
150175
} catch (RuntimeException e) {
151176
TraceUtil.endSpanWithFailure(span, e);
152177
throw e;
153178
}
154179
}
155180

156181
@Override
157-
public long executePartitionedUpdate(Statement stmt) {
182+
public long executePartitionedUpdate(final Statement stmt) {
158183
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
159184
try (Scope s = tracer.withSpan(span)) {
160-
return pool.getReadWriteSession().executePartitionedUpdate(stmt);
185+
return runWithSessionRetry(
186+
new Function<Session, Long>() {
187+
@Override
188+
public Long apply(Session session) {
189+
return session.executePartitionedUpdate(stmt);
190+
}
191+
});
161192
} catch (RuntimeException e) {
162193
TraceUtil.endSpanWithFailure(span, e);
163194
throw e;
164195
}
165196
}
166197

198+
private <T> T runWithSessionRetry(Function<Session, T> callable) {
199+
PooledSession session = getReadWriteSession();
200+
while (true) {
201+
try {
202+
return callable.apply(session);
203+
} catch (SessionNotFoundException e) {
204+
session = pool.replaceReadWriteSession(e, session);
205+
}
206+
}
207+
}
208+
167209
ListenableFuture<Void> closeAsync() {
168210
return pool.closeAsync();
169211
}

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,26 @@
2222
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
2323
public class ForwardingResultSet extends ForwardingStructReader implements ResultSet {
2424

25-
private final ResultSet delegate;
25+
private ResultSet delegate;
2626

2727
public ForwardingResultSet(ResultSet delegate) {
2828
super(delegate);
2929
this.delegate = Preconditions.checkNotNull(delegate);
3030
}
3131

32+
/**
33+
* Replaces the underlying {@link ResultSet}. It is the responsibility of the caller to ensure
34+
* that the new delegate has the same properties and is in the same state as the original
35+
* delegate. This method can be used if the underlying delegate needs to be replaced after a
36+
* session or transaction needed to be restarted after the {@link ResultSet} had already been
37+
* returned to the user.
38+
*/
39+
void replaceDelegate(ResultSet newDelegate) {
40+
Preconditions.checkNotNull(newDelegate);
41+
super.replaceDelegate(newDelegate);
42+
this.delegate = newDelegate;
43+
}
44+
3245
@Override
3346
public boolean next() throws SpannerException {
3447
return delegate.next();

branches/pubsub-ordering-keys/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingStructReader.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,23 @@
2525
/** Forwarding implements of StructReader */
2626
public class ForwardingStructReader implements StructReader {
2727

28-
private final StructReader delegate;
28+
private StructReader delegate;
2929

3030
public ForwardingStructReader(StructReader delegate) {
3131
this.delegate = Preconditions.checkNotNull(delegate);
3232
}
3333

34+
/**
35+
* Replaces the underlying {@link StructReader}. It is the responsibility of the caller to ensure
36+
* that the new delegate has the same properties and is in the same state as the original
37+
* delegate. This method can be used if the underlying delegate needs to be replaced after a
38+
* session or transaction needed to be restarted after the {@link StructReader} had already been
39+
* returned to the user.
40+
*/
41+
void replaceDelegate(StructReader newDelegate) {
42+
this.delegate = Preconditions.checkNotNull(newDelegate);
43+
}
44+
3445
@Override
3546
public Type getType() {
3647
return delegate.getType();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2019 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import javax.annotation.Nullable;
20+
21+
/**
22+
* Exception thrown by Cloud Spanner when an operation detects that the session that is being used
23+
* is no longer valid. This type of error has its own subclass as it is a condition that should
24+
* normally be hidden from the user, and the client library should try to fix this internally.
25+
*/
26+
public class SessionNotFoundException extends SpannerException {
27+
private static final long serialVersionUID = -6395746612598975751L;
28+
29+
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
30+
SessionNotFoundException(
31+
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
32+
super(token, ErrorCode.NOT_FOUND, false, message, cause);
33+
}
34+
}

0 commit comments

Comments
 (0)