@@ -202,6 +202,7 @@ public class TableWriter implements TableWriterAPI, MetadataService, Closeable {
202202 private final long dataAppendPageSize;
203203 private final DdlListener ddlListener;
204204 private final MemoryMAR ddlMem;
205+ private final LongAdder dedupRowsRemovedSinceLastCommit = new LongAdder();
205206 private final ObjList<ColumnIndexer> denseIndexers = new ObjList<>();
206207 private final ObjList<MapWriter> denseSymbolMapWriters;
207208 private final int detachedMkDirMode;
@@ -245,7 +246,6 @@ public class TableWriter implements TableWriterAPI, MetadataService, Closeable {
245246 private final int pathRootSize;
246247 private final int pathSize;
247248 private final FragileCode RECOVER_FROM_META_RENAME_FAILURE = this::recoverFromMetaRenameFailure;
248- private final LongAdder dedupRowsRemovedSinceLastCommit = new LongAdder();
249249 private final LongAdder physicallyWrittenRowsSinceLastCommit = new LongAdder();
250250 private final Row row = new RowImpl();
251251 private final LongList rowValueIsNotNull = new LongList();
@@ -725,6 +725,10 @@ public void addColumn(
725725 }
726726 }
727727
728+ public void addDedupRowsRemoved(long count) {
729+ dedupRowsRemovedSinceLastCommit.add(count);
730+ }
731+
728732 @Override
729733 public void addIndex(@NotNull CharSequence columnName, int indexValueBlockSize) {
730734 assert indexValueBlockSize == Numbers.ceilPow2(indexValueBlockSize) : "power of 2 expected";
@@ -776,8 +780,9 @@ public void addIndex(@NotNull CharSequence columnName, int indexValueBlockSize)
776780 .$("]' to ").$substr(pathRootSize, path).$();
777781 }
778782
779- public void addDedupRowsRemoved(long count) {
780- dedupRowsRemovedSinceLastCommit.add(count);
783+ public void addPhysicallyWrittenRows(long rows) {
784+ physicallyWrittenRowsSinceLastCommit.add(rows);
785+ metrics.tableWriterMetrics().addPhysicallyWrittenRows(rows);
781786 }
782787
783788 public long apply(AbstractOperation operation, long seqTxn) {
@@ -1284,9 +1289,99 @@ public void commitSeqTxn() {
12841289 txWriter.commit(denseSymbolMapWriters);
12851290 }
12861291
1287- public void addPhysicallyWrittenRows(long rows) {
1288- physicallyWrittenRowsSinceLastCommit.add(rows);
1289- metrics.tableWriterMetrics().addPhysicallyWrittenRows(rows);
1292+ public void commitWalInsertTransactions(
1293+ @Transient Path walPath,
1294+ long seqTxn,
1295+ TableWriterPressureControl pressureControl
1296+ ) {
1297+ if (hasO3() || columnVersionWriter.hasChanges()) {
1298+ // When the writer is returned to the pool, it should be rolled back. Having an open transaction is very suspicious.
1299+ // Set the writer to distressed state and throw exception so that the writer is re-created.
1300+ distressed = true;
1301+ throw CairoException.critical(0).put("cannot process WAL while in transaction");
1302+ }
1303+
1304+ physicallyWrittenRowsSinceLastCommit.reset();
1305+ dedupRowsRemovedSinceLastCommit.reset();
1306+ txWriter.beginPartitionSizeUpdate();
1307+ long commitToTimestamp = walTxnDetails.getCommitToTimestamp(seqTxn);
1308+ int transactionBlock = calculateInsertTransactionBlock(seqTxn, pressureControl);
1309+ // Capture wall clock once to reduce syscalls. Used for:
1310+ // - commit latency threshold check in processWalCommit()
1311+ // - recording last WAL commit timestamp
1312+ // - TTL wall clock comparison in housekeep()
1313+ final long wallClockMicros = configuration.getMicrosecondClock().getTicks();
1314+
1315+ boolean committed;
1316+ final long initialCommittedRowCount = txWriter.getRowCount();
1317+ walRowsProcessed = 0;
1318+
1319+ try {
1320+ if (transactionBlock == 1) {
1321+ committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
1322+ } else {
1323+ try {
1324+ int blockSize = processWalCommitBlock(
1325+ seqTxn,
1326+ transactionBlock,
1327+ pressureControl
1328+ );
1329+ committed = blockSize > 0;
1330+ seqTxn += blockSize - 1;
1331+ } catch (CairoException e) {
1332+ if (e.isBlockApplyError()) {
1333+ if (configuration.getDebugWalApplyBlockFailureNoRetry()) {
1334+ // Do not re-try the application as 1 by 1 in tests.
1335+ throw e;
1336+ }
1337+ pressureControl.onBlockApplyError();
1338+ pressureControl.updateInflightTxnBlockLength(
1339+ 1,
1340+ Math.max(1, walTxnDetails.getSegmentRowHi(seqTxn) - walTxnDetails.getSegmentRowLo(seqTxn))
1341+ );
1342+ LOG.info().$("failed to apply block, trying to apply 1 by 1 [table=").$(tableToken)
1343+ .$(", startTxn=").$(seqTxn)
1344+ .I$();
1345+ // Try applying 1 transaction at a time
1346+ committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
1347+ } else {
1348+ throw e;
1349+ }
1350+ }
1351+ }
1352+ } catch (CairoException e) {
1353+ if (e.isOutOfMemory()) {
1354+ // oom -> we cannot rely on internal TableWriter consistency, all bets are off, better to discard it and re-recreate
1355+ distressed = true;
1356+ }
1357+ throw e;
1358+ }
1359+
1360+ walTxnDetails.setIncrementRowsCommitted(walRowsProcessed);
1361+ if (committed) {
1362+ assert txWriter.getLagRowCount() == 0;
1363+
1364+ txWriter.setSeqTxn(seqTxn);
1365+ txWriter.setLagTxnCount(0);
1366+ txWriter.setLagOrdered(true);
1367+
1368+ commit00();
1369+ lastWalCommitTimestampMicros = wallClockMicros;
1370+ housekeep(wallClockMicros);
1371+ shrinkO3Mem();
1372+
1373+ assert txWriter.getPartitionCount() == 0 || txWriter.getMinTimestamp() >= txWriter.getPartitionTimestampByIndex(0);
1374+ LOG.debug().$("table ranges after the commit [table=").$(tableToken)
1375+ .$(", minTs=").$ts(timestampDriver, txWriter.getMinTimestamp())
1376+ .$(", maxTs=").$ts(timestampDriver, txWriter.getMaxTimestamp()).I$();
1377+ }
1378+
1379+ // Sometimes nothing is committed to the table, only copied to LAG.
1380+ // Sometimes data from LAG is made visible to the table using fast commit that increment transient row count.
1381+ // Keep in memory last committed seq txn, but do not write it to _txn file.
1382+ assert txWriter.getLagTxnCount() == (seqTxn - txWriter.getSeqTxn());
1383+ long rowsCommitted = txWriter.getRowCount() - initialCommittedRowCount;
1384+ metrics.tableWriterMetrics().addCommittedRows(rowsCommitted);
12901385 }
12911386
12921387 @Override
@@ -2167,6 +2262,10 @@ public DedupColumnCommitAddresses getDedupCommitAddresses() {
21672262 return dedupColumnCommitAddresses;
21682263 }
21692264
2265+ public long getDedupRowsRemovedSinceLastCommit() {
2266+ return dedupRowsRemovedSinceLastCommit.sum();
2267+ }
2268+
21702269 @TestOnly
21712270 public ObjList<MapWriter> getDenseSymbolMapWriters() {
21722271 return denseSymbolMapWriters;
@@ -2184,10 +2283,6 @@ public long getMaxTimestamp() {
21842283 return txWriter.getMaxTimestamp();
21852284 }
21862285
2187- public long getMinTimestamp() {
2188- return txWriter.getMinTimestamp();
2189- }
2190-
21912286 @Override
21922287 public int getMetaMaxUncommittedRows() {
21932288 return metadata.getMaxUncommittedRows();
@@ -2203,6 +2298,10 @@ public long getMetadataVersion() {
22032298 return txWriter.getMetadataVersion();
22042299 }
22052300
2301+ public long getMinTimestamp() {
2302+ return txWriter.getMinTimestamp();
2303+ }
2304+
22062305 public long getO3RowCount() {
22072306 return hasO3() ? getO3RowCount0() : 0L;
22082307 }
@@ -2253,105 +2352,6 @@ public long getPartitionTimestamp(int partitionIndex) {
22532352 return txWriter.getPartitionTimestampByIndex(partitionIndex);
22542353 }
22552354
2256- public void commitWalInsertTransactions(
2257- @Transient Path walPath,
2258- long seqTxn,
2259- TableWriterPressureControl pressureControl
2260- ) {
2261- if (hasO3() || columnVersionWriter.hasChanges()) {
2262- // When the writer is returned to the pool, it should be rolled back. Having an open transaction is very suspicious.
2263- // Set the writer to distressed state and throw exception so that the writer is re-created.
2264- distressed = true;
2265- throw CairoException.critical(0).put("cannot process WAL while in transaction");
2266- }
2267-
2268- physicallyWrittenRowsSinceLastCommit.reset();
2269- dedupRowsRemovedSinceLastCommit.reset();
2270- txWriter.beginPartitionSizeUpdate();
2271- long commitToTimestamp = walTxnDetails.getCommitToTimestamp(seqTxn);
2272- int transactionBlock = calculateInsertTransactionBlock(seqTxn, pressureControl);
2273- // Capture wall clock once to reduce syscalls. Used for:
2274- // - commit latency threshold check in processWalCommit()
2275- // - recording last WAL commit timestamp
2276- // - TTL wall clock comparison in housekeep()
2277- final long wallClockMicros = configuration.getMicrosecondClock().getTicks();
2278-
2279- boolean committed;
2280- final long initialCommittedRowCount = txWriter.getRowCount();
2281- walRowsProcessed = 0;
2282-
2283- try {
2284- if (transactionBlock == 1) {
2285- committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
2286- } else {
2287- try {
2288- int blockSize = processWalCommitBlock(
2289- seqTxn,
2290- transactionBlock,
2291- pressureControl
2292- );
2293- committed = blockSize > 0;
2294- seqTxn += blockSize - 1;
2295- } catch (CairoException e) {
2296- if (e.isBlockApplyError()) {
2297- if (configuration.getDebugWalApplyBlockFailureNoRetry()) {
2298- // Do not re-try the application as 1 by 1 in tests.
2299- throw e;
2300- }
2301- pressureControl.onBlockApplyError();
2302- pressureControl.updateInflightTxnBlockLength(
2303- 1,
2304- Math.max(1, walTxnDetails.getSegmentRowHi(seqTxn) - walTxnDetails.getSegmentRowLo(seqTxn))
2305- );
2306- LOG.info().$("failed to apply block, trying to apply 1 by 1 [table=").$(tableToken)
2307- .$(", startTxn=").$(seqTxn)
2308- .I$();
2309- // Try applying 1 transaction at a time
2310- committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
2311- } else {
2312- throw e;
2313- }
2314- }
2315- }
2316- } catch (CairoException e) {
2317- if (e.isOutOfMemory()) {
2318- // oom -> we cannot rely on internal TableWriter consistency, all bets are off, better to discard it and re-recreate
2319- distressed = true;
2320- }
2321- throw e;
2322- }
2323-
2324- walTxnDetails.setIncrementRowsCommitted(walRowsProcessed);
2325- if (committed) {
2326- assert txWriter.getLagRowCount() == 0;
2327-
2328- txWriter.setSeqTxn(seqTxn);
2329- txWriter.setLagTxnCount(0);
2330- txWriter.setLagOrdered(true);
2331-
2332- commit00();
2333- lastWalCommitTimestampMicros = wallClockMicros;
2334- housekeep(wallClockMicros);
2335- shrinkO3Mem();
2336-
2337- assert txWriter.getPartitionCount() == 0 || txWriter.getMinTimestamp() >= txWriter.getPartitionTimestampByIndex(0);
2338- LOG.debug().$("table ranges after the commit [table=").$(tableToken)
2339- .$(", minTs=").$ts(timestampDriver, txWriter.getMinTimestamp())
2340- .$(", maxTs=").$ts(timestampDriver, txWriter.getMaxTimestamp()).I$();
2341- }
2342-
2343- // Sometimes nothing is committed to the table, only copied to LAG.
2344- // Sometimes data from LAG is made visible to the table using fast commit that increment transient row count.
2345- // Keep in memory last committed seq txn, but do not write it to _txn file.
2346- assert txWriter.getLagTxnCount() == (seqTxn - txWriter.getSeqTxn());
2347- long rowsCommitted = txWriter.getRowCount() - initialCommittedRowCount;
2348- metrics.tableWriterMetrics().addCommittedRows(rowsCommitted);
2349- }
2350-
2351- public long getDedupRowsRemovedSinceLastCommit() {
2352- return dedupRowsRemovedSinceLastCommit.sum();
2353- }
2354-
23552355 public long getPhysicallyWrittenRowsSinceLastCommit() {
23562356 return physicallyWrittenRowsSinceLastCommit.sum();
23572357 }
@@ -3156,7 +3156,7 @@ private static void closeRemove(FilesFacade ff, long fd, LPSZ path) {
31563156 }
31573157 }
31583158
3159- private static void configureNullSetters(ObjList<Runnable> nullers, int columnType, MemoryA dataMem, MemoryA auxMem) {
3159+ private static void configureNullSetters(ObjList<Runnable> nullers, int columnType, MemoryA dataMem, MemoryA auxMem, int columnIndex, ObjList<MapWriter> symbolWriters ) {
31603160 short columnTag = ColumnType.tagOf(columnType);
31613161 if (ColumnType.isVarSize(columnTag)) {
31623162 final ColumnTypeDriver typeDriver = ColumnType.getDriver(columnTag);
@@ -3199,7 +3199,10 @@ private static void configureNullSetters(ObjList<Runnable> nullers, int columnTy
31993199 nullers.add(() -> dataMem.putChar((char) 0));
32003200 break;
32013201 case ColumnType.SYMBOL:
3202- nullers.add(() -> dataMem.putInt(SymbolTable.VALUE_IS_NULL));
3202+ nullers.add(() -> {
3203+ symbolWriters.getQuick(columnIndex).updateNullFlag(true);
3204+ dataMem.putInt(SymbolTable.VALUE_IS_NULL);
3205+ });
32033206 break;
32043207 case ColumnType.GEOBYTE:
32053208 nullers.add(() -> dataMem.putByte(GeoHashes.BYTE_NULL));
@@ -4165,9 +4168,9 @@ private void configureColumn(int type, boolean indexFlag, int index) {
41654168 o3MemColumns1.extendAndSet(baseIndex + 1, o3AuxMem1);
41664169 o3MemColumns2.extendAndSet(baseIndex, o3DataMem2);
41674170 o3MemColumns2.extendAndSet(baseIndex + 1, o3AuxMem2);
4168- configureNullSetters(nullSetters, type, dataMem, auxMem);
4169- configureNullSetters(o3NullSetters1, type, o3DataMem1, o3AuxMem1);
4170- configureNullSetters(o3NullSetters2, type, o3DataMem2, o3AuxMem2);
4171+ configureNullSetters(nullSetters, type, dataMem, auxMem, index, symbolMapWriters );
4172+ configureNullSetters(o3NullSetters1, type, o3DataMem1, o3AuxMem1, index, symbolMapWriters );
4173+ configureNullSetters(o3NullSetters2, type, o3DataMem2, o3AuxMem2, index, symbolMapWriters );
41714174
41724175 if (indexFlag && type > 0) {
41734176 indexers.extendAndSet(index, new SymbolColumnIndexer(configuration));
0 commit comments