@@ -13,10 +13,12 @@ mod fetch_blobs_beacon_adapter;
1313mod tests;
1414
1515use crate :: blob_verification:: { GossipBlobError , GossipVerifiedBlob } ;
16- use crate :: data_column_verification:: { GossipDataColumnError , GossipVerifiedDataColumn } ;
16+ use crate :: block_verification_types:: AsBlock ;
17+ use crate :: data_column_verification:: KzgVerifiedCustodyDataColumn ;
1718#[ cfg_attr( test, double) ]
1819use crate :: fetch_blobs:: fetch_blobs_beacon_adapter:: FetchBlobsBeaconAdapter ;
1920use crate :: kzg_utils:: blobs_to_data_column_sidecars;
21+ use crate :: observed_block_producers:: ProposalKey ;
2022use crate :: observed_data_sidecars:: DoNotObserve ;
2123use crate :: {
2224 metrics, AvailabilityProcessingStatus , BeaconChain , BeaconChainError , BeaconChainTypes ,
@@ -46,7 +48,7 @@ use types::{
4648pub enum EngineGetBlobsOutput < T : BeaconChainTypes > {
4749 Blobs ( Vec < GossipVerifiedBlob < T , DoNotObserve > > ) ,
4850 /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
49- CustodyColumns ( Vec < GossipVerifiedDataColumn < T , DoNotObserve > > ) ,
51+ CustodyColumns ( Vec < KzgVerifiedCustodyDataColumn < T :: EthSpec > > ) ,
5052}
5153
5254#[ derive( Debug ) ]
@@ -59,7 +61,7 @@ pub enum FetchEngineBlobError {
5961 ExecutionLayerMissing ,
6062 InternalError ( String ) ,
6163 GossipBlob ( GossipBlobError ) ,
62- GossipDataColumn ( GossipDataColumnError ) ,
64+ KzgError ( kzg :: Error ) ,
6365 RequestFailed ( ExecutionLayerError ) ,
6466 RuntimeShutdown ,
6567 TokioJoin ( tokio:: task:: JoinError ) ,
@@ -293,6 +295,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
293295 let chain_adapter = Arc :: new ( chain_adapter) ;
294296 let custody_columns_to_import = compute_custody_columns_to_import (
295297 & chain_adapter,
298+ block_root,
296299 block. clone ( ) ,
297300 blobs,
298301 proofs,
@@ -326,11 +329,12 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
326329/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
327330async fn compute_custody_columns_to_import < T : BeaconChainTypes > (
328331 chain_adapter : & Arc < FetchBlobsBeaconAdapter < T > > ,
332+ block_root : Hash256 ,
329333 block : Arc < SignedBeaconBlock < T :: EthSpec , FullPayload < T :: EthSpec > > > ,
330334 blobs : Vec < Blob < T :: EthSpec > > ,
331335 proofs : Vec < KzgProofs < T :: EthSpec > > ,
332336 custody_columns_indices : HashSet < ColumnIndex > ,
333- ) -> Result < Vec < GossipVerifiedDataColumn < T , DoNotObserve > > , FetchEngineBlobError > {
337+ ) -> Result < Vec < KzgVerifiedCustodyDataColumn < T :: EthSpec > > , FetchEngineBlobError > {
334338 let kzg = chain_adapter. kzg ( ) . clone ( ) ;
335339 let spec = chain_adapter. spec ( ) . clone ( ) ;
336340 let chain_adapter_cloned = chain_adapter. clone ( ) ;
@@ -353,57 +357,47 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
353357 // This filtering ensures we only import and publish the custody columns.
354358 // `DataAvailabilityChecker` requires a strict match on custody columns count to
355359 // consider a block available.
356- let custody_columns = data_columns_result
360+ let mut custody_columns = data_columns_result
357361 . map ( |mut data_columns| {
358362 data_columns. retain ( |col| custody_columns_indices. contains ( & col. index ) ) ;
359363 data_columns
360364 } )
361365 . map_err ( FetchEngineBlobError :: DataColumnSidecarError ) ?;
362366
363- // Gossip verify data columns before publishing. This prevents blobs with invalid
367+ // Only consider columns that are not already observed on gossip.
368+ if let Some ( observed_columns) = chain_adapter_cloned. known_for_proposal (
369+ ProposalKey :: new ( block. message ( ) . proposer_index ( ) , block. slot ( ) ) ,
370+ ) {
371+ custody_columns. retain ( |col| !observed_columns. contains ( & col. index ) ) ;
372+ if custody_columns. is_empty ( ) {
373+ return Ok ( vec ! [ ] ) ;
374+ }
375+ }
376+
377+ // Only consider columns that are not already known to data availability.
378+ if let Some ( known_columns) =
379+ chain_adapter_cloned. cached_data_column_indexes ( & block_root)
380+ {
381+ custody_columns. retain ( |col| !known_columns. contains ( & col. index ) ) ;
382+ if custody_columns. is_empty ( ) {
383+ return Ok ( vec ! [ ] ) ;
384+ }
385+ }
386+
387+ // KZG verify data columns before publishing. This prevents blobs with invalid
364388 // KZG proofs from the EL making it into the data availability checker. We do not
365389 // immediately add these blobs to the observed blobs/columns cache because we want
366390 // to allow blobs/columns to arrive on gossip and be accepted (and propagated) while
367391 // we are waiting to publish. Just before publishing we will observe the blobs/columns
368392 // and only proceed with publishing if they are not yet seen.
369- // TODO(das): we may want to just perform kzg proof verification here, since the
370- // `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary
371- // to verify them.
372- let columns_to_import_and_publish = custody_columns
373- . into_iter ( )
374- . filter_map ( |col| {
375- match chain_adapter_cloned. verify_data_column_for_gossip ( col) {
376- Ok ( verified) => Some ( Ok ( verified) ) ,
377- Err ( e) => match e {
378- // Ignore already seen data columns
379- GossipDataColumnError :: PriorKnown { .. }
380- | GossipDataColumnError :: PriorKnownUnpublished => None ,
381- GossipDataColumnError :: BeaconChainError ( _)
382- | GossipDataColumnError :: ProposalSignatureInvalid
383- | GossipDataColumnError :: UnknownValidator ( _)
384- | GossipDataColumnError :: IsNotLaterThanParent { .. }
385- | GossipDataColumnError :: InvalidKzgProof ( _)
386- | GossipDataColumnError :: InvalidSubnetId { .. }
387- | GossipDataColumnError :: FutureSlot { .. }
388- | GossipDataColumnError :: PastFinalizedSlot { .. }
389- | GossipDataColumnError :: PubkeyCacheTimeout
390- | GossipDataColumnError :: ProposerIndexMismatch { .. }
391- | GossipDataColumnError :: ParentUnknown { .. }
392- | GossipDataColumnError :: NotFinalizedDescendant { .. }
393- | GossipDataColumnError :: InvalidInclusionProof
394- | GossipDataColumnError :: InvalidColumnIndex ( _)
395- | GossipDataColumnError :: UnexpectedDataColumn
396- | GossipDataColumnError :: InconsistentCommitmentsLength { .. }
397- | GossipDataColumnError :: InconsistentProofsLength { .. } => {
398- Some ( Err ( e) )
399- }
400- } ,
401- }
402- } )
403- . collect :: < Result < Vec < _ > , _ > > ( )
404- . map_err ( FetchEngineBlobError :: GossipDataColumn ) ?;
393+ let verified = chain_adapter_cloned
394+ . verify_data_columns_kzg ( custody_columns)
395+ . map_err ( FetchEngineBlobError :: KzgError ) ?;
405396
406- Ok ( columns_to_import_and_publish)
397+ Ok ( verified
398+ . into_iter ( )
399+ . map ( KzgVerifiedCustodyDataColumn :: from_asserted_custody)
400+ . collect ( ) )
407401 } ,
408402 "compute_custody_columns_to_import" ,
409403 )
0 commit comments