5050import io .questdb .cairo .vm .api .MemoryMA ;
5151import io .questdb .cairo .vm .api .MemoryMARW ;
5252import io .questdb .cairo .wal .WalUtils ;
53+ import io .questdb .cairo .wal .seq .TableTransactionLogFile ;
54+ import io .questdb .cairo .wal .seq .TableTransactionLogV1 ;
5355import io .questdb .griffin .engine .ops .AlterOperation ;
5456import io .questdb .griffin .engine .ops .AlterOperationBuilder ;
5557import io .questdb .griffin .model .IntervalUtils ;
9193import java .util .concurrent .CyclicBarrier ;
9294import java .util .concurrent .atomic .AtomicInteger ;
9395
96+ import static io .questdb .cairo .TableUtils .openSmallFile ;
97+ import static io .questdb .cairo .wal .WalUtils .SEQ_DIR ;
98+ import static io .questdb .cairo .wal .WalUtils .TXNLOG_FILE_NAME ;
99+
94100public class TableWriterTest extends AbstractCairoTest {
95101
96102 public static final String PRODUCT = "product" ;
@@ -2868,6 +2874,27 @@ public void testSetAppendPositionFailureBin2() throws Exception {
28682874 testSetAppendPositionFailure ();
28692875 }
28702876
2877+ @ Test
2878+ public void testShouldThrowExceptionWhenTxnLogIsCorrupted () throws Exception {
2879+ assertMemoryLeak (() -> {
2880+ execute ("create table product (ts TIMESTAMP, i INT) timestamp(ts) partition by day wal;" );
2881+ drainWalQueue ();
2882+ TableToken token = engine .getTableTokenIfExists ("product" );
2883+ engine .releaseAllWriters ();
2884+ engine .getTableSequencerAPI ().releaseAll ();
2885+ try (Path path = new Path (); MemoryCMARW mem = Vm .getCMARWInstance ()) {
2886+ path .of (configuration .getDbRoot ()).concat (token .getDirName ()).concat (SEQ_DIR );
2887+ openSmallFile (FF , path , path .size (), mem , TXNLOG_FILE_NAME , MemoryTag .MMAP_TX_LOG );
2888+ mem .putLong (TableTransactionLogFile .MAX_TXN_OFFSET_64 , 2 );
2889+ mem .jumpTo (TableTransactionLogFile .HEADER_SIZE );
2890+ mem .putLong (TableTransactionLogFile .HEADER_SIZE + TableTransactionLogV1 .RECORD_SIZE + TableTransactionLogFile .TX_LOG_STRUCTURE_VERSION_OFFSET , 2 );
2891+ mem .sync (false );
2892+ mem .setTruncateSize (TableTransactionLogFile .HEADER_SIZE + TableTransactionLogV1 .RECORD_SIZE + TableTransactionLogFile .TX_LOG_STRUCTURE_VERSION_OFFSET + Long .BYTES );
2893+ }
2894+ assertException ("alter table " + PRODUCT + " add column abc INT;" , 0 , "possible corruption in transaction metadata [table=product~1, offset=24, newVersion=1]" );
2895+ });
2896+ }
2897+
28712898 @ Test
28722899 public void testSinglePartitionTruncate () throws Exception {
28732900 assertMemoryLeak (() -> {
@@ -2902,7 +2929,7 @@ public void testSkipOverSpuriousDir() throws Exception {
29022929 path .of (configuration .getDbRoot ()).concat (PRODUCT_FS ).concat ("0001-01-01.123" ).slash$ ();
29032930 Assert .assertEquals (0 , ff .mkdirs (path , configuration .getMkDirMode ()));
29042931
2905- path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils . SEQ_DIR ).slash$ ();
2932+ path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (SEQ_DIR ).slash$ ();
29062933 Assert .assertEquals (0 , ff .mkdirs (path , configuration .getMkDirMode ()));
29072934
29082935 path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils .SEQ_DIR_DEPRECATED ).slash$ ();
@@ -2919,7 +2946,7 @@ public void testSkipOverSpuriousDir() throws Exception {
29192946 path .of (configuration .getDbRoot ()).concat (PRODUCT_FS ).concat ("0001-01-01.123" ).slash$ ();
29202947 Assert .assertFalse (ff .exists (path .$ ()));
29212948
2922- path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils . SEQ_DIR ).slash$ ();
2949+ path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (SEQ_DIR ).slash$ ();
29232950 Assert .assertTrue (ff .exists (path .$ ()));
29242951
29252952 path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils .SEQ_DIR_DEPRECATED ).slash$ ();
@@ -2946,7 +2973,7 @@ public void testSkipOverSpuriousDirNonPartitioned() throws Exception {
29462973 path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat ("0001-01-01.123" ).slash$ ();
29472974 Assert .assertEquals (0 , ff .mkdirs (path , configuration .getMkDirMode ()));
29482975
2949- path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils . SEQ_DIR ).slash$ ();
2976+ path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (SEQ_DIR ).slash$ ();
29502977 Assert .assertEquals (0 , ff .mkdirs (path , configuration .getMkDirMode ()));
29512978
29522979 path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils .SEQ_DIR_DEPRECATED ).slash$ ();
@@ -2963,7 +2990,7 @@ public void testSkipOverSpuriousDirNonPartitioned() throws Exception {
29632990 path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat ("0001-01-01.123" ).slash$ ();
29642991 Assert .assertTrue (ff .exists (path .$ ()));
29652992
2966- path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils . SEQ_DIR ).slash$ ();
2993+ path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (SEQ_DIR ).slash$ ();
29672994 Assert .assertTrue (ff .exists (path .$ ()));
29682995
29692996 path .of (configuration .getDbRoot ()).concat (PRODUCT ).concat (WalUtils .SEQ_DIR_DEPRECATED ).slash$ ();
0 commit comments