Skip to content

Commit 7b97abd

Browse files
committed
1 parent 04d2d75 commit 7b97abd

16 files changed

Lines changed: 108 additions & 306 deletions

File tree

core-io/src/main/java/com/couchbase/client/core/transaction/CoreTransactionAttemptContext.java

Lines changed: 67 additions & 111 deletions
Large diffs are not rendered by default.

core-io/src/main/java/com/couchbase/client/core/transaction/CoreTransactionGetResult.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.couchbase.client.core.transaction;
1818

1919
import com.couchbase.client.core.annotation.Stability;
20-
import com.couchbase.client.core.api.kv.CoreExpiry;
2120
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
2221
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
2322
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,7 +32,6 @@
3332
import reactor.util.annotation.Nullable;
3433

3534
import java.io.IOException;
36-
import java.time.Instant;
3735
import java.util.Objects;
3836
import java.util.Optional;
3937

@@ -175,8 +173,7 @@ static CoreTransactionGetResult createFromInsert(CollectionIdentifier collection
175173
String atrBucketName,
176174
String atrScopeName,
177175
String atrCollectionName,
178-
long cas,
179-
Optional<CoreExpiry> expiry) {
176+
long cas) {
180177
TransactionLinks links = new TransactionLinks(
181178
CodecFlags.extractCommonFormatFlags(flagsToStage) == CodecFlags.CommonFlags.JSON.ordinal() ? Optional.of(content) : Optional.empty(),
182179
CodecFlags.extractCommonFormatFlags(flagsToStage) == CodecFlags.CommonFlags.BINARY.ordinal() ? Optional.of(content) : Optional.empty(),
@@ -195,8 +192,7 @@ static CoreTransactionGetResult createFromInsert(CollectionIdentifier collection
195192
Optional.empty(),
196193
// The staged operationId is only used after reading the document so is unimportant here
197194
Optional.empty(),
198-
Optional.of(flagsToStage),
199-
expiry
195+
Optional.of(flagsToStage)
200196
);
201197

202198
return new CoreTransactionGetResult(id,
@@ -231,8 +227,7 @@ public static CoreTransactionGetResult createFrom(CoreTransactionGetResult doc,
231227
doc.links.crc32OfStaging(),
232228
doc.links.forwardCompatibility(),
233229
doc.links.stagedOperationId(),
234-
doc.links.stagedUserFlags(),
235-
doc.links.stagedExpiry()
230+
doc.links().stagedUserFlags()
236231
);
237232

238233
return new CoreTransactionGetResult(doc.id,
@@ -264,7 +259,6 @@ public static CoreTransactionGetResult createFrom(CollectionIdentifier collectio
264259
Optional<String> crc32OfStaging = Optional.empty();
265260
Optional<ForwardCompatibility> forwardCompatibility = Optional.empty();
266261
Optional<Integer> stagedUserFlags = Optional.empty();
267-
Optional<CoreExpiry> stagedExpiry = Optional.empty();
268262

269263
// Read from xattrs.txn.restore
270264
Optional<String> casPreTxn = Optional.empty();
@@ -359,11 +353,6 @@ public static CoreTransactionGetResult createFrom(CollectionIdentifier collectio
359353
if (aux.has("uf")) {
360354
stagedUserFlags = Optional.of(aux.get("uf").intValue());
361355
}
362-
JsonNode docExpiryNode = aux.get("docexpiry");
363-
if (docExpiryNode != null) {
364-
long raw = docExpiryNode.longValue();
365-
stagedExpiry = Optional.of(CoreExpiry.of(Instant.ofEpochSecond(raw)));
366-
}
367356
}
368357

369358
byte[] bodyContent;
@@ -393,8 +382,7 @@ public static CoreTransactionGetResult createFrom(CollectionIdentifier collectio
393382
crc32OfStaging,
394383
forwardCompatibility,
395384
operationId,
396-
stagedUserFlags,
397-
stagedExpiry);
385+
stagedUserFlags);
398386

399387
DocumentMetadata md = new DocumentMetadata(
400388
casFromDocument,

core-io/src/main/java/com/couchbase/client/core/transaction/cleanup/TransactionsCleaner.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import com.couchbase.client.core.transaction.components.DocRecord;
3333
import com.couchbase.client.core.transaction.components.DocumentGetter;
3434
import com.couchbase.client.core.transaction.components.DurabilityLevelUtil;
35-
import com.couchbase.client.core.transaction.components.TransactionLinks;
3635
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
3736
import com.couchbase.client.core.transaction.forwards.ForwardCompatibilityStage;
3837
import com.couchbase.client.core.transaction.forwards.CoreTransactionsSupportedExtensions;
@@ -146,19 +145,18 @@ private Mono<Void> commitDocs(CoreTransactionLogger perEntryLog,
146145
CleanupRequest req,
147146
SpanWrapper pspan) {
148147
return doPerDoc(perEntryLog, attemptId, docs, pspan, true, (collection, doc, lir) -> {
149-
TransactionLinks links = doc.links();
150-
CbPreconditions.check(links != null);
151-
CbPreconditions.check(links.isDocumentInTransaction());
152-
CbPreconditions.check(links.stagedContentJsonOrBinary().isPresent());
148+
CbPreconditions.check(doc.links() != null);
149+
CbPreconditions.check(doc.links().isDocumentInTransaction());
150+
CbPreconditions.check(doc.links().stagedContentJsonOrBinary().isPresent());
153151

154-
byte[] content = links.stagedContentJsonOrBinary().get();
152+
byte[] content = doc.links().stagedContentJsonOrBinary().get();
155153

156154
return hooks.beforeCommitDoc.apply(doc.id()) // Testing hook
157155

158156
.then(Mono.defer(() -> {
159157
if (lir.tombstone()) {
160-
return TransactionKVHandler.insert(core, collection, doc.id(), content, links.stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS), kvDurableTimeout(),
161-
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocsInsert"), pspan, links.stagedExpiry().orElse(null));
158+
return TransactionKVHandler.insert(core, collection, doc.id(), content, doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS), kvDurableTimeout(),
159+
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocsInsert"), pspan);
162160
} else {
163161
List<SubdocMutateRequest.Command> commands = Arrays.asList(
164162
new SubdocMutateRequest.Command(SubdocCommandType.DELETE, TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY, null, false, true, false, 0),
@@ -167,8 +165,8 @@ private Mono<Void> commitDocs(CoreTransactionLogger perEntryLog,
167165
);
168166
return TransactionKVHandler.mutateIn(core, collection, doc.id(), kvDurableTimeout(),
169167
false, false, false,
170-
lir.tombstone(), false, doc.cas(), links.stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS),
171-
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), pspan, links.stagedExpiry().orElse(null),
168+
lir.tombstone(), false, doc.cas(), doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS),
169+
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), pspan,
172170
commands);
173171
}
174172
}))

core-io/src/main/java/com/couchbase/client/core/transaction/components/TransactionLinks.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.couchbase.client.core.transaction.components;
1818

1919
import com.couchbase.client.core.annotation.Stability;
20-
import com.couchbase.client.core.api.kv.CoreExpiry;
2120
import com.couchbase.client.core.io.CollectionIdentifier;
2221
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
2322

@@ -49,7 +48,6 @@ public class TransactionLinks {
4948
private final Optional<ForwardCompatibility> forwardCompatibility;
5049
private final Optional<String> stagedOperationId;
5150
private final Optional<Integer> stagedUserFlags;
52-
private final Optional<CoreExpiry> stagedExpiry;
5351

5452
/**
5553
* This is not part of the transactional metadata. It's here for legacy reasons and could be refactoring into
@@ -74,8 +72,7 @@ public TransactionLinks(
7472
Optional<String> crc32OfStaging,
7573
Optional<ForwardCompatibility> forwardCompatibility,
7674
Optional<String> stagedOperationId,
77-
Optional<Integer> stagedUserFlags,
78-
Optional<CoreExpiry> stagedExpiry) {
75+
Optional<Integer> stagedUserFlags) {
7976
this.stagedContentJson = Objects.requireNonNull(stagedContentJson);
8077
this.stagedContentBinary = Objects.requireNonNull(stagedContentBinary);
8178
this.atrId = Objects.requireNonNull(atrId);
@@ -93,7 +90,6 @@ public TransactionLinks(
9390
this.forwardCompatibility = Objects.requireNonNull(forwardCompatibility);
9491
this.stagedOperationId = Objects.requireNonNull(stagedOperationId);
9592
this.stagedUserFlags = stagedUserFlags;
96-
this.stagedExpiry = Objects.requireNonNull(stagedExpiry);
9793
}
9894

9995
/**
@@ -196,10 +192,6 @@ public Optional<Integer> stagedUserFlags() {
196192
return stagedUserFlags;
197193
}
198194

199-
public Optional<CoreExpiry> stagedExpiry() {
200-
return stagedExpiry;
201-
}
202-
203195
@Override
204196
public String toString() {
205197
final StringBuilder sb = new StringBuilder("TransactionLinks{");
@@ -225,8 +217,6 @@ public String toString() {
225217
sb.append(revidPreTxn.orElse("none"));
226218
sb.append(',');
227219
sb.append(exptimePreTxn.orElse(-1L));
228-
sb.append(',');
229-
sb.append(stagedExpiry.map(CoreExpiry::toString).orElse("-"));
230220
sb.append("}}");
231221
return sb.toString();
232222
}

core-io/src/main/java/com/couchbase/client/core/transaction/forwards/CoreTransactionsExtension.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public enum CoreTransactionsExtension {
5555
EXT_PARALLEL_UNSTAGING("PU"),
5656
EXT_REPLICA_FROM_PREFERRED_GROUP("RP"),
5757
EXT_GET_MULTI("GM"),
58-
EXT_TTL("TTL"),
5958
;
6059

6160
private final String value;

core-io/src/main/java/com/couchbase/client/core/transaction/support/StagedMutation.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.couchbase.client.core.transaction.support;
1717

1818
import com.couchbase.client.core.annotation.Stability;
19-
import com.couchbase.client.core.api.kv.CoreExpiry;
2019
import com.couchbase.client.core.io.CollectionIdentifier;
2120
import com.couchbase.client.core.msg.kv.CodecFlags;
2221
import com.couchbase.client.core.transaction.components.DocumentMetadata;
@@ -40,7 +39,6 @@ public class StagedMutation {
4039
// What the document's user flags will be set to post-transaction.
4140
public final int stagedUserFlags;
4241
public final StagedMutationType type;
43-
public final @Nullable CoreExpiry expiry;
4442

4543
public StagedMutation(String operationId,
4644
String id,
@@ -51,8 +49,7 @@ public StagedMutation(String operationId,
5149
int currentUserFlags,
5250
byte[] content,
5351
int stagedUserFlags,
54-
StagedMutationType type,
55-
@Nullable CoreExpiry expiry) {
52+
StagedMutationType type) {
5653
this.operationId = operationId;
5754
this.id = id;
5855
this.collection = collection;
@@ -63,7 +60,6 @@ public StagedMutation(String operationId,
6360
this.content = content;
6461
this.stagedUserFlags = stagedUserFlags;
6562
this.type = type;
66-
this.expiry = expiry;
6763
}
6864

6965
@Override

core-io/src/main/java/com/couchbase/client/core/transaction/util/TransactionKVHandler.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919
import com.couchbase.client.core.CoreKeyspace;
2020
import com.couchbase.client.core.Reactor;
2121
import com.couchbase.client.core.annotation.Stability;
22-
import com.couchbase.client.core.api.kv.CoreExpiry;
2322
import com.couchbase.client.core.api.kv.CoreReadPreference;
2423
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
25-
import com.couchbase.client.core.classic.ClassicExpiryHelper;
2624
import com.couchbase.client.core.cnc.TracingIdentifiers;
27-
import com.couchbase.client.core.config.BucketCapabilities;
2825
import com.couchbase.client.core.config.BucketConfig;
2926
import com.couchbase.client.core.error.DocumentNotFoundException;
3027
import com.couchbase.client.core.error.DocumentUnretrievableException;
@@ -75,15 +72,14 @@ public static Mono<InsertResponse> insert(final Core core,
7572
final Duration timeout,
7673
final Optional<DurabilityLevel> durabilityLevel,
7774
final Map<String, Object> clientContext,
78-
final SpanWrapper pspan,
79-
final @Nullable CoreExpiry expiry) {
75+
final SpanWrapper pspan) {
8076
return Mono.defer(() -> {
8177
long start = System.nanoTime();
8278
SpanWrapper span = SpanWrapperUtil.createOp(null, core.context().coreResources().requestTracerAndDecorator(), collectionIdentifier, id, TracingIdentifiers.SPAN_REQUEST_KV_INSERT, pspan);
8379

8480
InsertRequest request = new InsertRequest(id,
8581
transcodedContent,
86-
expiry == null ? 0 : ClassicExpiryHelper.encode(expiry),
82+
0,
8783
flags,
8884
timeout,
8985
core.context(),
@@ -245,8 +241,8 @@ public static Mono<SubdocMutateResponse> mutateIn(final Core core,
245241
durabilityLevel,
246242
clientContext,
247243
span,
248-
null,
249-
commands);
244+
commands,
245+
null);
250246
}
251247

252248
public static Mono<SubdocMutateResponse> mutateIn(final Core core,
@@ -263,20 +259,23 @@ public static Mono<SubdocMutateResponse> mutateIn(final Core core,
263259
final Optional<DurabilityLevel> durabilityLevel,
264260
final Map<String, Object> clientContext,
265261
final SpanWrapper pspan,
266-
@Nullable CoreExpiry expiry,
267-
final List<SubdocMutateRequest.Command> commands) {
262+
final List<SubdocMutateRequest.Command> commands,
263+
CoreTransactionLogger logger) {
268264
return Mono.defer(() -> {
269265
SpanWrapper span = SpanWrapperUtil.createOp(null, core.context().coreResources().requestTracerAndDecorator(), collectionIdentifier, id, TracingIdentifiers.SPAN_REQUEST_KV_MUTATE_IN, pspan);
270266
long start = System.nanoTime();
271-
CompletableFuture<BucketConfig> bucketConfigFuture = BucketConfigUtil.waitForBucketConfig(core, collectionIdentifier.bucket(), timeout).toFuture();
272267

273-
CompletableFuture<SubdocMutateResponse> future = bucketConfigFuture.thenCompose(bucketConfig -> {
274-
// Preserve expiry is only supported on 7.0+; use presence of collections capability as the indicator.
275-
boolean supportsPreserveExpiry = bucketConfig.bucketCapabilities().contains(BucketCapabilities.COLLECTIONS);
276-
// Sub-doc does not allow sending both preserveExpiry and an expiry, at least with StoreSemantics.Replace.
277-
// Also cannot send preserveExpiry with StoreSemantics.Insert.
278-
boolean sendPreserveExpiry = supportsPreserveExpiry && expiry == null && !insertDocument;
268+
final boolean requiresBucketConfig = createAsDeleted || reviveDocument;
269+
CompletableFuture<BucketConfig> bucketConfigFuture;
279270

271+
if (requiresBucketConfig) {
272+
bucketConfigFuture = BucketConfigUtil.waitForBucketConfig(core, collectionIdentifier.bucket(), timeout).toFuture();
273+
} else {
274+
// Nothing will be using the bucket config so just provide null
275+
bucketConfigFuture = CompletableFuture.completedFuture(null);
276+
}
277+
278+
CompletableFuture<SubdocMutateResponse> future = bucketConfigFuture.thenCompose(bucketConfig -> {
280279
SubdocMutateRequest request = new SubdocMutateRequest(timeout,
281280
core.context(),
282281
collectionIdentifier,
@@ -289,8 +288,8 @@ public static Mono<SubdocMutateResponse> mutateIn(final Core core,
289288
accessDeleted,
290289
createAsDeleted,
291290
commands,
292-
expiry == null ? 0 : ClassicExpiryHelper.encode(expiry),
293-
sendPreserveExpiry,
291+
0,
292+
false, // Preserve expiry only supported on 7.0+
294293
cas,
295294
userFlags,
296295
durabilityLevel,

java-client/src/main/java/com/couchbase/client/java/transactions/ReactiveTransactionAttemptContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public Mono<TransactionGetResult> insert(ReactiveCollection collection, String i
220220
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
221221

222222
return reactor.publishOnUserScheduler(
223-
internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
223+
internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), new SpanWrapper(span))
224224
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
225225
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
226226
.doOnTerminate(() -> span.end())
@@ -257,7 +257,7 @@ public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object conte
257257
RequestSpan span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_REPLACE, internal.span());
258258
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
259259
return reactor.publishOnUserScheduler(
260-
internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
260+
internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), new SpanWrapper(span))
261261
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
262262
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
263263
.doOnTerminate(() -> span.end())

java-client/src/main/java/com/couchbase/client/java/transactions/TransactionAttemptContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public TransactionGetResult replace(TransactionGetResult doc, Object content, Tr
253253
TransactionReplaceOptions.Built built = options.build();
254254
RequestSpan span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_REPLACE, internal.span());
255255
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
256-
return internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
256+
return internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), new SpanWrapper(span))
257257
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
258258
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
259259
.doOnTerminate(() -> span.end())
@@ -302,7 +302,7 @@ public TransactionGetResult insert(Collection collection, String id, Object cont
302302
TransactionInsertOptions.Built built = options.build();
303303
RequestSpan span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_INSERT, internal.span());
304304
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
305-
return internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
305+
return internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), new SpanWrapper(span))
306306
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
307307
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
308308
.doOnTerminate(() -> span.end())

0 commit comments

Comments
 (0)