|
75 | 75 | import static io.questdb.cairo.ErrorTag.resolveTag; |
76 | 76 | import static io.questdb.cairo.TableUtils.TABLE_EXISTS; |
77 | 77 | import static io.questdb.cairo.pool.AbstractMultiTenantPool.NO_LOCK_REASON; |
78 | | -import static io.questdb.cairo.wal.WalTxnType.*; |
79 | 78 | import static io.questdb.cairo.wal.WalTxnType.MAT_VIEW_INVALIDATE; |
| 79 | +import static io.questdb.cairo.wal.WalTxnType.*; |
80 | 80 | import static io.questdb.cairo.wal.WalUtils.*; |
81 | 81 | import static io.questdb.tasks.TableWriterTask.CMD_ALTER_TABLE; |
82 | 82 | import static io.questdb.tasks.TableWriterTask.CMD_UPDATE_TABLE; |
@@ -125,80 +125,6 @@ public void close() { |
125 | 125 | Misc.free(mvStateWriter); |
126 | 126 | } |
127 | 127 |
|
128 | | - private static long calculateSkipTransactionCount(long initialSeqTxn, WalTxnDetails walTxnDetails) { |
129 | | - // Check all future transactions to see if any fully replace this transaction's range or truncate the table |
130 | | - final long lastSeqTxn = walTxnDetails.getLastSeqTxn(); |
131 | | - |
132 | | - // Initial loop condition, as if the previous transaction was skipped |
133 | | - for (long seqTxn = initialSeqTxn; seqTxn < lastSeqTxn; seqTxn++) { |
134 | | - int walId = walTxnDetails.getWalId(seqTxn); |
135 | | - if (walId < 1 || !isDataType(walTxnDetails.getWalTxnType(seqTxn))) { |
136 | | - // This is not a data transaction |
137 | | - return seqTxn - initialSeqTxn; |
138 | | - } |
139 | | - |
140 | | - long txnTsLo = walTxnDetails.getMinTimestamp(seqTxn); |
141 | | - long txnTsHi = walTxnDetails.getMaxTimestamp(seqTxn) + 1; // Max is inclusive, make txnTsHi exclusive |
142 | | - if (walTxnDetails.getDedupMode(seqTxn) == WalUtils.WAL_DEDUP_MODE_REPLACE_RANGE) { |
143 | | - txnTsLo = walTxnDetails.getReplaceRangeTsLow(seqTxn); |
144 | | - txnTsHi = walTxnDetails.getReplaceRangeTsHi(seqTxn); |
145 | | - } |
146 | | - |
147 | | - long firstNonSkippableTxn = Long.MAX_VALUE; |
148 | | - boolean seqTxnCanBeSkipped = false; |
149 | | - |
150 | | - // Even though it's O(N^2) complexity, the number of transactions we can skip is expected to be small. |
151 | | - // So the outer loop exits very early; it is expected to exit after the 1st iteration, |
152 | | - // unless a TRUNCATE SQL is found and many transactions can be skipped. |
153 | | - // TRUNCATE has special optimization to stop scanning early. |
154 | | - for (long futureSeqTxn = seqTxn + 1; futureSeqTxn <= lastSeqTxn; futureSeqTxn++) { |
155 | | - int futureWalId = walTxnDetails.getWalId(futureSeqTxn); |
156 | | - if (futureWalId > 0) { |
157 | | - final byte walTxnType = walTxnDetails.getWalTxnType(futureSeqTxn); |
158 | | - if (walTxnType == TRUNCATE) { |
159 | | - // Truncate fully removes any prior data, no point doing any data apply |
160 | | - // We can skip straight to the truncate operation or the first non-skippable operation before it |
161 | | - return Math.min(firstNonSkippableTxn, futureSeqTxn) - initialSeqTxn; |
162 | | - } |
163 | | - |
164 | | - if (walTxnType == SQL) { |
165 | | - // This is not a data transaction. |
166 | | - // Potentially it can be an UPDATE SQL that uses existing data |
167 | | - // so the transactions cannot be skipped even if the data is fully replaced after the update. |
168 | | - // We can optimize partition drops to be recognized here in the future. |
169 | | - break; |
170 | | - } |
171 | | - if (!WalTxnType.isDataType(walTxnType)) { |
172 | | - firstNonSkippableTxn = Math.min(firstNonSkippableTxn, futureSeqTxn); |
173 | | - } |
174 | | - } else { |
175 | | - firstNonSkippableTxn = Math.min(firstNonSkippableTxn, futureSeqTxn); |
176 | | - } |
177 | | - |
178 | | - // If the future transaction is a replace range operation |
179 | | - byte futureDedupMode = walTxnDetails.getDedupMode(futureSeqTxn); |
180 | | - if (futureDedupMode == WalUtils.WAL_DEDUP_MODE_REPLACE_RANGE) { |
181 | | - long futureRangeTsLo = walTxnDetails.getReplaceRangeTsLow(futureSeqTxn); |
182 | | - long futureRangeTsHi = walTxnDetails.getReplaceRangeTsHi(futureSeqTxn); |
183 | | - |
184 | | - // Check if the future transaction's replace range fully covers this transaction |
185 | | - if (futureRangeTsLo <= txnTsLo && futureRangeTsHi >= txnTsHi) { |
186 | | - // Found that seqTxn is fully replaced by a future transaction |
187 | | - // Skip it and continue checking further transactions |
188 | | - seqTxnCanBeSkipped = true; |
189 | | - break; |
190 | | - } |
191 | | - } |
192 | | - } |
193 | | - |
194 | | - if (!seqTxnCanBeSkipped) { |
195 | | - return seqTxn - initialSeqTxn; |
196 | | - } |
197 | | - } |
198 | | - |
199 | | - return lastSeqTxn - initialSeqTxn; |
200 | | - } |
201 | | - |
202 | 128 | private static void cleanDroppedTableDirectory(CairoEngine engine, Path tempPath, TableToken tableToken) { |
203 | 129 | // Clean all the files inside the table folder except WAL directories and the SEQ_DIR directory |
204 | 130 | boolean allClean = true; |
@@ -536,7 +462,7 @@ private void applyOutstandingWalTransactions( |
536 | 462 | // be returned to the pool and dirty writes will be rolled back. We have to update the sequencer |
537 | 463 | // on the state of the writer and revert any dirty txns that might have advanced. We do that |
538 | 464 | // by equalizing writerTxn and dirtyWriterTxn. |
539 | | - engine.getTableSequencerAPI().updateWriterTxns(tableToken, writer.getSeqTxn(), writer.getAppliedSeqTxn()); |
| 465 | + engine.getTableSequencerAPI().updateWriterTxns(tableToken, writer.getSeqTxn(), writer.getSeqTxn()); |
540 | 466 | throw th; |
541 | 467 | } finally { |
542 | 468 | Misc.free(structuralChangeCursor); |
@@ -623,27 +549,14 @@ private int processWalCommit( |
623 | 549 | case DATA: |
624 | 550 | case MAT_VIEW_DATA: |
625 | 551 | walTelemetryFacade.store(WAL_TXN_APPLY_START, writer.getTableToken(), walId, seqTxn, -1L, -1L, start - commitTimestamp); |
626 | | - long skipTxnCount = calculateSkipTransactionCount(seqTxn, txnDetails); |
627 | | - |
628 | | - // Ask TableWriter to skip applying transactions entirely when possible |
629 | | - boolean skipped = false; |
630 | | - if (skipTxnCount > 0) { |
631 | | - skipped = writer.trySkipWalTransactions(seqTxn, skipTxnCount); |
632 | | - } |
633 | | - |
634 | | - // Cannot skip, possibly there are rows in LAG that need to be committed |
635 | | - if (!skipped) { |
636 | | - writer.commitWalInsertTransactions( |
637 | | - walPath, |
638 | | - seqTxn, |
639 | | - pressureControl |
640 | | - ); |
641 | | - } |
642 | | - |
| 552 | + writer.commitWalInsertTransactions( |
| 553 | + walPath, |
| 554 | + seqTxn, |
| 555 | + pressureControl |
| 556 | + ); |
643 | 557 | final long latency = microClock.getTicks() - start; |
644 | 558 | long totalPhysicalRowCount = writer.getPhysicallyWrittenRowsSinceLastCommit(); |
645 | 559 | long lastCommittedSeqTxn = writer.getAppliedSeqTxn(); |
646 | | - |
647 | 560 | lastCommittedRows = 0; |
648 | 561 | for (long s = seqTxn; s <= lastCommittedSeqTxn; s++) { |
649 | 562 | long walRowCount = txnDetails.getSegmentRowHi(s) - txnDetails.getSegmentRowLo(s); |
@@ -683,7 +596,7 @@ private int processWalCommit( |
683 | 596 | } |
684 | 597 | } |
685 | 598 |
|
686 | | - return (int) (lastCommittedSeqTxn - seqTxn + 1); |
| 599 | + return (int) (writer.getAppliedSeqTxn() - seqTxn + 1); |
687 | 600 | case SQL: |
688 | 601 | try (WalEventReader eventReader = walEventReader) { |
689 | 602 | final WalEventCursor walEventCursor = eventReader.of(walPath, segmentTxn); |
|
0 commit comments