Skip to content

Commit f923054

Browse files
authored
Merge pull request #680 from morph-l2/op_process_tx
feat(tx-submitter): optimize rollup transaction processing Main Changes: * Optimized transaction processing logic: Remove tracking of tx after six blocks * Removed failedindex and related logic * Added extensive mocks to simulate transaction state transitions and validate tx parameters after operations in different states * Increased max blobfee per gas quote to 3x to handle a busy blob market
2 parents de03c80 + aea1001 commit f923054

37 files changed

+3253
-789
lines changed

tx-submitter/.gitignore

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,56 @@
1-
.idea
2-
.env*
3-
build
4-
tx-submitter
1+
# IDE and Editor files
2+
.idea/
3+
.vscode/
4+
*.swp
5+
*.swo
6+
*~
7+
8+
# OS generated files
59
.DS_Store
6-
.vscode
7-
*.log
10+
.DS_Store?
11+
._*
12+
.Spotlight-V100
13+
.Trashes
14+
ehthumbs.db
15+
Thumbs.db
16+
17+
# Go specific
18+
*.exe
19+
*.exe~
20+
*.dll
21+
*.so
22+
*.dylib
23+
*.test
24+
*.out
25+
go.work
26+
27+
# Binary files
28+
tx-submitter
29+
**/tx-submitter
30+
build/
831
*debug_bin*
9-
journal.rlp
32+
33+
# Config and Environment files
34+
.env*
35+
36+
# Test and Debug files
37+
test/
38+
testleveldb/
39+
test_data/
40+
debug/
41+
*.test
42+
*.log
43+
*.pprof
44+
cpu.prof
45+
mem.prof
46+
trace.out
47+
48+
# Coverage reports
49+
coverage.txt
50+
51+
# Transaction files
52+
journal.rlp
53+
journal*.rlp
54+
55+
# Temporary files
56+
tmp/

tx-submitter/constants/methods.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package constants
2+
3+
const (
4+
// MethodCommitBatch is the method name for committing a batch
5+
MethodCommitBatch = "commitBatch"
6+
// MethodFinalizeBatch is the method name for finalizing a batch
7+
MethodFinalizeBatch = "finalizeBatch"
8+
)

tx-submitter/db/db.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import (
88
"morph-l2/tx-submitter/utils"
99

1010
"github.com/morph-l2/go-ethereum/ethdb/leveldb"
11+
"github.com/syndtr/goleveldb/leveldb/errors"
1112
)
1213

1314
var (
14-
ErrKeyNotFound = fmt.Errorf("not found")
15+
ErrKeyNotFound = errors.ErrNotFound
1516
)
1617

