Skip to content

Commit fc24a7f

Browse files
authored
feat(core): add minTimestamp and maxTimestamp columns to sys.telemetry_wal (#6779)
1 parent ed02dab commit fc24a7f

File tree

11 files changed

+147
-36
lines changed

11 files changed

+147
-36
lines changed

core/src/main/java/io/questdb/ServerMain.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -375,9 +375,7 @@ protected void configureWorkerPools(final WorkerPool sharedPoolQuery, final Work
375375
if (!cairoConfig.getTelemetryConfiguration().getDisableCompletely()) {
376376
final TelemetryJob telemetryJob = new TelemetryJob(engine);
377377
freeOnExit(telemetryJob);
378-
if (cairoConfig.getTelemetryConfiguration().getEnabled()) {
379-
sharedPoolWrite.assign(telemetryJob);
380-
}
378+
sharedPoolWrite.assign(telemetryJob);
381379
}
382380

383381
} catch (Throwable thr) {

core/src/main/java/io/questdb/Telemetry.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ public void init(CairoEngine engine, SqlCompiler compiler, SqlExecutionContext s
155155
} else if (ttlWeeks > 0 && ttl > 0 && ttl != ttlWeeks * 24 * 7) {
156156
shouldAlterTtl = true;
157157
}
158+
// Drop and recreate when the column count differs from the expected schema (safe for short-TTL telemetry tables).
159+
// Note: compares with != rather than <, so rolling back to older code with a different expected column count will also drop and recreate the table.
160+
int expectedColumnCount = telemetryType.getExpectedColumnCount();
161+
if (expectedColumnCount > 0 && meta.getColumnCount() != expectedColumnCount) {
162+
shouldDropTable = true;
163+
}
158164
}
159165
} catch (CairoException e) {
160166
if (!Chars.contains(e.getFlyweightMessage(), "table does not exist")) {
@@ -373,6 +379,10 @@ protected boolean hasTimedOut() {
373379
public interface TelemetryType<T extends AbstractTelemetryTask> {
374380
QueryBuilder getCreateSql(QueryBuilder builder, int ttlWeeks);
375381

382+
default int getExpectedColumnCount() {
383+
return -1;
384+
}
385+
376386
String getName();
377387

378388
String getTableName();

core/src/main/java/io/questdb/TelemetryEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public final class TelemetryEvent {
6262
public static final short WAL_APPLY_RESUME = 108;
6363
public static final short WAL_APPLY_SUSPEND = 107;
6464
public static final short WAL_TXN_APPLY_START = 103;
65+
public static final short WAL_TXN_COMMITTED = 109;
6566
public static final short WAL_TXN_DATA_APPLIED = 105;
6667
public static final short WAL_TXN_SQL_APPLIED = 106;
6768
public static final short WAL_TXN_STRUCTURE_CHANGE_APPLIED = 104;

core/src/main/java/io/questdb/cairo/CairoEngine.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,11 @@ public CairoEngine(CairoConfiguration configuration, @NotNull WalLocker walLocke
250250
this.telemetry = createTelemetry(TelemetryTask.TELEMETRY, configuration);
251251
this.telemetryWal = createTelemetry(TelemetryWalTask.WAL_TELEMETRY, configuration);
252252
this.telemetryMatView = createTelemetry(TelemetryMatViewTask.MAT_VIEW_TELEMETRY, configuration);
253-
this.telemetries = new ObjList<>(telemetry, telemetryWal, telemetryMatView);
253+
this.telemetries = new ObjList<>(telemetryWal, telemetryMatView);
254+
if (configuration.getTelemetryConfiguration().getEnabled()) {
255+
// The main telemetry is the only one that getEnabled() can switch off; WAL and mat view telemetry are always registered
256+
telemetries.add(telemetry);
257+
}
254258
this.tableIdGenerator = IDGeneratorFactory.newIDGenerator(configuration, TableUtils.TAB_INDEX_FILE_NAME, 1);
255259
this.checkpointAgent = new DatabaseCheckpointAgent(this);
256260
this.queryRegistry = new QueryRegistry(configuration);

core/src/main/java/io/questdb/cairo/pool/WalWriterPool.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
package io.questdb.cairo.pool;
2626

27+
import io.questdb.Telemetry;
2728
import io.questdb.cairo.CairoConfiguration;
2829
import io.questdb.cairo.CairoEngine;
2930
import io.questdb.cairo.DdlListener;
@@ -33,6 +34,7 @@
3334
import io.questdb.cairo.wal.WalWriter;
3435
import io.questdb.cairo.wal.seq.TableSequencerAPI;
3536
import io.questdb.std.str.CharSink;
37+
import io.questdb.tasks.TelemetryWalTask;
3638
import org.jetbrains.annotations.NotNull;
3739
import org.jetbrains.annotations.Nullable;
3840

@@ -68,7 +70,8 @@ protected WalWriterTenant newTenant(
6870
engine.getDdlListener(tableToken),
6971
engine.getWalDirectoryPolicy(),
7072
engine.getWalLocker(),
71-
engine.getRecentWriteTracker()
73+
engine.getRecentWriteTracker(),
74+
engine.getTelemetryWal()
7275
);
7376
}
7477

@@ -88,7 +91,8 @@ private WalWriterTenant(
8891
DdlListener ddlListener,
8992
WalDirectoryPolicy walDirectoryPolicy,
9093
WalLocker walLocker,
91-
RecentWriteTracker recentWriteTracker
94+
RecentWriteTracker recentWriteTracker,
95+
Telemetry<TelemetryWalTask> telemetryWal
9296
) {
9397
super(
9498
pool.getConfiguration(),
@@ -97,7 +101,8 @@ private WalWriterTenant(
97101
ddlListener,
98102
walDirectoryPolicy,
99103
walLocker,
100-
recentWriteTracker
104+
recentWriteTracker,
105+
telemetryWal
101106
);
102107
this.pool = pool;
103108
this.rootEntry = rootEntry;

core/src/main/java/io/questdb/cairo/wal/ApplyWal2TableJob.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ private void applyOutstandingWalTransactions(
422422

423423
if (hasNext) {
424424
final long start = microClock.getTicks();
425-
walTelemetryFacade.store(WAL_TXN_APPLY_START, tableToken, walId, seqTxn, -1L, -1L, start - commitTimestamp);
425+
walTelemetryFacade.store(WAL_TXN_APPLY_START, tableToken, walId, seqTxn, -1L, -1L, start - commitTimestamp, Numbers.LONG_NULL, Numbers.LONG_NULL);
426426
writer.setSeqTxn(seqTxn);
427427
try {
428428
final TableMetadataChange metadataChangeOp = structuralChangeCursor.next();
@@ -440,7 +440,7 @@ private void applyOutstandingWalTransactions(
440440
writer.setSeqTxn(seqTxn - 1);
441441
throw th;
442442
}
443-
walTelemetryFacade.store(WAL_TXN_STRUCTURE_CHANGE_APPLIED, tableToken, walId, seqTxn, -1L, -1L, microClock.getTicks() - start);
443+
walTelemetryFacade.store(WAL_TXN_STRUCTURE_CHANGE_APPLIED, tableToken, walId, seqTxn, -1L, -1L, microClock.getTicks() - start, Numbers.LONG_NULL, Numbers.LONG_NULL);
444444
} else {
445445
// Something messed up in sequencer.
446446
// There is a transaction in WAL but no structure change record.
@@ -562,8 +562,8 @@ private void doStoreTelemetry(short event, short origin) {
562562
TelemetryTask.store(telemetry, origin, event);
563563
}
564564

565-
private void doStoreWalTelemetry(short event, TableToken tableToken, int walId, long seqTxn, long rowCount, long physicalRowCount, long latencyUs) {
566-
TelemetryWalTask.store(walTelemetry, event, tableToken.getTableId(), walId, seqTxn, rowCount, physicalRowCount, latencyUs);
565+
private void doStoreWalTelemetry(short event, TableToken tableToken, int walId, long seqTxn, long rowCount, long physicalRowCount, long latencyUs, long minTimestamp, long maxTimestamp) {
566+
TelemetryWalTask.store(walTelemetry, event, tableToken.getTableId(), walId, seqTxn, rowCount, physicalRowCount, latencyUs, minTimestamp, maxTimestamp);
567567
}
568568

569569
private void handleWalApplyFailure(TableToken tableToken, Throwable throwable, SeqTxnTracker txnTracker) {
@@ -635,7 +635,7 @@ private int processWalCommit(
635635
switch (walTxnType) {
636636
case DATA:
637637
case MAT_VIEW_DATA:
638-
walTelemetryFacade.store(WAL_TXN_APPLY_START, writer.getTableToken(), walId, seqTxn, -1L, -1L, start - commitTimestamp);
638+
walTelemetryFacade.store(WAL_TXN_APPLY_START, writer.getTableToken(), walId, seqTxn, -1L, -1L, start - commitTimestamp, txnDetails.getMinTimestamp(seqTxn), txnDetails.getMaxTimestamp(seqTxn));
639639
long skipTxnCount = calculateSkipTransactionCount(seqTxn, txnDetails);
640640
// Ask TableWriter to skip applying transactions entirely when possible
641641
boolean skipped = false;
@@ -660,7 +660,7 @@ private int processWalCommit(
660660
long walRowCount = txnDetails.getSegmentRowHi(s) - txnDetails.getSegmentRowLo(s);
661661
long commitPhRowCount = s == lastCommittedSeqTxn ? totalPhysicalRowCount : 0;
662662
metrics.addApplyRowsWritten(walRowCount, commitPhRowCount, latency);
663-
walTelemetryFacade.store(WAL_TXN_DATA_APPLIED, writer.getTableToken(), walId, s, walRowCount, commitPhRowCount, latency);
663+
walTelemetryFacade.store(WAL_TXN_DATA_APPLIED, writer.getTableToken(), walId, s, walRowCount, commitPhRowCount, latency, txnDetails.getMinTimestamp(s), txnDetails.getMaxTimestamp(s));
664664
lastCommittedRows += walRowCount;
665665
}
666666

@@ -702,9 +702,9 @@ private int processWalCommit(
702702
try (WalEventReader eventReader = walEventReader) {
703703
final WalEventCursor walEventCursor = eventReader.of(walPath, segmentTxn);
704704
final WalEventCursor.SqlInfo sqlInfo = walEventCursor.getSqlInfo();
705-
walTelemetryFacade.store(WAL_TXN_APPLY_START, writer.getTableToken(), walId, seqTxn, -1L, -1L, start - commitTimestamp);
705+
walTelemetryFacade.store(WAL_TXN_APPLY_START, writer.getTableToken(), walId, seqTxn, -1L, -1L, start - commitTimestamp, Numbers.LONG_NULL, Numbers.LONG_NULL);
706706
processWalSql(writer, sqlInfo, operationExecutor, seqTxn);
707-
walTelemetryFacade.store(WAL_TXN_SQL_APPLIED, writer.getTableToken(), walId, seqTxn, -1L, -1L, microClock.getTicks() - start);
707+
walTelemetryFacade.store(WAL_TXN_SQL_APPLIED, writer.getTableToken(), walId, seqTxn, -1L, -1L, microClock.getTicks() - start, Numbers.LONG_NULL, Numbers.LONG_NULL);
708708
lastCommittedRows = 0;
709709
return 1;
710710
}
@@ -871,7 +871,8 @@ private void processWalSql(TableWriter tableWriter, WalEventCursor.SqlInfo sqlIn
871871
private void storeTelemetryNoOp(short event, short origin) {
872872
}
873873

874-
private void storeWalTelemetryNoop(short event, TableToken tableToken, int walId, long seqTxn, long rowCount, long physicalRowCount, long latencyUs) {
874+
@SuppressWarnings("unused")
875+
private void storeWalTelemetryNoop(short event, TableToken tableToken, int walId, long seqTxn, long rowCount, long physicalRowCount, long latencyUs, long minTimestamp, long maxTimestamp) {
875876
}
876877

877878
private void updateMatViewRefreshState(
@@ -996,7 +997,7 @@ private interface TelemetryFacade {
996997

997998
@FunctionalInterface
998999
private interface WalTelemetryFacade {
999-
void store(short event, TableToken tableToken, int walId, long seqTxn, long rowCount, long physicalRowCount, long latencyUs);
1000+
void store(short event, TableToken tableToken, int walId, long seqTxn, long rowCount, long physicalRowCount, long latencyUs, long minTimestamp, long maxTimestamp);
10001001
}
10011002

10021003
private static class EjectApplyWalException extends RuntimeException {

core/src/main/java/io/questdb/cairo/wal/WalWriter.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
package io.questdb.cairo.wal;
2626

2727
import io.questdb.Metrics;
28+
import io.questdb.Telemetry;
29+
import io.questdb.TelemetryEvent;
2830
import io.questdb.cairo.AlterTableContextException;
2931
import io.questdb.cairo.BitmapIndexUtils;
3032
import io.questdb.cairo.CairoConfiguration;
@@ -96,6 +98,7 @@
9698
import io.questdb.std.str.Utf8String;
9799
import io.questdb.std.str.Utf8StringSink;
98100
import io.questdb.std.str.Utf8s;
101+
import io.questdb.tasks.TelemetryWalTask;
99102
import org.jetbrains.annotations.NotNull;
100103
import org.jetbrains.annotations.Nullable;
101104
import org.jetbrains.annotations.TestOnly;
@@ -128,10 +131,12 @@ public class WalWriter extends WalWriterBase implements TableWriterAPI {
128131
private final BoolList symbolMapNullFlagsChanged = new BoolList();
129132
private final ObjList<SymbolMapReader> symbolMapReaders = new ObjList<>();
130133
private final ObjList<DirectCharSequenceIntHashMap> symbolMaps = new ObjList<>();
134+
private final Telemetry<TelemetryWalTask> telemetryWal;
131135
private final TimestampDriver timestampDriver;
132136
private final int timestampIndex;
133137
private final ObjList<Utf8StringIntHashMap> utf8SymbolMaps = new ObjList<>();
134138
private final Uuid uuid = new Uuid();
139+
private final boolean walTelemetryEnabled;
135140
private long avgRecordSize;
136141
private SegmentColumnRollSink columnConversionSink;
137142
private int columnCount;
@@ -163,14 +168,17 @@ public WalWriter(
163168
DdlListener ddlListener,
164169
WalDirectoryPolicy walDirectoryPolicy,
165170
WalLocker walLocker,
166-
RecentWriteTracker recentWriteTracker
171+
RecentWriteTracker recentWriteTracker,
172+
Telemetry<TelemetryWalTask> telemetryWal
167173
) {
168174
super(configuration, tableToken, tableSequencerAPI, walDirectoryPolicy, walLocker);
169175

170176
LOG.info().$("open [table=").$(tableToken).I$();
171177
this.ddlListener = ddlListener;
172178
this.recentWriteTracker = recentWriteTracker;
179+
this.telemetryWal = telemetryWal;
173180
this.metrics = configuration.getMetrics();
181+
this.walTelemetryEnabled = !configuration.getTelemetryConfiguration().getDisableCompletely();
174182

175183
try {
176184
lockWal();
@@ -882,14 +890,32 @@ private void commit0(
882890
// flush disk before getting next txn
883891
syncIfRequired();
884892
final long seqTxn = getSequencerTxn();
885-
LogRecord logLine = LOG.info();
893+
if (walTelemetryEnabled) {
894+
final long minTs = txnRowCount > 0 ? txnMinTimestamp : Numbers.LONG_NULL;
895+
final long maxTs = txnRowCount > 0 ? txnMaxTimestamp : Numbers.LONG_NULL;
896+
TelemetryWalTask.store(
897+
telemetryWal,
898+
TelemetryEvent.WAL_TXN_COMMITTED,
899+
tableToken.getTableId(),
900+
walId,
901+
seqTxn,
902+
txnRowCount,
903+
txnRowCount,
904+
0L,
905+
minTs,
906+
maxTs
907+
);
908+
}
909+
final boolean hasReplaceRange = replaceRangeHiTs > replaceRangeLowTs;
910+
// When WAL telemetry is enabled, log routine commits at DEBUG instead of INFO because the commit details are saved in sys.telemetry_wal; commits with a replace range stay at INFO
911+
LogRecord logLine = hasReplaceRange || !walTelemetryEnabled ? LOG.info() : LOG.debug();
886912
try {
887913
logLine.$("commit [wal=").$substr(pathRootSize, path).$(Files.SEPARATOR).$(segmentId)
888914
.$(", segTxn=").$(lastSegmentTxn)
889915
.$(", seqTxn=").$(seqTxn)
890916
.$(", rowLo=").$(currentTxnStartRowNum).$(", rowHi=").$(segmentRowCount)
891917
.$(", minTs=").$ts(timestampDriver, txnMinTimestamp).$(", maxTs=").$ts(timestampDriver, txnMaxTimestamp);
892-
if (replaceRangeHiTs > replaceRangeLowTs) {
918+
if (hasReplaceRange) {
893919
logLine.$(", replaceRangeLo=").$ts(timestampDriver, replaceRangeLowTs).$(", replaceRangeHi=").$ts(timestampDriver, replaceRangeHiTs);
894920
}
895921
} finally {

core/src/main/java/io/questdb/tasks/TelemetryWalTask.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,18 @@ public QueryBuilder getCreateSql(QueryBuilder builder, int ttlWeeks) {
5454
"seqTxn LONG, " +
5555
"rowCount LONG, " +
5656
"physicalRowCount LONG, " +
57-
"latency FLOAT " +
57+
"latency FLOAT, " +
58+
"minTimestamp TIMESTAMP, " +
59+
"maxTimestamp TIMESTAMP" +
5860
") TIMESTAMP(created) PARTITION BY DAY TTL ").$(ttlWeeks > 0 ? ttlWeeks : 1).$(" WEEKS BYPASS WAL"
5961
);
6062
}
6163

64+
@Override
65+
public int getExpectedColumnCount() {
66+
return 10;
67+
}
68+
6269
@Override
6370
public String getName() {
6471
return NAME;
@@ -75,11 +82,17 @@ public ObjectFactory<TelemetryWalTask> getTaskFactory() {
7582
}
7683

7784
// Hardcoded configuration for telemetry_wal table:
85+
// - Always enabled regardless of the main telemetry setting
7886
// - Throttling disabled (0L) to record every WAL event without rate limiting
7987
// - TTL fixed at 1 week
8088
@Override
8189
public TelemetryConfiguration getTelemetryConfiguration(@NotNull CairoConfiguration configuration) {
8290
return new TelemetryConfigurationWrapper(configuration.getTelemetryConfiguration()) {
91+
@Override
92+
public boolean getEnabled() {
93+
return true;
94+
}
95+
8396
@Override
8497
public long getThrottleIntervalMicros() {
8598
return 0L;
@@ -96,6 +109,8 @@ public int getTtlWeeks() {
96109
private static final Log LOG = LogFactory.getLog(TelemetryWalTask.class);
97110
private short event;
98111
private float latency; // millis
112+
private long maxTimestamp;
113+
private long minTimestamp;
99114
private long physicalRowCount;
100115
private long queueCursor;
101116
private long rowCount;
@@ -114,7 +129,9 @@ public static void store(
114129
long seqTxn,
115130
long rowCount,
116131
long physicalRowCount,
117-
long latencyUs
132+
long latencyUs,
133+
long minTimestamp,
134+
long maxTimestamp
118135
) {
119136
final TelemetryWalTask task = telemetry.nextTask();
120137
if (task != null) {
@@ -125,6 +142,8 @@ public static void store(
125142
task.rowCount = rowCount;
126143
task.physicalRowCount = physicalRowCount;
127144
task.latency = latencyUs / 1000.0f; // millis
145+
task.minTimestamp = minTimestamp;
146+
task.maxTimestamp = maxTimestamp;
128147
telemetry.store(task);
129148
}
130149
}
@@ -154,6 +173,8 @@ public void writeTo(TableWriter writer, long timestamp) {
154173
row.putLong(5, rowCount);
155174
row.putLong(6, physicalRowCount);
156175
row.putFloat(7, latency);
176+
row.putTimestamp(8, minTimestamp);
177+
row.putTimestamp(9, maxTimestamp);
157178
row.append();
158179
} catch (CairoException e) {
159180
LOG.error().$("Could not insert a new ").$(TABLE_NAME).$(" row [errno=").$(e.getErrno())

core/src/test/java/io/questdb/test/TelemetryTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ public void testTelemetryWalTableUpgrade() throws Exception {
399399
") TIMESTAMP(created) PARTITION BY MONTH BYPASS WAL");
400400

401401
String showCreateTable = "SHOW CREATE TABLE '" + tableName + "'";
402-
String start = "ddl\n" +
402+
String header = "ddl\n" +
403403
"CREATE TABLE '" + tableName + "' ( \n" +
404404
"\tcreated TIMESTAMP,\n" +
405405
"\tevent SHORT,\n" +
@@ -408,15 +408,15 @@ public void testTelemetryWalTableUpgrade() throws Exception {
408408
"\tseqTxn LONG,\n" +
409409
"\trowCount LONG,\n" +
410410
"\tphysicalRowCount LONG,\n" +
411-
"\tlatency FLOAT\n" +
412-
") timestamp(created)";
413-
String midOld = " PARTITION BY MONTH";
414-
String midNew = " PARTITION BY DAY TTL 1 WEEK";
411+
"\tlatency FLOAT\n";
415412
String end = " BYPASS WAL;\n";
416413

417-
assertSql(start + midOld + end, showCreateTable);
414+
assertSql(header + ") timestamp(created) PARTITION BY MONTH" + end, showCreateTable);
418415
try (TelemetryJob ignore = new TelemetryJob(engine)) {
419-
assertSql(start + midNew + end, showCreateTable);
416+
assertSql(header.replace("\tlatency FLOAT\n", "\tlatency FLOAT,\n") +
417+
"\tminTimestamp TIMESTAMP,\n" +
418+
"\tmaxTimestamp TIMESTAMP\n" +
419+
") timestamp(created) PARTITION BY DAY TTL 1 WEEK" + end, showCreateTable);
420420
}
421421
});
422422
}

0 commit comments

Comments
 (0)