Skip to content

Commit 36ca44d

Browse files
committed
Merge branch 'unstable' into wait-before-reconstruction
2 parents 5a24f45 + 5472cb8 commit 36ca44d

File tree

8 files changed

+107
-66
lines changed

8 files changed

+107
-66
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3699,7 +3699,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36993699
data_columns.iter().map(|c| c.as_data_column()),
37003700
)?;
37013701
self.data_availability_checker
3702-
.put_gossip_verified_data_columns(block_root, data_columns)?
3702+
.put_kzg_verified_custody_data_columns(block_root, data_columns)?
37033703
}
37043704
};
37053705

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
234234
custody_columns: DataColumnSidecarList<T::EthSpec>,
235235
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
236236
// Attributes fault to the specific peer that sent an invalid column
237-
let kzg_verified_columns = KzgVerifiedDataColumn::from_batch(custody_columns, &self.kzg)
238-
.map_err(AvailabilityCheckError::InvalidColumn)?;
237+
let kzg_verified_columns =
238+
KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg)
239+
.map_err(AvailabilityCheckError::InvalidColumn)?;
239240

240241
let verified_custody_columns = kzg_verified_columns
241242
.into_iter()
@@ -285,6 +286,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
285286
.put_kzg_verified_data_columns(block_root, custody_columns)
286287
}
287288

289+
pub fn put_kzg_verified_custody_data_columns<
290+
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
291+
>(
292+
&self,
293+
block_root: Hash256,
294+
custody_columns: I,
295+
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
296+
self.availability_cache
297+
.put_kzg_verified_data_columns(block_root, custody_columns)
298+
}
299+
288300
/// Check if we have all the blobs for a block. Returns `Availability` which has information
289301
/// about whether all components have been received or more are required.
290302
pub fn put_pending_executed_block(

beacon_node/beacon_chain/src/data_column_verification.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,17 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
274274
pub fn from_batch(
275275
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
276276
kzg: &Kzg,
277+
) -> Result<Vec<Self>, KzgError> {
278+
verify_kzg_for_data_column_list(data_columns.iter(), kzg)?;
279+
Ok(data_columns
280+
.into_iter()
281+
.map(|column| Self { data: column })
282+
.collect())
283+
}
284+
285+
pub fn from_batch_with_scoring(
286+
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
287+
kzg: &Kzg,
277288
) -> Result<Vec<Self>, Vec<(ColumnIndex, KzgError)>> {
278289
verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?;
279290
Ok(data_columns

beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
2-
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
2+
use crate::data_column_verification::KzgVerifiedDataColumn;
33
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
4+
use crate::observed_block_producers::ProposalKey;
45
use crate::observed_data_sidecars::DoNotObserve;
56
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
67
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
7-
use kzg::Kzg;
8+
use kzg::{Error as KzgError, Kzg};
89
#[cfg(test)]
910
use mockall::automock;
11+
use std::collections::HashSet;
1012
use std::sync::Arc;
1113
use task_executor::TaskExecutor;
12-
use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Hash256, Slot};
14+
use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Hash256, Slot};
1315

1416
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
1517
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
@@ -75,12 +77,28 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
7577
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
7678
}
7779

78-
pub(crate) fn verify_data_column_for_gossip(
80+
pub(crate) fn verify_data_columns_kzg(
7981
&self,
80-
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
81-
) -> Result<GossipVerifiedDataColumn<T, DoNotObserve>, GossipDataColumnError> {
82-
let index = data_column.index;
83-
GossipVerifiedDataColumn::<T, DoNotObserve>::new(data_column, index, &self.chain)
82+
data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
83+
) -> Result<Vec<KzgVerifiedDataColumn<T::EthSpec>>, KzgError> {
84+
KzgVerifiedDataColumn::from_batch(data_columns, &self.chain.kzg)
85+
}
86+
87+
pub(crate) fn known_for_proposal(
88+
&self,
89+
proposal_key: ProposalKey,
90+
) -> Option<HashSet<ColumnIndex>> {
91+
self.chain
92+
.observed_column_sidecars
93+
.read()
94+
.known_for_proposal(&proposal_key)
95+
.cloned()
96+
}
97+
98+
pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
99+
self.chain
100+
.data_availability_checker
101+
.cached_data_column_indexes(block_root)
84102
}
85103