1718
type Db struct {
@@ -55,6 +56,9 @@ func (d *Db) GetString(key string) (string, error) {
5556
defer d.m.Unlock()
5657
v, err := d.db.Get([]byte(key))
5758
if err != nil {
59+
if err == errors.ErrNotFound {
60+
return "", ErrKeyNotFound
61+
}
5862
return "", fmt.Errorf("failed to get key from leveldb %w", err)
5963
}
6064
return string(v), nil

tx-submitter/db/interface.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package db
2+
3+
// Database defines the interface for database operations
4+
type Database interface {
5+
GetString(key string) (string, error)
6+
PutString(key, val string) error
7+
GetFloat(key string) (float64, error)
8+
PutFloat(key string, val float64) error
9+
Close() error
10+
}

tx-submitter/event/indexer.go

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ type EventIndexer struct {
1616
deployBlock *big.Int // Block number of contract deployment
1717
filterQuery ethereum.FilterQuery
1818
indexStep uint64 // index step
19-
storage *EventInfoStorage
19+
storage IEventStorage
2020
}
2121

22-
func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter ethereum.FilterQuery, indexStep uint64, storage *EventInfoStorage) *EventIndexer {
22+
func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter ethereum.FilterQuery, indexStep uint64, storage IEventStorage) *EventIndexer {
2323
return &EventIndexer{
2424
client: client,
2525
deployBlock: deployedBlock,
@@ -32,8 +32,8 @@ func NewEventIndexer(client *ethclient.Client, deployedBlock *big.Int, filter et
3232
func (l *EventIndexer) Index() {
3333
log.Info("event indexer started")
3434

35-
if l.storage.BlockProcessed == 0 {
36-
l.storage.BlockProcessed = l.deployBlock.Uint64()
35+
if l.storage.BlockProcessed() == 0 {
36+
l.storage.SetBlockProcessed(l.deployBlock.Uint64())
3737
err := l.storage.Store()
3838
if err != nil {
3939
log.Error("failed to store initial block number", "error", err)
@@ -45,43 +45,40 @@ func (l *EventIndexer) Index() {
4545
defer ticker.Stop()
4646

4747
for range ticker.C {
48-
4948
// Get the current block number
5049
currentBlock, err := l.client.BlockNumber(context.Background())
5150
if err != nil {
5251
log.Error("failed to get current block number", "error", err)
5352
continue
5453
}
5554

56-
if currentBlock <= l.storage.BlockProcessed {
57-
log.Info("no new block to index", "current_block", currentBlock, "last_processed_block", l.storage.BlockProcessed)
55+
if currentBlock <= l.storage.BlockProcessed() {
56+
log.Info("no new block to index", "current_block", currentBlock, "last_processed_block", l.storage.BlockProcessed())
5857
continue
5958
}
6059

6160
// Perform indexing operation
62-
indexedEventInfo, err := l.index(l.client, big.NewInt(int64(l.storage.BlockProcessed)), big.NewInt(int64(currentBlock)))
61+
indexedEventInfo, err := l.index(l.client, big.NewInt(int64(l.storage.BlockProcessed())), big.NewInt(int64(currentBlock)))
6362
if err != nil {
6463
log.Error("indexing operation failed", "error", err)
6564
continue
6665
}
6766

6867
if indexedEventInfo != nil {
69-
l.storage.EventInfo = *indexedEventInfo
68+
l.storage.SetBlockProcessed(indexedEventInfo.BlockProcessed)
7069
} else {
71-
l.storage.EventInfo = EventInfo{
72-
BlockProcessed: currentBlock,
73-
}
70+
l.storage.SetBlockProcessed(currentBlock)
7471
}
72+
7573
// Update storage
7674
err = l.storage.Store()
7775
if err != nil {
7876
log.Error("event index complete, failed to update storage", "error", err)
7977
} else {
80-
log.Info("event index complete, storage updated", "processed_block", l.storage.EventInfo.BlockProcessed, "block_time", l.storage.EventInfo.BlockTime)
78+
info := l.storage.EventInfo()
79+
log.Info("event index complete, storage updated", "processed_block", info.BlockProcessed, "block_time", info.BlockTime)
8180
}
82-
8381
}
84-
8582
}
8683

8784
// filter logs from from_block to to_block
@@ -143,6 +140,6 @@ func (ei *EventIndexer) index(client *ethclient.Client, fromBlock, toBlock *big.
143140
func (l *EventIndexer) GetFilter() ethereum.FilterQuery {
144141
return l.filterQuery
145142
}
146-
func (l *EventIndexer) GetStorage() *EventInfoStorage {
143+
func (l *EventIndexer) GetStorage() IEventStorage {
147144
return l.storage
148145
}

tx-submitter/event/storage.go

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@ import (
77

88
"morph-l2/tx-submitter/db"
99
"morph-l2/tx-submitter/params"
10-
"morph-l2/tx-submitter/utils"
1110
)
1211

1312
type IEventStorage interface {
1413
Store() error
1514
Load() error
15+
BlockProcessed() uint64
16+
SetBlockProcessed(blockNum uint64)
17+
BlockTime() uint64
18+
SetBlockTime(blockTime uint64)
19+
EventInfo() EventInfo
1620
}
1721

1822
type EventInfo struct {
@@ -21,21 +25,20 @@ type EventInfo struct {
2125
}
2226

2327
type EventInfoStorage struct {
24-
EventInfo
25-
db *db.Db
26-
mu sync.Mutex
28+
eventInfo EventInfo
29+
db db.Database
30+
mu sync.RWMutex
2731
}
2832

29-
func NewEventInfoStorage(db *db.Db) *EventInfoStorage {
33+
func NewEventInfoStorage(db db.Database) *EventInfoStorage {
3034
return &EventInfoStorage{
3135
db: db,
3236
}
3337
}
3438

3539
func (e *EventInfoStorage) Store() error {
36-
3740
// Convert struct to JSON string
38-
jsonData, err := json.Marshal(e.EventInfo)
41+
jsonData, err := json.Marshal(e.eventInfo)
3942
if err != nil {
4043
return fmt.Errorf("failed to convert struct to JSON: %w", err)
4144
}
@@ -49,31 +52,57 @@ func (e *EventInfoStorage) Store() error {
4952
}
5053
return nil
5154
}
55+
5256
func (e *EventInfoStorage) Load() error {
5357
e.mu.Lock()
5458
defer e.mu.Unlock()
55-
evnetInfo, err := e.db.GetString(params.EventInfoKey)
59+
60+
jsonStr, err := e.db.GetString(params.EventInfoKey)
5661
if err != nil {
57-
if utils.ErrStringMatch(err, db.ErrKeyNotFound) {
58-
e.EventInfo = EventInfo{}
59-
jsonData, err := json.Marshal(e.EventInfo)
60-
if err != nil {
61-
return fmt.Errorf("failed to marshal json: %w", err)
62-
}
63-
err = e.db.PutString(params.EventInfoKey, string(jsonData))
64-
if err != nil {
65-
return fmt.Errorf("failed to init eventinfo to db: %w", err)
66-
}
62+
if err == db.ErrKeyNotFound {
63+
// Initialize with default values if not found
64+
e.eventInfo = EventInfo{}
6765
return nil
6866
}
69-
return fmt.Errorf("failed to load eventinfo from db: %w", err)
67+
return fmt.Errorf("failed to read from db: %w", err)
7068
}
7169

72-
// parse json data to struct
73-
err = json.Unmarshal([]byte(evnetInfo), &e.EventInfo)
70+
err = json.Unmarshal([]byte(jsonStr), &e.eventInfo)
7471
if err != nil {
75-
return fmt.Errorf("failed to unmarshal JSON: %w", err)
72+
return fmt.Errorf("failed to parse JSON: %w", err)
7673
}
7774

7875
return nil
7976
}
77+
78+
func (e *EventInfoStorage) BlockProcessed() uint64 {
79+
e.mu.RLock()
80+
defer e.mu.RUnlock()
81+
return e.eventInfo.BlockProcessed
82+
}
83+
84+
func (e *EventInfoStorage) SetBlockProcessed(blockNum uint64) {
85+
e.mu.Lock()
86+
defer e.mu.Unlock()
87+
e.eventInfo.BlockProcessed = blockNum
88+
}
89+
90+
func (e *EventInfoStorage) BlockTime() uint64 {
91+
e.mu.RLock()
92+
defer e.mu.RUnlock()
93+
return e.eventInfo.BlockTime
94+
}
95+
96+
func (e *EventInfoStorage) SetBlockTime(blockTime uint64) {
97+
e.mu.Lock()
98+
defer e.mu.Unlock()
99+
e.eventInfo.BlockTime = blockTime
100+
}
101+
102+
// EventInfo returns a copy of the current event info.
103+
// This ensures thread safety without exposing the internal state.
104+
func (e *EventInfoStorage) EventInfo() EventInfo {
105+
e.mu.RLock()
106+
defer e.mu.RUnlock()
107+
return e.eventInfo
108+
}

tx-submitter/event/storage_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,39 @@ package event
22

33
import (
44
"morph-l2/tx-submitter/db"
5+
"os"
6+
"path/filepath"
57
"testing"
68

79
"github.com/stretchr/testify/require"
810
)
911

1012
func TestEventInfoStorage(t *testing.T) {
13+
// Create a unique test directory
14+
testDir := filepath.Join(t.TempDir(), "testleveldb")
1115

12-
db, err := db.New("./testleveldb")
16+
// Cleanup before test (in case it exists)
17+
os.RemoveAll(testDir)
18+
19+
// Cleanup after test
20+
t.Cleanup(func() {
21+
os.RemoveAll(testDir)
22+
})
23+
24+
db, err := db.New(testDir)
1325
require.NoError(t, err)
1426
storage := NewEventInfoStorage(db)
1527
err = storage.Load()
1628
require.NoError(t, err)
1729

18-
storage.BlockTime = 100
19-
storage.BlockProcessed = 100
30+
storage.SetBlockTime(100)
31+
storage.SetBlockProcessed(100)
2032
err = storage.Store()
2133
require.NoError(t, err)
2234

2335
storage2 := NewEventInfoStorage(db)
2436
err = storage2.Load()
2537
require.NoError(t, err)
26-
require.Equal(t, storage.BlockTime, storage2.BlockTime)
27-
require.Equal(t, storage.BlockProcessed, storage2.BlockProcessed)
38+
require.Equal(t, storage.BlockTime(), storage2.BlockTime())
39+
require.Equal(t, storage.BlockProcessed(), storage2.BlockProcessed())
2840
}

tx-submitter/iface/metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package iface
2+
3+
type IMetrics interface {
4+
// Reorg metrics
5+
IncReorgs()
6+
SetReorgDepth(depth float64)
7+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package iface
2+
3+
import "context"
4+
5+
// IReorgDetector defines the interface for chain reorganization detection
6+
type IReorgDetector interface {
7+
// DetectReorg checks if a chain reorganization has occurred
8+
// Returns:
9+
// - bool: whether a reorg was detected
10+
// - uint64: the depth of the reorg (number of blocks from head)
11+
// - error: any error that occurred during detection
12+
DetectReorg(ctx context.Context) (bool, uint64, error)
13+
}

tx-submitter/localpool/journal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (j *Journal) ParseAllTxs() ([]*types.Transaction, error) {
8282
content, err := readFileContent(j.path)
8383

8484
if err != nil {
85-
return nil, fmt.Errorf("failed to parse txs: %w", err)
85+
return nil, fmt.Errorf("failed to read journal file: %w", err)
8686
}
8787

8888
if len(content) == 0 {

0 commit comments

Comments
 (0)