@@ -20,7 +20,7 @@ use tracing::{debug, error, instrument};
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
 use types::{
     BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
-    RuntimeVariableList, SignedBeaconBlock,
+    RuntimeVariableList, SignedBeaconBlock, Slot,
 };
 
 mod error;
@@ -38,14 +38,20 @@ use crate::observed_data_sidecars::ObservationStrategy;
 pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
 use types::non_zero_usize::new_non_zero_usize;
 
-/// The LRU Cache stores `PendingComponents` which can store up to
-/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
-/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
-/// to 1024 means the maximum size of the cache is ~ 0.8 GB. But the cache
-/// will target a size of less than 75% of capacity.
-pub const OVERFLOW_LRU_CAPACITY: NonZeroUsize = new_non_zero_usize(1024);
-/// Until tree-states is implemented, we can't store very many states in memory :(
-pub const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(2);
+/// The LRU cache stores `PendingComponents`, which can hold up to `MAX_BLOBS_PER_BLOCK` blobs each.
+///
+/// * Deneb blobs are 128 KB each and are stored in the form of `BlobSidecar`s.
+/// * From Fulu (PeerDAS), blobs are erasure-coded to 256 KB each and stored as 128 `DataColumnSidecar`s per block.
+///
+/// With `MAX_BLOBS_PER_BLOCK` = 48 (expected within the next year), the maximum size of the data
+/// columns in a `PendingComponents` is ~12.29 MB. Setting this to 64 means the maximum size of the
+/// cache is approximately 0.8 GB.
+///
+/// Under normal conditions the cache should only store the current pending block, but it could
+/// occasionally spike to 2-4 entries for various reasons, e.g. components arriving late. It would
+/// very rarely go above this unless there are many concurrent forks.
+pub const OVERFLOW_LRU_CAPACITY: NonZeroUsize = new_non_zero_usize(64);
+pub const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
 pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
 
 /// Cache to hold fully valid data that can't be imported to fork-choice yet. After Dencun hard-fork
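The new capacity figures follow from simple arithmetic. Below is a minimal back-of-the-envelope sketch, assuming 256 KiB per erasure-coded blob, 48 blobs per block, and 64 cache entries; all names in it are illustrative and not identifiers from the codebase:

```rust
// Rough sanity check of the cache sizing described in the doc comment above.
const ERASURE_CODED_BLOB_BYTES: u64 = 256 * 1024; // ~256 KiB per blob after erasure coding (Fulu)
const ASSUMED_MAX_BLOBS_PER_BLOCK: u64 = 48; // expected future MAX_BLOBS_PER_BLOCK
const CACHE_ENTRIES: u64 = 64; // OVERFLOW_LRU_CAPACITY

fn main() {
    // Roughly 12-13 MB of column data per block (the doc comment quotes ~12.29 MB)...
    let bytes_per_block = ERASURE_CODED_BLOB_BYTES * ASSUMED_MAX_BLOBS_PER_BLOCK;
    // ...and roughly 0.8 GB for a cache holding 64 such entries.
    let cache_max_bytes = bytes_per_block * CACHE_ENTRIES;
    println!("per block: {bytes_per_block} B, cache max: {cache_max_bytes} B");
}
```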
@@ -76,7 +82,7 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
     availability_cache: Arc<DataAvailabilityCheckerInner<T>>,
     slot_clock: T::SlotClock,
     kzg: Arc<Kzg>,
-    custody_context: Arc<CustodyContext>,
+    custody_context: Arc<CustodyContext<T::EthSpec>>,
     spec: Arc<ChainSpec>,
 }
 
@@ -114,7 +120,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         slot_clock: T::SlotClock,
         kzg: Arc<Kzg>,
         store: BeaconStore<T>,
-        custody_context: Arc<CustodyContext>,
+        custody_context: Arc<CustodyContext<T::EthSpec>>,
         spec: Arc<ChainSpec>,
     ) -> Result<Self, AvailabilityCheckError> {
         let inner = DataAvailabilityCheckerInner::new(
@@ -132,8 +138,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         })
     }
 
-    pub fn custody_context(&self) -> Arc<CustodyContext> {
-        self.custody_context.clone()
+    pub fn custody_context(&self) -> &Arc<CustodyContext<T::EthSpec>> {
+        &self.custody_context
     }
 
     /// Checks if the block root is currently in the availability cache awaiting import because
@@ -235,15 +241,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     pub fn put_rpc_custody_columns(
         &self,
         block_root: Hash256,
+        slot: Slot,
         custody_columns: DataColumnSidecarList<T::EthSpec>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         // Attributes fault to the specific peer that sent an invalid column
         let kzg_verified_columns =
             KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg)
                 .map_err(AvailabilityCheckError::InvalidColumn)?;
 
+        // Filter out columns that aren't required for custody at this slot.
+        // This is required because `data_columns_by_root` requests the **latest** CGC, which _may_
+        // not yet be effective for the data availability check, as CGC changes only take effect
+        // from a new epoch.
+        let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
+        let sampling_columns = self
+            .custody_context
+            .sampling_columns_for_epoch(epoch, &self.spec);
         let verified_custody_columns = kzg_verified_columns
             .into_iter()
+            .filter(|col| sampling_columns.contains(&col.index()))
             .map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
             .collect::<Vec<_>>();
 
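To make the intent of the new filter concrete: columns received under the node's latest CGC are intersected with the sampling set that is effective for the block's epoch. The standalone sketch below illustrates that idea only; it uses plain `u64` indices and a hypothetical helper rather than the sidecar types and `CustodyContext::sampling_columns_for_epoch` used in this file:

```rust
// Illustrative sketch: keep only columns whose indices are in the epoch's effective sampling set.
use std::collections::HashSet;

fn filter_to_effective_custody(received: Vec<u64>, sampling_columns: &HashSet<u64>) -> Vec<u64> {
    received
        .into_iter()
        .filter(|index| sampling_columns.contains(index))
        .collect()
}

#[test]
fn drops_columns_outside_effective_cgc() {
    // The latest CGC requested 10 columns, but only 8 are effective for this epoch.
    let effective: HashSet<u64> = (0..8u64).collect();
    let received: Vec<u64> = (0..10u64).collect();
    assert_eq!(
        filter_to_effective_custody(received, &effective),
        (0..8u64).collect::<Vec<_>>()
    );
}
```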
@@ -291,10 +307,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     >(
         &self,
         block_root: Hash256,
+        slot: Slot,
         data_columns: I,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
+        let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
+        let sampling_columns = self
+            .custody_context
+            .sampling_columns_for_epoch(epoch, &self.spec);
         let custody_columns = data_columns
             .into_iter()
+            .filter(|col| sampling_columns.contains(&col.index()))
             .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
             .collect::<Vec<_>>();
 
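Both entry points derive the epoch from the supplied slot via `slot.epoch(T::EthSpec::slots_per_epoch())`, and the CGC only becomes effective from a new epoch. A minimal sketch of that slot-to-epoch mapping with plain integers (32 slots per epoch on mainnet; the names below are illustrative):

```rust
// Illustrative epoch derivation; mirrors `slot.epoch(E::slots_per_epoch())` with plain integers.
const SLOTS_PER_EPOCH: u64 = 32; // mainnet value

fn epoch_of(slot: u64) -> u64 {
    slot / SLOTS_PER_EPOCH
}

fn main() {
    // The last slot of epoch 0 still maps to epoch 0, so a CGC increase registered at that slot
    // only takes effect for data availability checks from epoch 1 onwards.
    assert_eq!(epoch_of(31), 0);
    assert_eq!(epoch_of(32), 1);
    println!("epoch of slot 31 = {}, epoch of slot 32 = {}", epoch_of(31), epoch_of(32));
}
```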
@@ -811,3 +833,207 @@ impl<E: EthSpec> MaybeAvailableBlock<E> {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::test_utils::{
+        generate_rand_block_and_data_columns, get_kzg, EphemeralHarnessType, NumBlobs,
+    };
+    use crate::CustodyContext;
+    use rand::prelude::StdRng;
+    use rand::seq::SliceRandom;
+    use rand::SeedableRng;
+    use slot_clock::{SlotClock, TestingSlotClock};
+    use std::collections::HashSet;
+    use std::sync::Arc;
+    use std::time::Duration;
+    use store::HotColdDB;
+    use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot};
+
+    type E = MainnetEthSpec;
+    type T = EphemeralHarnessType<E>;
+
+    /// Test to verify that RPC columns received which are not part of the "effective" CGC for
+    /// the slot are excluded from import.
+    #[test]
+    fn should_exclude_rpc_columns_not_required_for_sampling() {
+        // SETUP
+        let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
+        let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
+
+        let da_checker = new_da_checker(spec.clone());
+        let custody_context = &da_checker.custody_context;
+        let all_column_indices_ordered =
+            init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec);
+
+        // GIVEN a single 32 ETH validator is attached at slot 0
+        let epoch = Epoch::new(0);
+        let validator_0 = 0;
+        custody_context.register_validators(
+            vec![(validator_0, 32_000_000_000)],
+            epoch.start_slot(E::slots_per_epoch()),
+            &spec,
+        );
+        assert_eq!(
+            custody_context.num_of_data_columns_to_sample(epoch, &spec),
+            spec.validator_custody_requirement as usize,
+            "sampling size should be the minimal custody requirement == 8"
+        );
+
+        // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
+        let validator_1 = 1;
+        let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
+        custody_context.register_validators(
+            vec![(validator_1, 32_000_000_000 * 9)],
+            cgc_change_slot,
+            &spec,
+        );
+        // AND custody columns (8) and any new extra columns (2) are received via RPC responses.
+        // NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown.
+        let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
+            ForkName::Fulu,
+            NumBlobs::Number(1),
+            &mut rng,
+            &spec,
+        );
+        let block_root = Hash256::random();
+        let requested_columns = &all_column_indices_ordered[..10];
+        da_checker
+            .put_rpc_custody_columns(
+                block_root,
+                cgc_change_slot,
+                data_columns
+                    .into_iter()
+                    .filter(|d| requested_columns.contains(&d.index))
+                    .collect(),
+            )
+            .expect("should put rpc custody columns");
+
+        // THEN the sampling size for the end slot of the same epoch remains unchanged
+        let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
+        assert_eq!(
+            sampling_columns.len(),
+            spec.validator_custody_requirement as usize // 8
+        );
+        // AND any extra columns received via RPC responses are excluded from import.
+        let actual_cached: HashSet<ColumnIndex> = da_checker
+            .cached_data_column_indexes(&block_root)
+            .expect("should have cached data columns")
+            .into_iter()
+            .collect();
+        let expected_sampling_columns = sampling_columns.iter().copied().collect::<HashSet<_>>();
+        assert_eq!(
+            actual_cached, expected_sampling_columns,
+            "should cache only the effective sampling columns"
+        );
+        assert!(
+            actual_cached.len() < requested_columns.len(),
+            "extra columns should be excluded"
+        )
+    }
+
+    /// Test to verify that gossip columns received which are not part of the "effective" CGC for
+    /// the slot are excluded from import.
+    #[test]
+    fn should_exclude_gossip_columns_not_required_for_sampling() {
+        // SETUP
+        let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
+        let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
+
+        let da_checker = new_da_checker(spec.clone());
+        let custody_context = &da_checker.custody_context;
+        let all_column_indices_ordered =
+            init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec);
+
+        // GIVEN a single 32 ETH validator is attached at slot 0
+        let epoch = Epoch::new(0);
+        let validator_0 = 0;
+        custody_context.register_validators(
+            vec![(validator_0, 32_000_000_000)],
+            epoch.start_slot(E::slots_per_epoch()),
+            &spec,
+        );
+        assert_eq!(
+            custody_context.num_of_data_columns_to_sample(epoch, &spec),
+            spec.validator_custody_requirement as usize,
+            "sampling size should be the minimal custody requirement == 8"
+        );
+
+        // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
+        let validator_1 = 1;
+        let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
+        custody_context.register_validators(
+            vec![(validator_1, 32_000_000_000 * 9)],
+            cgc_change_slot,
+            &spec,
+        );
+        // AND custody columns (8) and any new extra columns (2) are received via gossip.
+        // NOTE: CGC updates result in new topic subscriptions immediately, so extra columns may start to
+        // arrive via gossip.
+        let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
+            ForkName::Fulu,
+            NumBlobs::Number(1),
+            &mut rng,
+            &spec,
+        );
+        let block_root = Hash256::random();
+        let requested_columns = &all_column_indices_ordered[..10];
+        let gossip_columns = data_columns
+            .into_iter()
+            .filter(|d| requested_columns.contains(&d.index))
+            .map(GossipVerifiedDataColumn::<T>::__new_for_testing)
+            .collect::<Vec<_>>();
+        da_checker
+            .put_gossip_verified_data_columns(block_root, cgc_change_slot, gossip_columns)
+            .expect("should put gossip custody columns");
+
+        // THEN the sampling size for the end slot of the same epoch remains unchanged
+        let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
+        assert_eq!(
+            sampling_columns.len(),
+            spec.validator_custody_requirement as usize // 8
+        );
+        // AND any extra columns received via gossip are excluded from import.
+        let actual_cached: HashSet<ColumnIndex> = da_checker
+            .cached_data_column_indexes(&block_root)
+            .expect("should have cached data columns")
+            .into_iter()
+            .collect();
+        let expected_sampling_columns = sampling_columns.iter().copied().collect::<HashSet<_>>();
+        assert_eq!(
+            actual_cached, expected_sampling_columns,
+            "should cache only the effective sampling columns"
+        );
+        assert!(
+            actual_cached.len() < requested_columns.len(),
+            "extra columns should be excluded"
+        )
+    }
+
+    fn init_custody_context_with_ordered_columns(
+        custody_context: &Arc<CustodyContext<E>>,
+        mut rng: &mut StdRng,
+        spec: &ChainSpec,
+    ) -> Vec<u64> {
+        let mut all_data_columns = (0..spec.number_of_custody_groups).collect::<Vec<_>>();
+        all_data_columns.shuffle(&mut rng);
+        custody_context
+            .init_ordered_data_columns_from_custody_groups(all_data_columns.clone(), spec)
+            .expect("should initialise ordered custody columns");
+        all_data_columns
+    }
+
+    fn new_da_checker(spec: Arc<ChainSpec>) -> DataAvailabilityChecker<T> {
+        let slot_clock = TestingSlotClock::new(
+            Slot::new(0),
+            Duration::from_secs(0),
+            Duration::from_secs(spec.seconds_per_slot),
+        );
+        let kzg = get_kzg(&spec);
+        let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap());
+        let custody_context = Arc::new(CustodyContext::new(false));
+        DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec)
+            .expect("should initialise data availability checker")
+    }
+}