Skip to content

Commit fdfd123

Browse files
MariusVanDerWijdenrjl493456442
authored andcommitted
core/txpool: drop peers on invalid KZG proofs
Co-authored-by: Gary Rong <[email protected]> Co-authored-by: MariusVanDerWijden <[email protected]>:
1 parent 8ecb686 commit fdfd123

4 files changed

Lines changed: 147 additions & 7 deletions

File tree

core/txpool/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,7 @@ var (
7171
// ErrInflightTxLimitReached is returned when the maximum number of in-flight
7272
// transactions is reached for specific accounts.
7373
ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts")
74+
75+
// ErrKZGVerificationError is returned when a KZG proof was not verified correctly.
76+
ErrKZGVerificationError = errors.New("KZG verification error")
7477
)

core/txpool/validation.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func validateBlobSidecarLegacy(sidecar *types.BlobTxSidecar, hashes []common.Has
193193
}
194194
for i := range sidecar.Blobs {
195195
if err := kzg4844.VerifyBlobProof(&sidecar.Blobs[i], sidecar.Commitments[i], sidecar.Proofs[i]); err != nil {
196-
return fmt.Errorf("invalid blob %d: %v", i, err)
196+
return fmt.Errorf("%w: invalid blob proof: %v", ErrKZGVerificationError, err)
197197
}
198198
}
199199
return nil
@@ -203,7 +203,10 @@ func validateBlobSidecarOsaka(sidecar *types.BlobTxSidecar, hashes []common.Hash
203203
if len(sidecar.Proofs) != len(hashes)*kzg4844.CellProofsPerBlob {
204204
return fmt.Errorf("invalid number of %d blob proofs expected %d", len(sidecar.Proofs), len(hashes)*kzg4844.CellProofsPerBlob)
205205
}
206-
return kzg4844.VerifyCellProofs(sidecar.Blobs, sidecar.Commitments, sidecar.Proofs)
206+
if err := kzg4844.VerifyCellProofs(sidecar.Blobs, sidecar.Commitments, sidecar.Proofs); err != nil {
207+
return fmt.Errorf("%w: %v", ErrKZGVerificationError, err)
208+
}
209+
return nil
207210
}
208211

209212
// ValidationOptionsWithState define certain differences between stateful transaction

eth/fetcher/tx_fetcher.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,11 @@ type txRequest struct {
114114
// txDelivery is the notification that a batch of transactions have been added
115115
// to the pool and should be untracked.
116116
type txDelivery struct {
117-
origin string // Identifier of the peer originating the notification
118-
hashes []common.Hash // Batch of transaction hashes having been delivered
119-
metas []txMetadata // Batch of metadata associated with the delivered hashes
120-
direct bool // Whether this is a direct reply or a broadcast
117+
origin string // Identifier of the peer originating the notification
118+
hashes []common.Hash // Batch of transaction hashes having been delivered
119+
metas []txMetadata // Batch of metadata associated with the delivered hashes
120+
direct bool // Whether this is a direct reply or a broadcast
121+
violation error // Whether we encountered a protocol violation
121122
}
122123

123124
// txDrop is the notification that a peer has disconnected.
@@ -285,6 +286,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
285286
knownMeter = txReplyKnownMeter
286287
underpricedMeter = txReplyUnderpricedMeter
287288
otherRejectMeter = txReplyOtherRejectMeter
289+
violation error
288290
)
289291
if !direct {
290292
inMeter = txBroadcastInMeter
@@ -331,6 +333,12 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
331333
case errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow):
332334
underpriced++
333335

336+
case errors.Is(err, txpool.ErrKZGVerificationError):
337+
// KZG verification failed, terminate transaction processing immediately.
338+
// Since KZG verification is computationally expensive, this acts as a
339+
// defensive measure against potential DoS attacks.
340+
violation = err
341+
334342
default:
335343
otherreject++
336344
}
@@ -339,6 +347,11 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
339347
kind: batch[j].Type(),
340348
size: uint32(batch[j].Size()),
341349
})
350+
// Terminate the transaction processing if violation is encountered. All
351+
// the remaining transactions in response will be silently discarded.
352+
if violation != nil {
353+
break
354+
}
342355
}
343356
knownMeter.Mark(duplicate)
344357
underpricedMeter.Mark(underpriced)
@@ -349,9 +362,13 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
349362
time.Sleep(200 * time.Millisecond)
350363
log.Debug("Peer delivering stale transactions", "peer", peer, "rejected", otherreject)
351364
}
365+
// If we encountered a protocol violation, disconnect this peer.
366+
if violation != nil {
367+
break
368+
}
352369
}
353370
select {
354-
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
371+
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct, violation: violation}:
355372
return nil
356373
case <-f.quit:
357374
return errTerminated
@@ -746,6 +763,11 @@ func (f *TxFetcher) loop() {
746763
// Something was delivered, try to reschedule requests
747764
f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
748765
}
766+
// If we encountered a protocol violation, disconnect the peer
767+
if delivery.violation != nil {
768+
log.Warn("Disconnect peer for protocol violation", "peer", delivery.origin, "error", delivery.violation)
769+
f.dropPeer(delivery.origin)
770+
}
749771

750772
case drop := <-f.drop:
751773
// A peer was dropped, remove all traces of it

eth/fetcher/tx_fetcher_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package fetcher
1818

