Skip to content

Commit c1ec745

Browse files
committed
fix(core): Symbol column nullFlag not maintained when column is not included in INSERT
1 parent 017350d commit c1ec745

File tree

5 files changed

+197
-122
lines changed

5 files changed

+197
-122
lines changed

core/src/main/java/io/questdb/cairo/TableWriter.java

Lines changed: 117 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ public class TableWriter implements TableWriterAPI, MetadataService, Closeable {
202202
private final long dataAppendPageSize;
203203
private final DdlListener ddlListener;
204204
private final MemoryMAR ddlMem;
205+
private final LongAdder dedupRowsRemovedSinceLastCommit = new LongAdder();
205206
private final ObjList<ColumnIndexer> denseIndexers = new ObjList<>();
206207
private final ObjList<MapWriter> denseSymbolMapWriters;
207208
private final int detachedMkDirMode;
@@ -245,7 +246,6 @@ public class TableWriter implements TableWriterAPI, MetadataService, Closeable {
245246
private final int pathRootSize;
246247
private final int pathSize;
247248
private final FragileCode RECOVER_FROM_META_RENAME_FAILURE = this::recoverFromMetaRenameFailure;
248-
private final LongAdder dedupRowsRemovedSinceLastCommit = new LongAdder();
249249
private final LongAdder physicallyWrittenRowsSinceLastCommit = new LongAdder();
250250
private final Row row = new RowImpl();
251251
private final LongList rowValueIsNotNull = new LongList();
@@ -725,6 +725,10 @@ public void addColumn(
725725
}
726726
}
727727

728+
public void addDedupRowsRemoved(long count) {
729+
dedupRowsRemovedSinceLastCommit.add(count);
730+
}
731+
728732
@Override
729733
public void addIndex(@NotNull CharSequence columnName, int indexValueBlockSize) {
730734
assert indexValueBlockSize == Numbers.ceilPow2(indexValueBlockSize) : "power of 2 expected";
@@ -776,8 +780,9 @@ public void addIndex(@NotNull CharSequence columnName, int indexValueBlockSize)
776780
.$("]' to ").$substr(pathRootSize, path).$();
777781
}
778782

779-
public void addDedupRowsRemoved(long count) {
780-
dedupRowsRemovedSinceLastCommit.add(count);
783+
public void addPhysicallyWrittenRows(long rows) {
784+
physicallyWrittenRowsSinceLastCommit.add(rows);
785+
metrics.tableWriterMetrics().addPhysicallyWrittenRows(rows);
781786
}
782787

783788
public long apply(AbstractOperation operation, long seqTxn) {
@@ -1284,9 +1289,99 @@ public void commitSeqTxn() {
12841289
txWriter.commit(denseSymbolMapWriters);
12851290
}
12861291

1287-
public void addPhysicallyWrittenRows(long rows) {
1288-
physicallyWrittenRowsSinceLastCommit.add(rows);
1289-
metrics.tableWriterMetrics().addPhysicallyWrittenRows(rows);
1292+
public void commitWalInsertTransactions(
1293+
@Transient Path walPath,
1294+
long seqTxn,
1295+
TableWriterPressureControl pressureControl
1296+
) {
1297+
if (hasO3() || columnVersionWriter.hasChanges()) {
1298+
// When the writer is returned to the pool, it should be rolled back. Having an open transaction is very suspicious.
1299+
// Set the writer to the distressed state and throw an exception so that the writer is re-created.
1300+
distressed = true;
1301+
throw CairoException.critical(0).put("cannot process WAL while in transaction");
1302+
}
1303+
1304+
physicallyWrittenRowsSinceLastCommit.reset();
1305+
dedupRowsRemovedSinceLastCommit.reset();
1306+
txWriter.beginPartitionSizeUpdate();
1307+
long commitToTimestamp = walTxnDetails.getCommitToTimestamp(seqTxn);
1308+
int transactionBlock = calculateInsertTransactionBlock(seqTxn, pressureControl);
1309+
// Capture wall clock once to reduce syscalls. Used for:
1310+
// - commit latency threshold check in processWalCommit()
1311+
// - recording last WAL commit timestamp
1312+
// - TTL wall clock comparison in housekeep()
1313+
final long wallClockMicros = configuration.getMicrosecondClock().getTicks();
1314+
1315+
boolean committed;
1316+
final long initialCommittedRowCount = txWriter.getRowCount();
1317+
walRowsProcessed = 0;
1318+
1319+
try {
1320+
if (transactionBlock == 1) {
1321+
committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
1322+
} else {
1323+
try {
1324+
int blockSize = processWalCommitBlock(
1325+
seqTxn,
1326+
transactionBlock,
1327+
pressureControl
1328+
);
1329+
committed = blockSize > 0;
1330+
seqTxn += blockSize - 1;
1331+
} catch (CairoException e) {
1332+
if (e.isBlockApplyError()) {
1333+
if (configuration.getDebugWalApplyBlockFailureNoRetry()) {
1334+
// In tests, do not retry by applying the transactions one by one.
1335+
throw e;
1336+
}
1337+
pressureControl.onBlockApplyError();
1338+
pressureControl.updateInflightTxnBlockLength(
1339+
1,
1340+
Math.max(1, walTxnDetails.getSegmentRowHi(seqTxn) - walTxnDetails.getSegmentRowLo(seqTxn))
1341+
);
1342+
LOG.info().$("failed to apply block, trying to apply 1 by 1 [table=").$(tableToken)
1343+
.$(", startTxn=").$(seqTxn)
1344+
.I$();
1345+
// Try applying 1 transaction at a time
1346+
committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
1347+
} else {
1348+
throw e;
1349+
}
1350+
}
1351+
}
1352+
} catch (CairoException e) {
1353+
if (e.isOutOfMemory()) {
1354+
// oom -> we cannot rely on internal TableWriter consistency, all bets are off, better to discard it and re-create
1355+
distressed = true;
1356+
}
1357+
throw e;
1358+
}
1359+
1360+
walTxnDetails.setIncrementRowsCommitted(walRowsProcessed);
1361+
if (committed) {
1362+
assert txWriter.getLagRowCount() == 0;
1363+
1364+
txWriter.setSeqTxn(seqTxn);
1365+
txWriter.setLagTxnCount(0);
1366+
txWriter.setLagOrdered(true);
1367+
1368+
commit00();
1369+
lastWalCommitTimestampMicros = wallClockMicros;
1370+
housekeep(wallClockMicros);
1371+
shrinkO3Mem();
1372+
1373+
assert txWriter.getPartitionCount() == 0 || txWriter.getMinTimestamp() >= txWriter.getPartitionTimestampByIndex(0);
1374+
LOG.debug().$("table ranges after the commit [table=").$(tableToken)
1375+
.$(", minTs=").$ts(timestampDriver, txWriter.getMinTimestamp())
1376+
.$(", maxTs=").$ts(timestampDriver, txWriter.getMaxTimestamp()).I$();
1377+
}
1378+
1379+
// Sometimes nothing is committed to the table, only copied to LAG.
1380+
// Sometimes data from LAG is made visible to the table using a fast commit that increments the transient row count.
1381+
// Keep the last committed seq txn in memory, but do not write it to the _txn file.
1382+
assert txWriter.getLagTxnCount() == (seqTxn - txWriter.getSeqTxn());
1383+
long rowsCommitted = txWriter.getRowCount() - initialCommittedRowCount;
1384+
metrics.tableWriterMetrics().addCommittedRows(rowsCommitted);
12901385
}
12911386

12921387
@Override
@@ -2167,6 +2262,10 @@ public DedupColumnCommitAddresses getDedupCommitAddresses() {
21672262
return dedupColumnCommitAddresses;
21682263
}
21692264

2265+
public long getDedupRowsRemovedSinceLastCommit() {
2266+
return dedupRowsRemovedSinceLastCommit.sum();
2267+
}
2268+
21702269
@TestOnly
21712270
public ObjList<MapWriter> getDenseSymbolMapWriters() {
21722271
return denseSymbolMapWriters;
@@ -2184,10 +2283,6 @@ public long getMaxTimestamp() {
21842283
return txWriter.getMaxTimestamp();
21852284
}
21862285

2187-
public long getMinTimestamp() {
2188-
return txWriter.getMinTimestamp();
2189-
}
2190-
21912286
@Override
21922287
public int getMetaMaxUncommittedRows() {
21932288
return metadata.getMaxUncommittedRows();
@@ -2203,6 +2298,10 @@ public long getMetadataVersion() {
22032298
return txWriter.getMetadataVersion();
22042299
}
22052300

2301+
public long getMinTimestamp() {
2302+
return txWriter.getMinTimestamp();
2303+
}
2304+
22062305
public long getO3RowCount() {
22072306
return hasO3() ? getO3RowCount0() : 0L;
22082307
}
@@ -2253,105 +2352,6 @@ public long getPartitionTimestamp(int partitionIndex) {
22532352
return txWriter.getPartitionTimestampByIndex(partitionIndex);
22542353
}
22552354

2256-
public void commitWalInsertTransactions(
2257-
@Transient Path walPath,
2258-
long seqTxn,
2259-
TableWriterPressureControl pressureControl
2260-
) {
2261-
if (hasO3() || columnVersionWriter.hasChanges()) {
2262-
// When the writer is returned to the pool, it should be rolled back. Having an open transaction is very suspicious.
2263-
// Set the writer to distressed state and throw exception so that the writer is re-created.
2264-
distressed = true;
2265-
throw CairoException.critical(0).put("cannot process WAL while in transaction");
2266-
}
2267-
2268-
physicallyWrittenRowsSinceLastCommit.reset();
2269-
dedupRowsRemovedSinceLastCommit.reset();
2270-
txWriter.beginPartitionSizeUpdate();
2271-
long commitToTimestamp = walTxnDetails.getCommitToTimestamp(seqTxn);
2272-
int transactionBlock = calculateInsertTransactionBlock(seqTxn, pressureControl);
2273-
// Capture wall clock once to reduce syscalls. Used for:
2274-
// - commit latency threshold check in processWalCommit()
2275-
// - recording last WAL commit timestamp
2276-
// - TTL wall clock comparison in housekeep()
2277-
final long wallClockMicros = configuration.getMicrosecondClock().getTicks();
2278-
2279-
boolean committed;
2280-
final long initialCommittedRowCount = txWriter.getRowCount();
2281-
walRowsProcessed = 0;
2282-
2283-
try {
2284-
if (transactionBlock == 1) {
2285-
committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
2286-
} else {
2287-
try {
2288-
int blockSize = processWalCommitBlock(
2289-
seqTxn,
2290-
transactionBlock,
2291-
pressureControl
2292-
);
2293-
committed = blockSize > 0;
2294-
seqTxn += blockSize - 1;
2295-
} catch (CairoException e) {
2296-
if (e.isBlockApplyError()) {
2297-
if (configuration.getDebugWalApplyBlockFailureNoRetry()) {
2298-
// Do not re-try the application as 1 by 1 in tests.
2299-
throw e;
2300-
}
2301-
pressureControl.onBlockApplyError();
2302-
pressureControl.updateInflightTxnBlockLength(
2303-
1,
2304-
Math.max(1, walTxnDetails.getSegmentRowHi(seqTxn) - walTxnDetails.getSegmentRowLo(seqTxn))
2305-
);
2306-
LOG.info().$("failed to apply block, trying to apply 1 by 1 [table=").$(tableToken)
2307-
.$(", startTxn=").$(seqTxn)
2308-
.I$();
2309-
// Try applying 1 transaction at a time
2310-
committed = processWalCommit(walPath, seqTxn, pressureControl, commitToTimestamp, wallClockMicros);
2311-
} else {
2312-
throw e;
2313-
}
2314-
}
2315-
}
2316-
} catch (CairoException e) {
2317-
if (e.isOutOfMemory()) {
2318-
// oom -> we cannot rely on internal TableWriter consistency, all bets are off, better to discard it and re-recreate
2319-
distressed = true;
2320-
}
2321-
throw e;
2322-
}
2323-
2324-
walTxnDetails.setIncrementRowsCommitted(walRowsProcessed);
2325-
if (committed) {
2326-
assert txWriter.getLagRowCount() == 0;
2327-
2328-
txWriter.setSeqTxn(seqTxn);
2329-
txWriter.setLagTxnCount(0);
2330-
txWriter.setLagOrdered(true);
2331-
2332-
commit00();
2333-
lastWalCommitTimestampMicros = wallClockMicros;
2334-
housekeep(wallClockMicros);
2335-
shrinkO3Mem();
2336-
2337-
assert txWriter.getPartitionCount() == 0 || txWriter.getMinTimestamp() >= txWriter.getPartitionTimestampByIndex(0);
2338-
LOG.debug().$("table ranges after the commit [table=").$(tableToken)
2339-
.$(", minTs=").$ts(timestampDriver, txWriter.getMinTimestamp())
2340-
.$(", maxTs=").$ts(timestampDriver, txWriter.getMaxTimestamp()).I$();
2341-
}
2342-
2343-
// Sometimes nothing is committed to the table, only copied to LAG.
2344-
// Sometimes data from LAG is made visible to the table using fast commit that increment transient row count.
2345-
// Keep in memory last committed seq txn, but do not write it to _txn file.
2346-
assert txWriter.getLagTxnCount() == (seqTxn - txWriter.getSeqTxn());
2347-
long rowsCommitted = txWriter.getRowCount() - initialCommittedRowCount;
2348-
metrics.tableWriterMetrics().addCommittedRows(rowsCommitted);
2349-
}
2350-
2351-
public long getDedupRowsRemovedSinceLastCommit() {
2352-
return dedupRowsRemovedSinceLastCommit.sum();
2353-
}
2354-
23552355
public long getPhysicallyWrittenRowsSinceLastCommit() {
23562356
return physicallyWrittenRowsSinceLastCommit.sum();
23572357
}
@@ -3156,7 +3156,7 @@ private static void closeRemove(FilesFacade ff, long fd, LPSZ path) {
31563156
}
31573157
}
31583158

3159-
private static void configureNullSetters(ObjList<Runnable> nullers, int columnType, MemoryA dataMem, MemoryA auxMem) {
3159+
private static void configureNullSetters(ObjList<Runnable> nullers, int columnType, MemoryA dataMem, MemoryA auxMem, int columnIndex, ObjList<MapWriter> symbolWriters) {
31603160
short columnTag = ColumnType.tagOf(columnType);
31613161
if (ColumnType.isVarSize(columnTag)) {
31623162
final ColumnTypeDriver typeDriver = ColumnType.getDriver(columnTag);
@@ -3199,7 +3199,10 @@ private static void configureNullSetters(ObjList<Runnable> nullers, int columnTy
31993199
nullers.add(() -> dataMem.putChar((char) 0));
32003200
break;
32013201
case ColumnType.SYMBOL:
3202-
nullers.add(() -> dataMem.putInt(SymbolTable.VALUE_IS_NULL));
3202+
nullers.add(() -> {
3203+
symbolWriters.getQuick(columnIndex).updateNullFlag(true);
3204+
dataMem.putInt(SymbolTable.VALUE_IS_NULL);
3205+
});
32033206
break;
32043207
case ColumnType.GEOBYTE:
32053208
nullers.add(() -> dataMem.putByte(GeoHashes.BYTE_NULL));
@@ -4165,9 +4168,9 @@ private void configureColumn(int type, boolean indexFlag, int index) {
41654168
o3MemColumns1.extendAndSet(baseIndex + 1, o3AuxMem1);
41664169
o3MemColumns2.extendAndSet(baseIndex, o3DataMem2);
41674170
o3MemColumns2.extendAndSet(baseIndex + 1, o3AuxMem2);
4168-
configureNullSetters(nullSetters, type, dataMem, auxMem);
4169-
configureNullSetters(o3NullSetters1, type, o3DataMem1, o3AuxMem1);
4170-
configureNullSetters(o3NullSetters2, type, o3DataMem2, o3AuxMem2);
4171+
configureNullSetters(nullSetters, type, dataMem, auxMem, index, symbolMapWriters);
4172+
configureNullSetters(o3NullSetters1, type, o3DataMem1, o3AuxMem1, index, symbolMapWriters);
4173+
configureNullSetters(o3NullSetters2, type, o3DataMem2, o3AuxMem2, index, symbolMapWriters);
41714174

41724175
if (indexFlag && type > 0) {
41734176
indexers.extendAndSet(index, new SymbolColumnIndexer(configuration));

core/src/main/java/io/questdb/cairo/wal/ViewWalWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public ViewWalWriter(
6868
lockWal();
6969
mkWalDir();
7070

71-
events.of(null, null, null);
71+
events.of(null, null, null, null);
7272

7373
openNewSegment();
7474
} catch (Throwable e) {

core/src/main/java/io/questdb/cairo/wal/WalEventWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class WalEventWriter implements Closeable {
7070
private boolean legacyMatViewFormat;
7171
private long startOffset = 0;
7272
private BoolList symbolMapNullFlags;
73+
private BoolList symbolMapNullFlagsChanged;
7374
private int txn = 0;
7475
private ObjList<DirectCharSequenceIntHashMap> txnSymbolMaps;
7576

@@ -240,7 +241,7 @@ private void writeSymbolMapDiffs() {
240241
final var symbolMap = txnSymbolMaps.getQuick(columnIndex);
241242
if (symbolMap != null) {
242243
final int initialCount = initialSymbolCounts.get(columnIndex);
243-
if (initialCount > 0 || (initialCount == 0 && symbolMap.size() > 0)) {
244+
if (initialCount > 0 || (initialCount == 0 && symbolMap.size() > 0) || symbolMapNullFlagsChanged.get(columnIndex)) {
244245
eventMem.putInt(columnIndex);
245246
eventMem.putBool(symbolMapNullFlags.get(columnIndex));
246247
eventMem.putInt(initialCount);
@@ -440,10 +441,11 @@ int appendViewDefinition(
440441
return txn++;
441442
}
442443

443-
void of(ObjList<DirectCharSequenceIntHashMap> txnSymbolMaps, AtomicIntList initialSymbolCounts, BoolList symbolMapNullFlags) {
444+
void of(ObjList<DirectCharSequenceIntHashMap> txnSymbolMaps, AtomicIntList initialSymbolCounts, BoolList symbolMapNullFlags, BoolList symbolMapNullFlagsChanged) {
444445
this.txnSymbolMaps = txnSymbolMaps;
445446
this.initialSymbolCounts = initialSymbolCounts;
446447
this.symbolMapNullFlags = symbolMapNullFlags;
448+
this.symbolMapNullFlagsChanged = symbolMapNullFlagsChanged;
447449
}
448450

449451
void openEventFile(Path path, int pathLen, boolean truncate, boolean systemTable) {

0 commit comments

Comments
 (0)