Skip to content

Commit ec11d61

Browse files
committed
JCBC-1739: Support expiry TTL inside transactions
Adds support for expiry/TTL to transactions for KV replace and inserts, along with FIT testing support. See the transaction spec for the details. Change-Id: I9d770066bc2c461b1700f95b280bf4468e30bd9e Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/236361 Tested-by: Graham Pople <[email protected]> Reviewed-by: Graham Pople <[email protected]>
1 parent 8f84109 commit ec11d61

16 files changed

Lines changed: 306 additions & 108 deletions

File tree

core-io/src/main/java/com/couchbase/client/core/transaction/CoreTransactionAttemptContext.java

Lines changed: 111 additions & 67 deletions
Large diffs are not rendered by default.

core-io/src/main/java/com/couchbase/client/core/transaction/CoreTransactionGetResult.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.couchbase.client.core.transaction;
1818

1919
import com.couchbase.client.core.annotation.Stability;
20+
import com.couchbase.client.core.api.kv.CoreExpiry;
2021
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
2122
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
2223
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.ObjectMapper;
@@ -32,6 +33,7 @@
3233
import reactor.util.annotation.Nullable;
3334

3435
import java.io.IOException;
36+
import java.time.Instant;
3537
import java.util.Objects;
3638
import java.util.Optional;
3739