86104
pub(crate) async fn process_engine_blobs(

beacon_node/beacon_chain/src/fetch_blobs/mod.rs

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ mod fetch_blobs_beacon_adapter;
1313
mod tests;
1414

1515
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
16-
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
16+
use crate::block_verification_types::AsBlock;
17+
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
1718
#[cfg_attr(test, double)]
1819
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
1920
use crate::kzg_utils::blobs_to_data_column_sidecars;
21+
use crate::observed_block_producers::ProposalKey;
2022
use crate::observed_data_sidecars::DoNotObserve;
2123
use crate::{
2224
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
@@ -46,7 +48,7 @@ use types::{
4648
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
4749
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
4850
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
49-
CustodyColumns(Vec<GossipVerifiedDataColumn<T, DoNotObserve>>),
51+
CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>),
5052
}
5153

5254
#[derive(Debug)]
@@ -59,7 +61,7 @@ pub enum FetchEngineBlobError {
5961
ExecutionLayerMissing,
6062
InternalError(String),
6163
GossipBlob(GossipBlobError),
62-
GossipDataColumn(GossipDataColumnError),
64+
KzgError(kzg::Error),
6365
RequestFailed(ExecutionLayerError),
6466
RuntimeShutdown,
6567
TokioJoin(tokio::task::JoinError),
@@ -293,6 +295,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
293295
let chain_adapter = Arc::new(chain_adapter);
294296
let custody_columns_to_import = compute_custody_columns_to_import(
295297
&chain_adapter,
298+
block_root,
296299
block.clone(),
297300
blobs,
298301
proofs,
@@ -326,11 +329,12 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
326329
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
327330
async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
328331
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
332+
block_root: Hash256,
329333
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
330334
blobs: Vec<Blob<T::EthSpec>>,
331335
proofs: Vec<KzgProofs<T::EthSpec>>,
332336
custody_columns_indices: HashSet<ColumnIndex>,
333-
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
337+
) -> Result<Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>, FetchEngineBlobError> {
334338
let kzg = chain_adapter.kzg().clone();
335339
let spec = chain_adapter.spec().clone();
336340
let chain_adapter_cloned = chain_adapter.clone();
@@ -353,57 +357,47 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
353357
// This filtering ensures we only import and publish the custody columns.
354358
// `DataAvailabilityChecker` requires a strict match on custody columns count to
355359
// consider a block available.
356-
let custody_columns = data_columns_result
360+
let mut custody_columns = data_columns_result
357361
.map(|mut data_columns| {
358362
data_columns.retain(|col| custody_columns_indices.contains(&col.index));
359363
data_columns
360364
})
361365
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;
362366

363-
// Gossip verify data columns before publishing. This prevents blobs with invalid
367+
// Only consider columns that are not already observed on gossip.
368+
if let Some(observed_columns) = chain_adapter_cloned.known_for_proposal(
369+
ProposalKey::new(block.message().proposer_index(), block.slot()),
370+
) {
371+
custody_columns.retain(|col| !observed_columns.contains(&col.index));
372+
if custody_columns.is_empty() {
373+
return Ok(vec![]);
374+
}
375+
}
376+
377+
// Only consider columns that are not already known to data availability.
378+
if let Some(known_columns) =
379+
chain_adapter_cloned.cached_data_column_indexes(&block_root)
380+
{
381+
custody_columns.retain(|col| !known_columns.contains(&col.index));
382+
if custody_columns.is_empty() {
383+
return Ok(vec![]);
384+
}
385+
}
386+
387+
// KZG verify data columns before publishing. This prevents blobs with invalid
364388
// KZG proofs from the EL making it into the data availability checker. We do not
365389
// immediately add these blobs to the observed blobs/columns cache because we want
366390
// to allow blobs/columns to arrive on gossip and be accepted (and propagated) while
367391
// we are waiting to publish. Just before publishing we will observe the blobs/columns
368392
// and only proceed with publishing if they are not yet seen.
369-
// TODO(das): we may want to just perform kzg proof verification here, since the
370-
// `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary
371-
// to verify them.
372-
let columns_to_import_and_publish = custody_columns
373-
.into_iter()
374-
.filter_map(|col| {
375-
match chain_adapter_cloned.verify_data_column_for_gossip(col) {
376-
Ok(verified) => Some(Ok(verified)),
377-
Err(e) => match e {
378-
// Ignore already seen data columns
379-
GossipDataColumnError::PriorKnown { .. }
380-
| GossipDataColumnError::PriorKnownUnpublished => None,
381-
GossipDataColumnError::BeaconChainError(_)
382-
| GossipDataColumnError::ProposalSignatureInvalid
383-
| GossipDataColumnError::UnknownValidator(_)
384-
| GossipDataColumnError::IsNotLaterThanParent { .. }
385-
| GossipDataColumnError::InvalidKzgProof(_)
386-
| GossipDataColumnError::InvalidSubnetId { .. }
387-
| GossipDataColumnError::FutureSlot { .. }
388-
| GossipDataColumnError::PastFinalizedSlot { .. }
389-
| GossipDataColumnError::PubkeyCacheTimeout
390-
| GossipDataColumnError::ProposerIndexMismatch { .. }
391-
| GossipDataColumnError::ParentUnknown { .. }
392-
| GossipDataColumnError::NotFinalizedDescendant { .. }
393-
| GossipDataColumnError::InvalidInclusionProof
394-
| GossipDataColumnError::InvalidColumnIndex(_)
395-
| GossipDataColumnError::UnexpectedDataColumn
396-
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
397-
| GossipDataColumnError::InconsistentProofsLength { .. } => {
398-
Some(Err(e))
399-
}
400-
},
401-
}
402-
})
403-
.collect::<Result<Vec<_>, _>>()
404-
.map_err(FetchEngineBlobError::GossipDataColumn)?;
393+
let verified = chain_adapter_cloned
394+
.verify_data_columns_kzg(custody_columns)
395+
.map_err(FetchEngineBlobError::KzgError)?;
405396

406-
Ok(columns_to_import_and_publish)
397+
Ok(verified
398+
.into_iter()
399+
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
400+
.collect())
407401
},
408402
"compute_custody_columns_to_import",
409403
)

beacon_node/beacon_chain/src/fetch_blobs/tests.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
1+
use crate::data_column_verification::KzgVerifiedDataColumn;
22
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
33
use crate::fetch_blobs::{
44
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
@@ -156,14 +156,8 @@ mod get_blobs_v2 {
156156
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
157157
// All data columns already seen on gossip
158158
mock_adapter
159-
.expect_verify_data_column_for_gossip()
160-
.returning(|c| {
161-
Err(GossipDataColumnError::PriorKnown {
162-
proposer: c.block_proposer_index(),
163-
slot: c.slot(),
164-
index: c.index,
165-
})
166-
});
159+
.expect_known_for_proposal()
160+
.returning(|_| Some(hashset![0, 1, 2]));
167161
// No blobs should be processed
168162
mock_adapter.expect_process_engine_blobs().times(0);
169163

@@ -198,9 +192,17 @@ mod get_blobs_v2 {
198192
// All blobs returned, fork choice doesn't contain block
199193
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
200194
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
195+
mock_adapter.expect_known_for_proposal().returning(|_| None);
201196
mock_adapter
202-
.expect_verify_data_column_for_gossip()
203-
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
197+
.expect_cached_data_column_indexes()
198+
.returning(|_| None);
199+
mock_adapter
200+
.expect_verify_data_columns_kzg()
201+
.returning(|c| {
202+
Ok(c.into_iter()
203+
.map(KzgVerifiedDataColumn::__new_for_testing)
204+
.collect())
205+
});
204206
mock_process_engine_blobs_result(
205207
&mut mock_adapter,
206208
Ok(AvailabilityProcessingStatus::Imported(block_root)),

beacon_node/beacon_chain/src/observed_data_sidecars.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ impl<T: ObservableDataSidecar> ObservedDataSidecars<T> {
124124
Ok(is_known)
125125
}
126126

127+
pub fn known_for_proposal(&self, proposal_key: &ProposalKey) -> Option<&HashSet<u64>> {
128+
self.items.get(proposal_key)
129+
}
130+
127131
fn sanitize_data_sidecar(&self, data_sidecar: &T) -> Result<(), Error> {
128132
if data_sidecar.index() >= T::max_num_of_items(&self.spec, data_sidecar.slot()) as u64 {
129133
return Err(Error::InvalidDataIndex(data_sidecar.index()));

beacon_node/network/src/network_beacon_processor/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
853853
}
854854
EngineGetBlobsOutput::CustodyColumns(columns) => {
855855
self_cloned.publish_data_columns_gradually(
856-
columns.into_iter().map(|c| c.clone_data_column()).collect(),
856+
columns.into_iter().map(|c| c.clone_arc()).collect(),
857857
block_root,
858858
);
859859
}

0 commit comments

Comments
 (0)