Skip to content

Commit c34ea51

Browse files
committed
feat(tx-submitter): add rollup loss control functionality
- Implement batch cache to store and retrieve rollup batches - Add logic to calculate and compare transaction fees with collected L1 fees - Introduce new configuration options for rollup loss control - Update transaction submission process to include fee checks - Add unit tests for new functionality
1 parent c56fb93 commit c34ea51

File tree

9 files changed

+309
-47
lines changed

9 files changed

+309
-47
lines changed

tx-submitter/entry.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ func Main() func(ctx *cli.Context) error {
6969
"rough_estimate_per_l1_msg", cfg.RollupTxGasPerL1Msg,
7070
"log_level", cfg.LogLevel,
7171
"leveldb_pathname", cfg.LeveldbPathName,
72+
"l1_stop_in_blocks", cfg.BlockNotIncreasedThreshold,
73+
"rollup_loss_control", cfg.RollupLossControl,
74+
"rolluo_loss_buffer", cfg.MaxRollupLossBuffer,
7275
)
7376

7477
ctx, cancel := context.WithCancel(context.Background())

tx-submitter/flags/flags.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -229,37 +229,37 @@ var (
229229
}
230230

231231
// external sign
232-
ExternalSign = cli.BoolFlag{
232+
ExternalSignFlag = cli.BoolFlag{
233233
Name: "external_sign",
234234
Usage: "Enable external sign",
235235
EnvVar: prefixEnvVar("EXTERNAL_SIGN"),
236236
}
237237

238238
// address
239-
ExternalSignAddress = cli.StringFlag{
239+
ExternalSignAddressFlag = cli.StringFlag{
240240
Name: "external_sign_address",
241241
Usage: "The address of the external sign",
242242
EnvVar: prefixEnvVar("EXTERNAL_SIGN_ADDRESS"),
243243
}
244244
// appid
245-
ExternalSignAppid = cli.StringFlag{
245+
ExternalSignAppidFlag = cli.StringFlag{
246246
Name: "external_sign_appid",
247247
Usage: "The appid of the external sign",
248248
EnvVar: prefixEnvVar("EXTERNAL_SIGN_APPID"),
249249
}
250250
// chain
251-
ExternalSignChain = cli.StringFlag{
251+
ExternalSignChainFlag = cli.StringFlag{
252252
Name: "external_sign_chain",
253253
Usage: "The chain of the external sign",
254254
EnvVar: prefixEnvVar("EXTERNAL_SIGN_CHAIN"),
255255
}
256256
// url
257-
ExternalSignUrl = cli.StringFlag{
257+
ExternalSignUrlFlag = cli.StringFlag{
258258
Name: "external_sign_url",
259259
Usage: "The url of the external sign",
260260
EnvVar: prefixEnvVar("EXTERNAL_SIGN_URL"),
261261
}
262-
ExternalSignRsaPriv = cli.StringFlag{
262+
ExternalSignRsaPrivFlag = cli.StringFlag{
263263
Name: "external_rsa_priv",
264264
Usage: "The rsa private key of the external sign",
265265
EnvVar: "SEQUENCER_EXTERNAL_SIGN_RSA_PRIV", // use sequencer rsa from xxx
@@ -300,12 +300,24 @@ var (
300300
}
301301

302302
// l1 block not incremented threshold
303-
BlockNotIncreasedThreshold = cli.Int64Flag{
303+
BlockNotIncreasedThresholdFlag = cli.Int64Flag{
304304
Name: "block_not_increased_threshold",
305305
Usage: "The threshold for block not incremented",
306306
Value: 5,
307307
EnvVar: prefixEnvVar("BLOCK_NOT_INCREASED_THRESHOLD"),
308308
}
309+
RollupLossControlFlag = cli.BoolFlag{
310+
Name: "rollup_loss_control",
311+
Usage: "Enable rollup loss control",
312+
EnvVar: prefixEnvVar("ROLLUP_LOSS_CONTROL"),
313+
}
314+
// max rollup loss buffer
315+
MaxRollupLossBufferFlag = cli.Int64Flag{
316+
Name: "max_rollup_loss_buffer",
317+
Usage: "single transaction rollup maximum allowable loss buff",
318+
Value: 50, // default 50% percent
319+
EnvVar: prefixEnvVar("MAX_ROLLUP_LOSS_BUFFER"),
320+
}
309321
)
310322

311323
var requiredFlags = []cli.Flag{
@@ -349,18 +361,19 @@ var optionalFlags = []cli.Flag{
349361
MaxTxsInPendingPoolFlag,
350362

351363
// external sign
352-
ExternalSign,
353-
ExternalSignAddress,
354-
ExternalSignAppid,
355-
ExternalSignChain,
356-
ExternalSignUrl,
357-
ExternalSignRsaPriv,
364+
ExternalSignFlag,
365+
ExternalSignAddressFlag,
366+
ExternalSignAppidFlag,
367+
ExternalSignChainFlag,
368+
ExternalSignUrlFlag,
369+
ExternalSignRsaPrivFlag,
358370
RoughEstimateGasFlag,
359371
RotatorBufferFlag,
360372
StakingEventStoreFileFlag,
361373
EventIndexStepFlag,
362374
LeveldbPathNameFlag,
363-
BlockNotIncreasedThreshold,
375+
BlockNotIncreasedThresholdFlag,
376+
MaxRollupLossBufferFlag,
364377
}
365378

366379
// Flags contains the list of configuration options available to the binary.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package services
2+
3+
import (
4+
"sync"
5+
6+
"github.com/morph-l2/go-ethereum/eth"
7+
)
8+
9+
type BatchCache struct {
10+
batchCache map[uint64]*eth.RPCRollupBatch
11+
m sync.Mutex
12+
}
13+
14+
func NewBatchCache() *BatchCache {
15+
return &BatchCache{
16+
batchCache: make(map[uint64]*eth.RPCRollupBatch),
17+
}
18+
}
19+
20+
func (b *BatchCache) Get(batchIndex uint64) (*eth.RPCRollupBatch, bool) {
21+
b.m.Lock()
22+
defer b.m.Unlock()
23+
24+
batch, ok := b.batchCache[batchIndex]
25+
return batch, ok
26+
}
27+
func (b *BatchCache) Set(batchIndex uint64, batch *eth.RPCRollupBatch) {
28+
b.m.Lock()
29+
defer b.m.Unlock()
30+
31+
b.batchCache[batchIndex] = batch
32+
}
33+
34+
func (b *BatchCache) Delete(batchIndex uint64) {
35+
b.m.Lock()
36+
defer b.m.Unlock()
37+
38+
delete(b.batchCache, batchIndex)
39+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package services
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestBatchCache(t *testing.T) {
10+
11+
cache := NewBatchCache()
12+
cache.Set(1, nil)
13+
_, ok := cache.Get(1)
14+
require.True(t, ok)
15+
cache.Delete(1)
16+
_, ok = cache.Get(1)
17+
require.False(t, ok)
18+
_, ok = cache.Get(2)
19+
require.False(t, ok)
20+
21+
}

tx-submitter/services/rollup.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type Rollup struct {
7575
// collectedL1FeeSum
7676
collectedL1FeeSum float64
7777
// batchcache
78-
batchCache map[uint64]*eth.RPCRollupBatch
78+
batchCache *BatchCache
7979
bm *l1checker.BlockMonitor
8080
}
8181

@@ -114,7 +114,7 @@ func NewRollup(
114114
cfg: cfg,
115115
signer: types.LatestSignerForChainID(chainId),
116116
externalRsaPriv: rsaPriv,
117-
batchCache: make(map[uint64]*eth.RPCRollupBatch),
117+
batchCache: NewBatchCache(),
118118
ldb: ldb,
119119
bm: bm,
120120
}
@@ -256,7 +256,7 @@ func (r *Rollup) ProcessTx() error {
256256
if ispending {
257257
if txRecord.sendTime+uint64(r.cfg.TxTimeout.Seconds()) < uint64(time.Now().Unix()) {
258258
log.Info("tx timeout", "tx", rtx.Hash().Hex(), "nonce", rtx.Nonce(), "method", method)
259-
newtx, err := r.ReSubmitTx(false, rtx)
259+
newtx, err := r.ReSubmitTx(false, rtx, r.GetReSubmitBatchIndex(method, rtx.Data()))
260260
if err != nil {
261261
log.Error("resubmit tx", "error", err, "tx", rtx.Hash().Hex(), "nonce", rtx.Nonce())
262262
return fmt.Errorf("resubmit tx error:%w", err)
@@ -281,7 +281,8 @@ func (r *Rollup) ProcessTx() error {
281281
"nonce", rtx.Nonce(),
282282
"query_times", txRecord.queryTimes,
283283
)
284-
replacedtx, err := r.ReSubmitTx(true, rtx)
284+
285+
replacedtx, err := r.ReSubmitTx(true, rtx, r.GetReSubmitBatchIndex(method, rtx.Data()))
285286
if err != nil {
286287
log.Error("resend discarded tx", "old_tx", rtx.Hash().String(), "nonce", rtx.Nonce(), "error", err)
287288
if utils.ErrStringMatch(err, core.ErrNonceTooLow) {
@@ -358,7 +359,7 @@ func (r *Rollup) ProcessTx() error {
358359
}
359360
r.metrics.SetRollupCost(fee)
360361
index := utils.ParseParentBatchIndex(rtx.Data()) + 1
361-
batch, ok := r.batchCache[index]
362+
batch, ok := r.batchCache.Get(index)
362363
if ok {
363364
collectedL1FeeFloat := ToEtherFloat((*big.Int)(batch.CollectedL1Fee))
364365
r.collectedL1FeeSum += collectedL1FeeFloat
@@ -368,7 +369,7 @@ func (r *Rollup) ProcessTx() error {
368369
}
369370
r.metrics.SetCollectedL1Fee(ToEtherFloat((*big.Int)(batch.CollectedL1Fee)))
370371
// remove batch from cache
371-
delete(r.batchCache, index)
372+
r.batchCache.Delete(index)
372373
} else {
373374
log.Warn("batch not found in batchCache while set collect fee metrics",
374375
"index", index,
@@ -527,7 +528,7 @@ func (r *Rollup) finalize() error {
527528
"size", signedTx.Size(),
528529
)
529530

530-
err = r.SendTx(signedTx)
531+
err = r.SendTx(signedTx, 0)
531532
if err != nil {
532533
log.Error("send finalize tx to mempool", "error", err.Error())
533534
if utils.ErrStringMatch(err, core.ErrNonceTooLow) {
@@ -650,9 +651,13 @@ func (r *Rollup) rollup() error {
650651
return nil
651652
}
652653

653-
batch, err := GetRollupBatchByIndex(batchIndex, r.L2Clients)
654-
if err != nil {
655-
return fmt.Errorf("get rollup batch by index err:%v", err)
654+
var batch *eth.RPCRollupBatch
655+
batch, ok := r.batchCache.Get(batchIndex)
656+
if !ok {
657+
batch, err = GetRollupBatchByIndex(batchIndex, r.L2Clients)
658+
if err != nil {
659+
return fmt.Errorf("get rollup batch by index err:%v", err)
660+
}
656661
}
657662

658663
// check if the batch is valid
@@ -668,7 +673,7 @@ func (r *Rollup) rollup() error {
668673

669674
// set batch cache
670675
// it shoud be removed after the batch is committed
671-
r.batchCache[batchIndex] = batch
676+
r.batchCache.Set(batchIndex, batch)
672677

673678
signature, err := r.buildSignatureInput(batch)
674679
if err != nil {
@@ -708,7 +713,7 @@ func (r *Rollup) rollup() error {
708713
}
709714

710715
if r.cfg.RoughEstimateGas {
711-
msgcnt := utils.ParseL1MessageCnt(batch.BlockContexts)
716+
msgcnt := r.ParseL1MessageCnt(batch.BlockContexts)
712717
gas = r.RoughRollupGasEstimate(msgcnt)
713718
log.Info("rough estimate rollup tx gas", "gas", gas, "msgcnt", msgcnt)
714719
} else {
@@ -786,7 +791,7 @@ func (r *Rollup) rollup() error {
786791
"blob_len", len(signedTx.BlobHashes()),
787792
)
788793

789-
err = r.SendTx(signedTx)
794+
err = r.SendTx(signedTx, batchIndex)
790795
if err != nil {
791796
log.Error("send tx to mempool", "error", err.Error())
792797
if utils.ErrStringMatch(err, core.ErrNonceTooLow) {
@@ -1091,7 +1096,7 @@ func UpdateGasLimit(tx *types.Transaction) (*types.Transaction, error) {
10911096
}
10921097

10931098
// send tx to l1 with business logic check
1094-
func (r *Rollup) SendTx(tx *types.Transaction) error {
1099+
func (r *Rollup) SendTx(tx *types.Transaction, batchIndex uint64) error {
10951100

10961101
// judge tx info is valid
10971102
if tx == nil {
@@ -1101,7 +1106,30 @@ func (r *Rollup) SendTx(tx *types.Transaction) error {
11011106
if !r.bm.IsGrowth() {
11021107
return fmt.Errorf("block not growth in %d blocks time", r.cfg.BlockNotIncreasedThreshold)
11031108
}
1109+
// check batch loss
1110+
if r.cfg.RollupLossControl {
1111+
// calc fee
1112+
fee := utils.CalcFeeForTx(tx)
1113+
// get batch
1114+
var collectedL1Fee *big.Int
1115+
batch, ok := r.batchCache.Get(batchIndex)
1116+
if ok {
1117+
collectedL1Fee = batch.CollectedL1Fee.ToInt()
1118+
if collectedL1Fee == nil {
1119+
collectedL1Fee = big.NewInt(0)
1120+
}
1121+
} else {
1122+
log.Warn("batch not found in cache when calc fee before SendTx", "batchIndex", batchIndex)
1123+
collectedL1Fee = big.NewInt(0)
1124+
}
1125+
// targetFee = fee * (100 + cfg.RotatorBuffer)/100
1126+
targetFee := new(big.Int).Mul(fee, big.NewInt(100+r.cfg.RotatorBuffer))
1127+
targetFee.Div(targetFee, big.NewInt(100))
11041128

1129+
if collectedL1Fee.Cmp(targetFee) < 0 {
1130+
return fmt.Errorf("tx fee exceed collectedL1Fee: targetFee=%v,fee=%v,collectedL1Fee:=%v", targetFee, fee, collectedL1Fee)
1131+
}
1132+
}
11051133
err := sendTx(r.L1Client, r.cfg.TxFeeLimit, tx)
11061134
if err != nil {
11071135
return err
@@ -1138,7 +1166,7 @@ func sendTx(client iface.Client, txFeeLimit uint64, tx *types.Transaction) error
11381166
return client.SendTransaction(context.Background(), tx)
11391167
}
11401168

1141-
func (r *Rollup) ReSubmitTx(resend bool, tx *types.Transaction) (*types.Transaction, error) {
1169+
func (r *Rollup) ReSubmitTx(resend bool, tx *types.Transaction, batchIndex uint64) (*types.Transaction, error) {
11421170
if tx == nil {
11431171
return nil, errors.New("nil tx")
11441172
}
@@ -1229,7 +1257,7 @@ func (r *Rollup) ReSubmitTx(resend bool, tx *types.Transaction) (*types.Transact
12291257
return nil, fmt.Errorf("sign tx error:%w", err)
12301258
}
12311259
// send tx
1232-
err = r.SendTx(newTx)
1260+
err = r.SendTx(newTx, batchIndex)
12331261
if err != nil {
12341262
return nil, fmt.Errorf("send tx error:%w", err)
12351263
}
@@ -1335,3 +1363,11 @@ func (r *Rollup) InitFeeMetricsSum() error {
13351363
r.metrics.CollectedL1FeeSum.Add(r.collectedL1FeeSum)
13361364
return nil
13371365
}
1366+
1367+
func (r *Rollup) GetReSubmitBatchIndex(method string, calldta []byte) uint64 {
1368+
if method == "commitBatch" {
1369+
return utils.ParseBatchIndex(method, calldta)
1370+
} else {
1371+
return 0
1372+
}
1373+
}

tx-submitter/services/utils.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package services
22

33
import (
44
"crypto/sha256"
5+
"encoding/binary"
56
"math/big"
67

78
"github.com/morph-l2/go-ethereum/common"
9+
"github.com/morph-l2/go-ethereum/common/hexutil"
810
"github.com/morph-l2/go-ethereum/core/types"
911
"github.com/morph-l2/go-ethereum/crypto/kzg4844"
1012
"github.com/morph-l2/go-ethereum/params"
@@ -73,3 +75,16 @@ func ToEtherFloat(weiAmt *big.Int) float64 {
7375
return fEtherAmt
7476

7577
}
78+
79+
func (r *Rollup) ParseL1MessageCnt(blockContexts hexutil.Bytes) uint64 {
80+
81+
var l1msgcnt uint64
82+
blockNum := binary.BigEndian.Uint16(blockContexts[:2])
83+
remainingBz := blockContexts[2:]
84+
for i := 0; i < int(blockNum); i++ {
85+
l1msgcnt += uint64(binary.BigEndian.Uint16(remainingBz[58:60]))
86+
remainingBz = remainingBz[60:]
87+
}
88+
89+
return l1msgcnt
90+
}

0 commit comments

Comments
 (0)