@@ -16,10 +16,10 @@ type EventIndexer struct {
1616 deployBlock * big.Int // Block number of contract deployment
1717 filterQuery ethereum.FilterQuery
1818 indexStep uint64 // index step
19- storage * EventInfoStorage
19+ storage IEventStorage
2020}
2121
22- func NewEventIndexer (client * ethclient.Client , deployedBlock * big.Int , filter ethereum.FilterQuery , indexStep uint64 , storage * EventInfoStorage ) * EventIndexer {
22+ func NewEventIndexer (client * ethclient.Client , deployedBlock * big.Int , filter ethereum.FilterQuery , indexStep uint64 , storage IEventStorage ) * EventIndexer {
2323 return & EventIndexer {
2424 client : client ,
2525 deployBlock : deployedBlock ,
@@ -32,8 +32,8 @@ func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter et
3232func (l * EventIndexer ) Index () {
3333 log .Info ("event indexer started" )
3434
35- if l .storage .BlockProcessed == 0 {
36- l .storage .BlockProcessed = l .deployBlock .Uint64 ()
35+ if l .storage .BlockProcessed () == 0 {
36+ l .storage .SetBlockProcessed ( l .deployBlock .Uint64 () )
3737 err := l .storage .Store ()
3838 if err != nil {
3939 log .Error ("failed to store initial block number" , "error" , err )
@@ -45,43 +45,40 @@ func (l *EventIndexer) Index() {
4545 defer ticker .Stop ()
4646
4747 for range ticker .C {
48-
4948 // Get the current block number
5049 currentBlock , err := l .client .BlockNumber (context .Background ())
5150 if err != nil {
5251 log .Error ("failed to get current block number" , "error" , err )
5352 continue
5453 }
5554
56- if currentBlock <= l .storage .BlockProcessed {
57- log .Info ("no new block to index" , "current_block" , currentBlock , "last_processed_block" , l .storage .BlockProcessed )
55+ if currentBlock <= l .storage .BlockProcessed () {
56+ log .Info ("no new block to index" , "current_block" , currentBlock , "last_processed_block" , l .storage .BlockProcessed () )
5857 continue
5958 }
6059
6160 // Perform indexing operation
62- indexedEventInfo , err := l .index (l .client , big .NewInt (int64 (l .storage .BlockProcessed )), big .NewInt (int64 (currentBlock )))
61+ indexedEventInfo , err := l .index (l .client , big .NewInt (int64 (l .storage .BlockProcessed () )), big .NewInt (int64 (currentBlock )))
6362 if err != nil {
6463 log .Error ("indexing operation failed" , "error" , err )
6564 continue
6665 }
6766
6867 if indexedEventInfo != nil {
69- l .storage .EventInfo = * indexedEventInfo
68+ l .storage .SetBlockProcessed ( indexedEventInfo . BlockProcessed )
7069 } else {
71- l .storage .EventInfo = EventInfo {
72- BlockProcessed : currentBlock ,
73- }
70+ l .storage .SetBlockProcessed (currentBlock )
7471 }
72+
7573 // Update storage
7674 err = l .storage .Store ()
7775 if err != nil {
7876 log .Error ("event index complete, failed to update storage" , "error" , err )
7977 } else {
80- log .Info ("event index complete, storage updated" , "processed_block" , l .storage .EventInfo .BlockProcessed , "block_time" , l .storage .EventInfo .BlockTime )
78+ info := l .storage .EventInfo ()
79+ log .Info ("event index complete, storage updated" , "processed_block" , info .BlockProcessed , "block_time" , info .BlockTime )
8180 }
82-
8381 }
84-
8582}
8683
8784// filter logs from from_block to to_block
@@ -143,6 +140,6 @@ func (ei *EventIndexer) index(client *ethclient.Client, fromBlock, toBlock *big.
143140func (l * EventIndexer ) GetFilter () ethereum.FilterQuery {
144141 return l .filterQuery
145142}
146- func (l * EventIndexer ) GetStorage () * EventInfoStorage {
143+ func (l * EventIndexer ) GetStorage () IEventStorage {
147144 return l .storage
148145}
0 commit comments