@@ -422,7 +422,7 @@ private void applyOutstandingWalTransactions(
422422
423423 if (hasNext ) {
424424 final long start = microClock .getTicks ();
425- walTelemetryFacade .store (WAL_TXN_APPLY_START , tableToken , walId , seqTxn , -1L , -1L , start - commitTimestamp );
425+ walTelemetryFacade .store (WAL_TXN_APPLY_START , tableToken , walId , seqTxn , -1L , -1L , start - commitTimestamp , Numbers . LONG_NULL , Numbers . LONG_NULL );
426426 writer .setSeqTxn (seqTxn );
427427 try {
428428 final TableMetadataChange metadataChangeOp = structuralChangeCursor .next ();
@@ -440,7 +440,7 @@ private void applyOutstandingWalTransactions(
440440 writer .setSeqTxn (seqTxn - 1 );
441441 throw th ;
442442 }
443- walTelemetryFacade .store (WAL_TXN_STRUCTURE_CHANGE_APPLIED , tableToken , walId , seqTxn , -1L , -1L , microClock .getTicks () - start );
443+ walTelemetryFacade .store (WAL_TXN_STRUCTURE_CHANGE_APPLIED , tableToken , walId , seqTxn , -1L , -1L , microClock .getTicks () - start , Numbers . LONG_NULL , Numbers . LONG_NULL );
444444 } else {
445445 // Something messed up in sequencer.
446446 // There is a transaction in WAL but no structure change record.
@@ -562,8 +562,8 @@ private void doStoreTelemetry(short event, short origin) {
562562 TelemetryTask .store (telemetry , origin , event );
563563 }
564564
565- private void doStoreWalTelemetry (short event , TableToken tableToken , int walId , long seqTxn , long rowCount , long physicalRowCount , long latencyUs ) {
566- TelemetryWalTask .store (walTelemetry , event , tableToken .getTableId (), walId , seqTxn , rowCount , physicalRowCount , latencyUs );
565+ private void doStoreWalTelemetry (short event , TableToken tableToken , int walId , long seqTxn , long rowCount , long physicalRowCount , long latencyUs , long minTimestamp , long maxTimestamp ) {
566+ TelemetryWalTask .store (walTelemetry , event , tableToken .getTableId (), walId , seqTxn , rowCount , physicalRowCount , latencyUs , minTimestamp , maxTimestamp );
567567 }
568568
569569 private void handleWalApplyFailure (TableToken tableToken , Throwable throwable , SeqTxnTracker txnTracker ) {
@@ -635,7 +635,7 @@ private int processWalCommit(
635635 switch (walTxnType ) {
636636 case DATA :
637637 case MAT_VIEW_DATA :
638- walTelemetryFacade .store (WAL_TXN_APPLY_START , writer .getTableToken (), walId , seqTxn , -1L , -1L , start - commitTimestamp );
638+ walTelemetryFacade .store (WAL_TXN_APPLY_START , writer .getTableToken (), walId , seqTxn , -1L , -1L , start - commitTimestamp , txnDetails . getMinTimestamp ( seqTxn ), txnDetails . getMaxTimestamp ( seqTxn ) );
639639 long skipTxnCount = calculateSkipTransactionCount (seqTxn , txnDetails );
640640 // Ask TableWriter to skip applying transactions entirely when possible
641641 boolean skipped = false ;
@@ -660,7 +660,7 @@ private int processWalCommit(
660660 long walRowCount = txnDetails .getSegmentRowHi (s ) - txnDetails .getSegmentRowLo (s );
661661 long commitPhRowCount = s == lastCommittedSeqTxn ? totalPhysicalRowCount : 0 ;
662662 metrics .addApplyRowsWritten (walRowCount , commitPhRowCount , latency );
663- walTelemetryFacade .store (WAL_TXN_DATA_APPLIED , writer .getTableToken (), walId , s , walRowCount , commitPhRowCount , latency );
663+ walTelemetryFacade .store (WAL_TXN_DATA_APPLIED , writer .getTableToken (), walId , s , walRowCount , commitPhRowCount , latency , txnDetails . getMinTimestamp ( s ), txnDetails . getMaxTimestamp ( s ) );
664664 lastCommittedRows += walRowCount ;
665665 }
666666
@@ -702,9 +702,9 @@ private int processWalCommit(
702702 try (WalEventReader eventReader = walEventReader ) {
703703 final WalEventCursor walEventCursor = eventReader .of (walPath , segmentTxn );
704704 final WalEventCursor .SqlInfo sqlInfo = walEventCursor .getSqlInfo ();
705- walTelemetryFacade .store (WAL_TXN_APPLY_START , writer .getTableToken (), walId , seqTxn , -1L , -1L , start - commitTimestamp );
705+ walTelemetryFacade .store (WAL_TXN_APPLY_START , writer .getTableToken (), walId , seqTxn , -1L , -1L , start - commitTimestamp , Numbers . LONG_NULL , Numbers . LONG_NULL );
706706 processWalSql (writer , sqlInfo , operationExecutor , seqTxn );
707- walTelemetryFacade .store (WAL_TXN_SQL_APPLIED , writer .getTableToken (), walId , seqTxn , -1L , -1L , microClock .getTicks () - start );
707+ walTelemetryFacade .store (WAL_TXN_SQL_APPLIED , writer .getTableToken (), walId , seqTxn , -1L , -1L , microClock .getTicks () - start , Numbers . LONG_NULL , Numbers . LONG_NULL );
708708 lastCommittedRows = 0 ;
709709 return 1 ;
710710 }
@@ -871,7 +871,8 @@ private void processWalSql(TableWriter tableWriter, WalEventCursor.SqlInfo sqlIn
871871 private void storeTelemetryNoOp (short event , short origin ) {
872872 }
873873
874- private void storeWalTelemetryNoop (short event , TableToken tableToken , int walId , long seqTxn , long rowCount , long physicalRowCount , long latencyUs ) {
874+ @ SuppressWarnings ("unused" )
875+ private void storeWalTelemetryNoop (short event , TableToken tableToken , int walId , long seqTxn , long rowCount , long physicalRowCount , long latencyUs , long minTimestamp , long maxTimestamp ) {
875876 }
876877
877878 private void updateMatViewRefreshState (
@@ -996,7 +997,7 @@ private interface TelemetryFacade {
996997
997998 @ FunctionalInterface
998999 private interface WalTelemetryFacade {
999- void store (short event , TableToken tableToken , int walId , long seqTxn , long rowCount , long physicalRowCount , long latencyUs );
1000+ void store (short event , TableToken tableToken , int walId , long seqTxn , long rowCount , long physicalRowCount , long latencyUs , long minTimestamp , long maxTimestamp );
10001001 }
10011002
10021003 private static class EjectApplyWalException extends RuntimeException {
0 commit comments