Skip to content

Commit a626ff8

Browse files
authored
chore(core): revert mat view WAL Apply skipping transactions (#6460)
1 parent da5ff81 commit a626ff8

File tree

4 files changed

+11
-395
lines changed

4 files changed

+11
-395
lines changed

core/src/main/java/io/questdb/cairo/TableWriter.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3071,19 +3071,6 @@ public final void truncateSoft() {
30713071
truncate(true);
30723072
}
30733073

3074-
public boolean trySkipWalTransactions(long seqTxn, long skipTxnCount) {
3075-
assert skipTxnCount > 0;
3076-
if (txWriter.getLagRowCount() == 0 && txWriter.getLagTxnCount() == 0) {
3077-
LOG.info().$("skipping replaced WAL transactions [table=").$(tableToken)
3078-
.$("range=[").$(seqTxn)
3079-
.$(", ").$(seqTxn + skipTxnCount).$(')')
3080-
.I$();
3081-
commitSeqTxn(seqTxn + skipTxnCount - 1);
3082-
return true;
3083-
}
3084-
return false;
3085-
}
3086-
30873074
public void updateTableToken(TableToken tableToken) {
30883075
this.tableToken = tableToken;
30893076
this.metadata.updateTableToken(tableToken);

core/src/main/java/io/questdb/cairo/wal/ApplyWal2TableJob.java

Lines changed: 8 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@
7575
import static io.questdb.cairo.ErrorTag.resolveTag;
7676
import static io.questdb.cairo.TableUtils.TABLE_EXISTS;
7777
import static io.questdb.cairo.pool.AbstractMultiTenantPool.NO_LOCK_REASON;
78-
import static io.questdb.cairo.wal.WalTxnType.*;
7978
import static io.questdb.cairo.wal.WalTxnType.MAT_VIEW_INVALIDATE;
79+
import static io.questdb.cairo.wal.WalTxnType.*;
8080
import static io.questdb.cairo.wal.WalUtils.*;
8181
import static io.questdb.tasks.TableWriterTask.CMD_ALTER_TABLE;
8282
import static io.questdb.tasks.TableWriterTask.CMD_UPDATE_TABLE;
@@ -125,80 +125,6 @@ public void close() {
125125
Misc.free(mvStateWriter);
126126
}
127127

128-
private static long calculateSkipTransactionCount(long initialSeqTxn, WalTxnDetails walTxnDetails) {
129-
// Check all future transactions to see if any fully replace this transaction's range or table is truncated
130-
final long lastSeqTxn = walTxnDetails.getLastSeqTxn();
131-
132-
// Initial loop condition, as if the previous transaction was skipped
133-
for (long seqTxn = initialSeqTxn; seqTxn < lastSeqTxn; seqTxn++) {
134-
int walId = walTxnDetails.getWalId(seqTxn);
135-
if (walId < 1 || !isDataType(walTxnDetails.getWalTxnType(seqTxn))) {
136-
// This is not a data transaction
137-
return seqTxn - initialSeqTxn;
138-
}
139-
140-
long txnTsLo = walTxnDetails.getMinTimestamp(seqTxn);
141-
long txnTsHi = walTxnDetails.getMaxTimestamp(seqTxn) + 1; // Max is inclusive, make txnTsHi exclusive
142-
if (walTxnDetails.getDedupMode(seqTxn) == WalUtils.WAL_DEDUP_MODE_REPLACE_RANGE) {
143-
txnTsLo = walTxnDetails.getReplaceRangeTsLow(seqTxn);
144-
txnTsHi = walTxnDetails.getReplaceRangeTsHi(seqTxn);
145-
}
146-
147-
long firstNonSkippableTxn = Long.MAX_VALUE;
148-
boolean seqTxnCanBeSkipped = false;
149-
150-
// Even though it's O(N^2) complexity, the number of transactions we can skip is expected to be small.
151-
// So the outer loop exits very early, it is expected to exit after 1st iteration.
152-
// Unless TRUNCATE SQL found and many transactions can be skipped.
153-
// TRUNCATE has special optimization to stop scanning early.
154-
for (long futureSeqTxn = seqTxn + 1; futureSeqTxn <= lastSeqTxn; futureSeqTxn++) {
155-
int futureWalId = walTxnDetails.getWalId(futureSeqTxn);
156-
if (futureWalId > 0) {
157-
final byte walTxnType = walTxnDetails.getWalTxnType(futureSeqTxn);
158-
if (walTxnType == TRUNCATE) {
159-
// Truncate fully removes any prior data, no point doing any data apply
160-
// We can skip straight to the truncate operation or the first non-skippable operation before it
161-
return Math.min(firstNonSkippableTxn, futureSeqTxn) - initialSeqTxn;
162-
}
163-
164-
if (walTxnType == SQL) {
165-
// This is not a data transaction.
166-
// Potentially it can be an UPDATE SQL that uses existing data
167-
// so the transactions cannot be skipped even if the data is fully replaces after the update.
168-
// We can optimize partition drops to be recognized here in the future.
169-
break;
170-
}
171-
if (!WalTxnType.isDataType(walTxnType)) {
172-
firstNonSkippableTxn = Math.min(firstNonSkippableTxn, futureSeqTxn);
173-
}
174-
} else {
175-
firstNonSkippableTxn = Math.min(firstNonSkippableTxn, futureSeqTxn);
176-
}
177-
178-
// If the future transaction is a replace range operation
179-
byte futureDedupMode = walTxnDetails.getDedupMode(futureSeqTxn);
180-
if (futureDedupMode == WalUtils.WAL_DEDUP_MODE_REPLACE_RANGE) {
181-
long futureRangeTsLo = walTxnDetails.getReplaceRangeTsLow(futureSeqTxn);
182-
long futureRangeTsHi = walTxnDetails.getReplaceRangeTsHi(futureSeqTxn);
183-
184-
// Check if the future transaction's replace range fully covers this transaction
185-
if (futureRangeTsLo <= txnTsLo && futureRangeTsHi >= txnTsHi) {
186-
// Found that seqTxn is fully replaced by a future transaction
187-
// Skip it and continue checking further transactions
188-
seqTxnCanBeSkipped = true;
189-
break;
190-
}
191-
}
192-
}
193-
194-
if (!seqTxnCanBeSkipped) {
195-
return seqTxn - initialSeqTxn;
196-
}
197-
}
198-
199-
return lastSeqTxn - initialSeqTxn;
200-
}
201-
202128
private static void cleanDroppedTableDirectory(CairoEngine engine, Path tempPath, TableToken tableToken) {
203129
// Clean all the files inside table folder name except WAL directories and SEQ_DIR directory
204130
boolean allClean = true;
@@ -536,7 +462,7 @@ private void applyOutstandingWalTransactions(
536462
// be returned to the pool and dirty writes will be rolled back. We have to update the sequencer
537463
// on the state of the writer and revert any dirty txns that might have advanced. We do that
538464
// by equalizing writerTxn and dirtyWriterTxn.
539-
engine.getTableSequencerAPI().updateWriterTxns(tableToken, writer.getSeqTxn(), writer.getAppliedSeqTxn());
465+
engine.getTableSequencerAPI().updateWriterTxns(tableToken, writer.getSeqTxn(), writer.getSeqTxn());
540466
throw th;
541467
} finally {
542468
Misc.free(structuralChangeCursor);
@@ -623,27 +549,14 @@ private int processWalCommit(
623549
case DATA:
624550
case MAT_VIEW_DATA:
625551
walTelemetryFacade.store(WAL_TXN_APPLY_START, writer.getTableToken(), walId, seqTxn, -1L, -1L, start - commitTimestamp);
626-
long skipTxnCount = calculateSkipTransactionCount(seqTxn, txnDetails);
627-
628-
// Ask TableWriter to skip applying transactions entirely when possible
629-
boolean skipped = false;
630-
if (skipTxnCount > 0) {
631-
skipped = writer.trySkipWalTransactions(seqTxn, skipTxnCount);
632-
}
633-
634-
// Cannot skip, possibly there are rows in LAG that need to be committed
635-
if (!skipped) {
636-
writer.commitWalInsertTransactions(
637-
walPath,
638-
seqTxn,
639-
pressureControl
640-
);
641-
}
642-
552+
writer.commitWalInsertTransactions(
553+
walPath,
554+
seqTxn,
555+
pressureControl
556+
);
643557
final long latency = microClock.getTicks() - start;
644558
long totalPhysicalRowCount = writer.getPhysicallyWrittenRowsSinceLastCommit();
645559
long lastCommittedSeqTxn = writer.getAppliedSeqTxn();
646-
647560
lastCommittedRows = 0;
648561
for (long s = seqTxn; s <= lastCommittedSeqTxn; s++) {
649562
long walRowCount = txnDetails.getSegmentRowHi(s) - txnDetails.getSegmentRowLo(s);
@@ -683,7 +596,7 @@ private int processWalCommit(
683596
}
684597
}
685598

686-
return (int) (lastCommittedSeqTxn - seqTxn + 1);
599+
return (int) (writer.getAppliedSeqTxn() - seqTxn + 1);
687600
case SQL:
688601
try (WalEventReader eventReader = walEventReader) {
689602
final WalEventCursor walEventCursor = eventReader.of(walPath, segmentTxn);

0 commit comments

Comments
 (0)