@@ -173,7 +175,8 @@ static CoreTransactionGetResult createFromInsert(CollectionIdentifier collection
173175
String atrBucketName,
174176
String atrScopeName,
175177
String atrCollectionName,
176-
long cas) {
178+
long cas,
179+
Optional<CoreExpiry> expiry) {
177180
TransactionLinks links = new TransactionLinks(
178181
CodecFlags.extractCommonFormatFlags(flagsToStage) == CodecFlags.CommonFlags.JSON.ordinal() ? Optional.of(content) : Optional.empty(),
179182
CodecFlags.extractCommonFormatFlags(flagsToStage) == CodecFlags.CommonFlags.BINARY.ordinal() ? Optional.of(content) : Optional.empty(),
@@ -192,7 +195,8 @@ static CoreTransactionGetResult createFromInsert(CollectionIdentifier collection
192195
Optional.empty(),
193196
// The staged operationId is only used after reading the document so is unimportant here
194197
Optional.empty(),
195-
Optional.of(flagsToStage)
198+
Optional.of(flagsToStage),
199+
expiry
196200
);
197201

198202
return new CoreTransactionGetResult(id,
@@ -227,7 +231,8 @@ public static CoreTransactionGetResult createFrom(CoreTransactionGetResult doc,
227231
doc.links.crc32OfStaging(),
228232
doc.links.forwardCompatibility(),
229233
doc.links.stagedOperationId(),
230-
doc.links().stagedUserFlags()
234+
doc.links.stagedUserFlags(),
235+
doc.links.stagedExpiry()
231236
);
232237

233238
return new CoreTransactionGetResult(doc.id,
@@ -259,6 +264,7 @@ public static CoreTransactionGetResult createFrom(CollectionIdentifier collectio
259264
Optional<String> crc32OfStaging = Optional.empty();
260265
Optional<ForwardCompatibility> forwardCompatibility = Optional.empty();
261266
Optional<Integer> stagedUserFlags = Optional.empty();
267+
Optional<CoreExpiry> stagedExpiry = Optional.empty();
262268

263269
// Read from xattrs.txn.restore
264270
Optional<String> casPreTxn = Optional.empty();
@@ -353,6 +359,11 @@ public static CoreTransactionGetResult createFrom(CollectionIdentifier collectio
353359
if (aux.has("uf")) {
354360
stagedUserFlags = Optional.of(aux.get("uf").intValue());
355361
}
362+
JsonNode docExpiryNode = aux.get("docexpiry");
363+
if (docExpiryNode != null) {
364+
long raw = docExpiryNode.longValue();
365+
stagedExpiry = Optional.of(CoreExpiry.of(Instant.ofEpochSecond(raw)));
366+
}
356367
}
357368

358369
byte[] bodyContent;
@@ -382,7 +393,8 @@ public static CoreTransactionGetResult createFrom(CollectionIdentifier collectio
382393
crc32OfStaging,
383394
forwardCompatibility,
384395
operationId,
385-
stagedUserFlags);
396+
stagedUserFlags,
397+
stagedExpiry);
386398

387399
DocumentMetadata md = new DocumentMetadata(
388400
casFromDocument,

core-io/src/main/java/com/couchbase/client/core/transaction/cleanup/TransactionsCleaner.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.couchbase.client.core.transaction.components.DocRecord;
3333
import com.couchbase.client.core.transaction.components.DocumentGetter;
3434
import com.couchbase.client.core.transaction.components.DurabilityLevelUtil;
35+
import com.couchbase.client.core.transaction.components.TransactionLinks;
3536
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
3637
import com.couchbase.client.core.transaction.forwards.ForwardCompatibilityStage;
3738
import com.couchbase.client.core.transaction.forwards.CoreTransactionsSupportedExtensions;
@@ -145,18 +146,19 @@ private Mono<Void> commitDocs(CoreTransactionLogger perEntryLog,
145146
CleanupRequest req,
146147
SpanWrapper pspan) {
147148
return doPerDoc(perEntryLog, attemptId, docs, pspan, true, (collection, doc, lir) -> {
148-
CbPreconditions.check(doc.links() != null);
149-
CbPreconditions.check(doc.links().isDocumentInTransaction());
150-
CbPreconditions.check(doc.links().stagedContentJsonOrBinary().isPresent());
149+
TransactionLinks links = doc.links();
150+
CbPreconditions.check(links != null);
151+
CbPreconditions.check(links.isDocumentInTransaction());
152+
CbPreconditions.check(links.stagedContentJsonOrBinary().isPresent());
151153

152-
byte[] content = doc.links().stagedContentJsonOrBinary().get();
154+
byte[] content = links.stagedContentJsonOrBinary().get();
153155

154156
return hooks.beforeCommitDoc.apply(doc.id()) // Testing hook
155157

156158
.then(Mono.defer(() -> {
157159
if (lir.tombstone()) {
158-
return TransactionKVHandler.insert(core, collection, doc.id(), content, doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS), kvDurableTimeout(),
159-
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocsInsert"), pspan);
160+
return TransactionKVHandler.insert(core, collection, doc.id(), content, links.stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS), kvDurableTimeout(),
161+
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocsInsert"), pspan, links.stagedExpiry().orElse(null));
160162
} else {
161163
List<SubdocMutateRequest.Command> commands = Arrays.asList(
162164
new SubdocMutateRequest.Command(SubdocCommandType.DELETE, TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY, null, false, true, false, 0),
@@ -165,8 +167,8 @@ private Mono<Void> commitDocs(CoreTransactionLogger perEntryLog,
165167
);
166168
return TransactionKVHandler.mutateIn(core, collection, doc.id(), kvDurableTimeout(),
167169
false, false, false,
168-
lir.tombstone(), false, doc.cas(), doc.links().stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS),
169-
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), pspan,
170+
lir.tombstone(), false, doc.cas(), links.stagedUserFlags().orElse(CodecFlags.JSON_COMMON_FLAGS),
171+
req.durabilityLevel(), OptionsUtil.createClientContext("Cleaner::commitDocs"), pspan, links.stagedExpiry().orElse(null),
170172
commands);
171173
}
172174
}))

core-io/src/main/java/com/couchbase/client/core/transaction/components/TransactionLinks.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.couchbase.client.core.transaction.components;
1818

1919
import com.couchbase.client.core.annotation.Stability;
20+
import com.couchbase.client.core.api.kv.CoreExpiry;
2021
import com.couchbase.client.core.io.CollectionIdentifier;
2122
import com.couchbase.client.core.transaction.forwards.ForwardCompatibility;
2223

@@ -48,6 +49,7 @@ public class TransactionLinks {
4849
private final Optional<ForwardCompatibility> forwardCompatibility;
4950
private final Optional<String> stagedOperationId;
5051
private final Optional<Integer> stagedUserFlags;
52+
private final Optional<CoreExpiry> stagedExpiry;
5153

5254
/**
5355
* This is not part of the transactional metadata. It's here for legacy reasons and could be refactoring into
@@ -72,7 +74,8 @@ public TransactionLinks(
7274
Optional<String> crc32OfStaging,
7375
Optional<ForwardCompatibility> forwardCompatibility,
7476
Optional<String> stagedOperationId,
75-
Optional<Integer> stagedUserFlags) {
77+
Optional<Integer> stagedUserFlags,
78+
Optional<CoreExpiry> stagedExpiry) {
7679
this.stagedContentJson = Objects.requireNonNull(stagedContentJson);
7780
this.stagedContentBinary = Objects.requireNonNull(stagedContentBinary);
7881
this.atrId = Objects.requireNonNull(atrId);
@@ -90,6 +93,7 @@ public TransactionLinks(
9093
this.forwardCompatibility = Objects.requireNonNull(forwardCompatibility);
9194
this.stagedOperationId = Objects.requireNonNull(stagedOperationId);
9295
this.stagedUserFlags = stagedUserFlags;
96+
this.stagedExpiry = Objects.requireNonNull(stagedExpiry);
9397
}
9498

9599
/**
@@ -192,6 +196,10 @@ public Optional<Integer> stagedUserFlags() {
192196
return stagedUserFlags;
193197
}
194198

199+
public Optional<CoreExpiry> stagedExpiry() {
200+
return stagedExpiry;
201+
}
202+
195203
@Override
196204
public String toString() {
197205
final StringBuilder sb = new StringBuilder("TransactionLinks{");
@@ -217,6 +225,8 @@ public String toString() {
217225
sb.append(revidPreTxn.orElse("none"));
218226
sb.append(',');
219227
sb.append(exptimePreTxn.orElse(-1L));
228+
sb.append(',');
229+
sb.append(stagedExpiry.map(CoreExpiry::toString).orElse("-"));
220230
sb.append("}}");
221231
return sb.toString();
222232
}

core-io/src/main/java/com/couchbase/client/core/transaction/forwards/CoreTransactionsExtension.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public enum CoreTransactionsExtension {
5555
EXT_PARALLEL_UNSTAGING("PU"),
5656
EXT_REPLICA_FROM_PREFERRED_GROUP("RP"),
5757
EXT_GET_MULTI("GM"),
58+
EXT_TTL("TTL"),
5859
;
5960

6061
private final String value;

core-io/src/main/java/com/couchbase/client/core/transaction/support/StagedMutation.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.couchbase.client.core.transaction.support;
1717

1818
import com.couchbase.client.core.annotation.Stability;
19+
import com.couchbase.client.core.api.kv.CoreExpiry;
1920
import com.couchbase.client.core.io.CollectionIdentifier;
2021
import com.couchbase.client.core.msg.kv.CodecFlags;
2122
import com.couchbase.client.core.transaction.components.DocumentMetadata;
@@ -39,6 +40,7 @@ public class StagedMutation {
3940
// What the document's user flags will be set to post-transaction.
4041
public final int stagedUserFlags;
4142
public final StagedMutationType type;
43+
public final @Nullable CoreExpiry expiry;
4244

4345
public StagedMutation(String operationId,
4446
String id,
@@ -49,7 +51,8 @@ public StagedMutation(String operationId,
4951
int currentUserFlags,
5052
byte[] content,
5153
int stagedUserFlags,
52-
StagedMutationType type) {
54+
StagedMutationType type,
55+
@Nullable CoreExpiry expiry) {
5356
this.operationId = operationId;
5457
this.id = id;
5558
this.collection = collection;
@@ -60,6 +63,7 @@ public StagedMutation(String operationId,
6063
this.content = content;
6164
this.stagedUserFlags = stagedUserFlags;
6265
this.type = type;
66+
this.expiry = expiry;
6367
}
6468

6569
@Override

core-io/src/main/java/com/couchbase/client/core/transaction/util/TransactionKVHandler.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import com.couchbase.client.core.CoreKeyspace;
2020
import com.couchbase.client.core.Reactor;
2121
import com.couchbase.client.core.annotation.Stability;
22+
import com.couchbase.client.core.api.kv.CoreExpiry;
2223
import com.couchbase.client.core.api.kv.CoreReadPreference;
2324
import com.couchbase.client.core.api.kv.CoreSubdocGetResult;
25+
import com.couchbase.client.core.classic.ClassicExpiryHelper;
2426
import com.couchbase.client.core.cnc.TracingIdentifiers;
27+
import com.couchbase.client.core.config.BucketCapabilities;
2528
import com.couchbase.client.core.config.BucketConfig;
2629
import com.couchbase.client.core.error.DocumentNotFoundException;
2730
import com.couchbase.client.core.error.DocumentUnretrievableException;
@@ -72,14 +75,15 @@ public static Mono<InsertResponse> insert(final Core core,
7275
final Duration timeout,
7376
final Optional<DurabilityLevel> durabilityLevel,
7477
final Map<String, Object> clientContext,
75-
final SpanWrapper pspan) {
78+
final SpanWrapper pspan,
79+
final @Nullable CoreExpiry expiry) {
7680
return Mono.defer(() -> {
7781
long start = System.nanoTime();
7882
SpanWrapper span = SpanWrapperUtil.createOp(null, core.context().coreResources().requestTracerAndDecorator(), collectionIdentifier, id, TracingIdentifiers.SPAN_REQUEST_KV_INSERT, pspan);
7983

8084
InsertRequest request = new InsertRequest(id,
8185
transcodedContent,
82-
0,
86+
expiry == null ? 0 : ClassicExpiryHelper.encode(expiry),
8387
flags,
8488
timeout,
8589
core.context(),
@@ -241,8 +245,8 @@ public static Mono<SubdocMutateResponse> mutateIn(final Core core,
241245
durabilityLevel,
242246
clientContext,
243247
span,
244-
commands,
245-
null);
248+
null,
249+
commands);
246250
}
247251

248252
public static Mono<SubdocMutateResponse> mutateIn(final Core core,
@@ -259,23 +263,20 @@ public static Mono<SubdocMutateResponse> mutateIn(final Core core,
259263
final Optional<DurabilityLevel> durabilityLevel,
260264
final Map<String, Object> clientContext,
261265
final SpanWrapper pspan,
262-
final List<SubdocMutateRequest.Command> commands,
263-
CoreTransactionLogger logger) {
266+
@Nullable CoreExpiry expiry,
267+
final List<SubdocMutateRequest.Command> commands) {
264268
return Mono.defer(() -> {
265269
SpanWrapper span = SpanWrapperUtil.createOp(null, core.context().coreResources().requestTracerAndDecorator(), collectionIdentifier, id, TracingIdentifiers.SPAN_REQUEST_KV_MUTATE_IN, pspan);
266270
long start = System.nanoTime();
267-
268-
final boolean requiresBucketConfig = createAsDeleted || reviveDocument;
269-
CompletableFuture<BucketConfig> bucketConfigFuture;
270-
271-
if (requiresBucketConfig) {
272-
bucketConfigFuture = BucketConfigUtil.waitForBucketConfig(core, collectionIdentifier.bucket(), timeout).toFuture();
273-
} else {
274-
// Nothing will be using the bucket config so just provide null
275-
bucketConfigFuture = CompletableFuture.completedFuture(null);
276-
}
271+
CompletableFuture<BucketConfig> bucketConfigFuture = BucketConfigUtil.waitForBucketConfig(core, collectionIdentifier.bucket(), timeout).toFuture();
277272

278273
CompletableFuture<SubdocMutateResponse> future = bucketConfigFuture.thenCompose(bucketConfig -> {
274+
// Preserve expiry is only supported on 7.0+; use presence of collections capability as the indicator.
275+
boolean supportsPreserveExpiry = bucketConfig.bucketCapabilities().contains(BucketCapabilities.COLLECTIONS);
276+
// Sub-doc does not allow sending both preserveExpiry and an expiry, at least with StoreSemantics.Replace.
277+
// Also cannot send preserveExpiry with StoreSemantics.Insert.
278+
boolean sendPreserveExpiry = supportsPreserveExpiry && expiry == null && !insertDocument;
279+
279280
SubdocMutateRequest request = new SubdocMutateRequest(timeout,
280281
core.context(),
281282
collectionIdentifier,
@@ -288,8 +289,8 @@ public static Mono<SubdocMutateResponse> mutateIn(final Core core,
288289
accessDeleted,
289290
createAsDeleted,
290291
commands,
291-
0,
292-
false, // Preserve expiry only supported on 7.0+
292+
expiry == null ? 0 : ClassicExpiryHelper.encode(expiry),
293+
sendPreserveExpiry,
293294
cas,
294295
userFlags,
295296
durabilityLevel,

java-client/src/main/java/com/couchbase/client/java/transactions/ReactiveTransactionAttemptContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public Mono<TransactionGetResult> insert(ReactiveCollection collection, String i
220220
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
221221

222222
return reactor.publishOnUserScheduler(
223-
internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), new SpanWrapper(span))
223+
internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
224224
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
225225
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
226226
.doOnTerminate(() -> span.end())
@@ -257,7 +257,7 @@ public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object conte
257257
RequestSpan span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_REPLACE, internal.span());
258258
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
259259
return reactor.publishOnUserScheduler(
260-
internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), new SpanWrapper(span))
260+
internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
261261
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
262262
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
263263
.doOnTerminate(() -> span.end())

java-client/src/main/java/com/couchbase/client/java/transactions/TransactionAttemptContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public TransactionGetResult replace(TransactionGetResult doc, Object content, Tr
253253
TransactionReplaceOptions.Built built = options.build();
254254
RequestSpan span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_REPLACE, internal.span());
255255
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
256-
return internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), new SpanWrapper(span))
256+
return internal.replace(doc.internal(), encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
257257
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
258258
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
259259
.doOnTerminate(() -> span.end())
@@ -302,7 +302,7 @@ public TransactionGetResult insert(Collection collection, String id, Object cont
302302
TransactionInsertOptions.Built built = options.build();
303303
RequestSpan span = CbTracing.newSpan(internal.core().context(), TRANSACTION_OP_INSERT, internal.span());
304304
Transcoder.EncodedValue encoded = encode(content, span, serializer, built.transcoder(), internal.core().context());
305-
return internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), new SpanWrapper(span))
305+
return internal.insert(makeCollectionIdentifier(collection.async()), id, encoded.encoded(), encoded.flags(), built.expiry(), new SpanWrapper(span))
306306
.map(result -> new TransactionGetResult(result, serializer(), built.transcoder()))
307307
.doOnError(err -> span.status(RequestSpan.StatusCode.ERROR))
308308
.doOnTerminate(() -> span.end())

0 commit comments

Comments
 (0)