1919
import (
20+
"crypto/sha256"
2021
"errors"
2122
"math/big"
2223
"math/rand"
@@ -28,7 +29,10 @@ import (
2829
"github.com/ethereum/go-ethereum/common/mclock"
2930
"github.com/ethereum/go-ethereum/core/txpool"
3031
"github.com/ethereum/go-ethereum/core/types"
32+
"github.com/ethereum/go-ethereum/crypto"
33+
"github.com/ethereum/go-ethereum/crypto/kzg4844"
3134
"github.com/ethereum/go-ethereum/params"
35+
"github.com/holiman/uint256"
3236
)
3337

3438
var (
@@ -1908,6 +1912,114 @@ func TestTransactionFetcherDropAlternates(t *testing.T) {
19081912
})
19091913
}
19101914

1915+
func makeInvalidBlobTx() *types.Transaction {
1916+
key, _ := crypto.GenerateKey()
1917+
blob := &kzg4844.Blob{byte(0xa)}
1918+
commitment, _ := kzg4844.BlobToCommitment(blob)
1919+
blobHash := kzg4844.CalcBlobHashV1(sha256.New(), &commitment)
1920+
cellProof, _ := kzg4844.ComputeCellProofs(blob)
1921+
1922+
// Mutate the cell proof
1923+
cellProof[0][0] = 0x0
1924+
1925+
blobtx := &types.BlobTx{
1926+
ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID),
1927+
Nonce: 0,
1928+
GasTipCap: uint256.NewInt(100),
1929+
GasFeeCap: uint256.NewInt(200),
1930+
Gas: 21000,
1931+
BlobFeeCap: uint256.NewInt(200),
1932+
BlobHashes: []common.Hash{blobHash},
1933+
Value: uint256.NewInt(100),
1934+
Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProof),
1935+
}
1936+
return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
1937+
}
1938+
1939+
// This test ensures that the peer will be disconnected for protocol violation
1940+
// and all its internal traces should be removed properly.
1941+
func TestTransactionProtocolViolation(t *testing.T) {
1942+
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
1943+
1944+
var (
1945+
badTx = makeInvalidBlobTx()
1946+
drop = make(chan struct{}, 1)
1947+
)
1948+
testTransactionFetcherParallel(t, txFetcherTest{
1949+
init: func() *TxFetcher {
1950+
return NewTxFetcher(
1951+
func(common.Hash) bool { return false },
1952+
func(txs []*types.Transaction) []error {
1953+
var errs []error
1954+
for range txs {
1955+
errs = append(errs, txpool.ErrKZGVerificationError)
1956+
}
1957+
return errs
1958+
},
1959+
func(a string, b []common.Hash) error {
1960+
return nil
1961+
},
1962+
func(peer string) { drop <- struct{}{} },
1963+
)
1964+
},
1965+
steps: []interface{}{
1966+
// Initial announcement to get something into the waitlist
1967+
doTxNotify{
1968+
peer: "A",
1969+
hashes: []common.Hash{testTxs[0].Hash(), badTx.Hash(), testTxs[1].Hash()},
1970+
types: []byte{types.LegacyTxType, types.BlobTxType, types.LegacyTxType},
1971+
sizes: []uint32{uint32(testTxs[0].Size()), uint32(badTx.Size()), uint32(testTxs[1].Size())},
1972+
},
1973+
isWaiting(map[string][]announce{
1974+
"A": {
1975+
{testTxs[0].Hash(), types.LegacyTxType, uint32(testTxs[0].Size())},
1976+
{badTx.Hash(), types.BlobTxType, uint32(badTx.Size())},
1977+
{testTxs[1].Hash(), types.LegacyTxType, uint32(testTxs[1].Size())},
1978+
},
1979+
}),
1980+
doWait{time: 0, step: true}, // zero time, but the blob fetching should be scheduled
1981+
1982+
isWaiting(map[string][]announce{
1983+
"A": {
1984+
{testTxs[0].Hash(), types.LegacyTxType, uint32(testTxs[0].Size())},
1985+
{testTxs[1].Hash(), types.LegacyTxType, uint32(testTxs[1].Size())},
1986+
},
1987+
}),
1988+
isScheduled{
1989+
tracking: map[string][]announce{
1990+
"A": {
1991+
{badTx.Hash(), types.BlobTxType, uint32(badTx.Size())},
1992+
},
1993+
},
1994+
fetching: map[string][]common.Hash{
1995+
"A": {badTx.Hash()},
1996+
},
1997+
},
1998+
1999+
doTxEnqueue{
2000+
peer: "A",
2001+
txs: []*types.Transaction{badTx},
2002+
direct: true,
2003+
},
2004+
// Some internal traces are left and will be cleaned by a following drop
2005+
// operation.
2006+
isWaiting(map[string][]announce{
2007+
"A": {
2008+
{testTxs[0].Hash(), types.LegacyTxType, uint32(testTxs[0].Size())},
2009+
{testTxs[1].Hash(), types.LegacyTxType, uint32(testTxs[1].Size())},
2010+
},
2011+
}),
2012+
isScheduled{},
2013+
doFunc(func() { <-drop }),
2014+
2015+
// Simulate the drop operation emitted by the server
2016+
doDrop("A"),
2017+
isWaiting(nil),
2018+
isScheduled{nil, nil, nil},
2019+
},
2020+
})
2021+
}
2022+
19112023
func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) {
19122024
t.Parallel()
19132025
testTransactionFetcher(t, tt)

0 commit comments

Comments
 (0)