@@ -16,7 +16,7 @@ use lighthouse_network::rpc::methods::{
1616} ;
1717use lighthouse_network:: service:: api_types:: {
1818 AppRequestId , BlobsByRangeRequestId , BlocksByRangeRequestId , DataColumnsByRangeRequestId ,
19- SyncRequestId ,
19+ DataColumnsByRootRequestId , SyncRequestId ,
2020} ;
2121use lighthouse_network:: { PeerId , SyncInfo } ;
2222use std:: time:: Duration ;
@@ -36,6 +36,7 @@ enum ByRangeDataRequestIds {
3636 PreDeneb ,
3737 PrePeerDAS ( BlobsByRangeRequestId , PeerId ) ,
3838 PostPeerDAS ( Vec < ( DataColumnsByRangeRequestId , PeerId ) > ) ,
39+ PostPeerDASByRoot ( Vec < ( DataColumnsByRootRequestId , PeerId ) > ) ,
3940}
4041
4142/// Sync tests are usually written in the form:
@@ -233,7 +234,8 @@ impl TestRig {
233234 } ) ;
234235
235236 let by_range_data_requests = if self . after_fulu ( ) {
236- let mut data_columns_requests = vec ! [ ] ;
237+ // First check for DataColumnsByRange requests (old paradigm)
238+ let mut data_columns_range_requests = vec ! [ ] ;
237239 while let Ok ( data_columns_request) = self . pop_received_network_event ( |ev| match ev {
238240 NetworkMessage :: SendRequest {
239241 peer_id,
@@ -245,12 +247,34 @@ impl TestRig {
245247 } if filter_f ( * peer_id, * start_slot) => Some ( ( * id, * peer_id) ) ,
246248 _ => None ,
247249 } ) {
248- data_columns_requests . push ( data_columns_request) ;
250+ data_columns_range_requests . push ( data_columns_request) ;
249251 }
250- if data_columns_requests. is_empty ( ) {
251- panic ! ( "Found zero DataColumnsByRange requests, filter {request_filter:?}" ) ;
252+
253+ // If we found range requests, use the `ByRangeRequestType::BlocksAndColumns` paradigm
254+ if !data_columns_range_requests. is_empty ( ) {
255+ ByRangeDataRequestIds :: PostPeerDAS ( data_columns_range_requests)
256+ } else {
257+ // Try to find the byroot requests associated with the `ByRangeRequestType::BlocksAndColumnsSeparate`
258+ let mut data_columns_root_requests = vec ! [ ] ;
259+ while let Ok ( data_columns_request) = self . pop_received_network_event ( |ev| match ev {
260+ NetworkMessage :: SendRequest {
261+ peer_id,
262+ request : RequestType :: DataColumnsByRoot ( _) ,
263+ app_request_id : AppRequestId :: Sync ( SyncRequestId :: DataColumnsByRoot ( id) ) ,
264+ } => Some ( ( * id, * peer_id) ) ,
265+ _ => None ,
266+ } ) {
267+ data_columns_root_requests. push ( data_columns_request) ;
268+ }
269+
270+ if !data_columns_root_requests. is_empty ( ) {
271+ ByRangeDataRequestIds :: PostPeerDASByRoot ( data_columns_root_requests)
272+ } else {
273+ // No data column requests found - this is expected for the new paradigm
274+ // since DataColumnsByRoot requests are sent after blocks are received
275+ ByRangeDataRequestIds :: PostPeerDASByRoot ( vec ! [ ] )
276+ }
252277 }
253- ByRangeDataRequestIds :: PostPeerDAS ( data_columns_requests)
254278 } else if self . after_deneb ( ) {
255279 let ( id, peer) = self
256280 . pop_received_network_event ( |ev| match ev {
@@ -318,11 +342,54 @@ impl TestRig {
318342 } ) ;
319343 }
320344 }
345+ ByRangeDataRequestIds :: PostPeerDASByRoot ( data_column_req_ids) => {
346+ // Complete the DataColumnsByRoot requests with stream termination
347+ for ( id, peer_id) in data_column_req_ids {
348+ self . log ( & format ! (
349+ "Completing DataColumnsByRoot request {id:?} with empty stream"
350+ ) ) ;
351+ self . send_sync_message ( SyncMessage :: RpcDataColumn {
352+ sync_request_id : SyncRequestId :: DataColumnsByRoot ( id) ,
353+ peer_id,
354+ data_column : None ,
355+ seen_timestamp : D ,
356+ } ) ;
357+ }
358+ }
321359 }
322360
323361 blocks_req_id. parent_request_id . requester
324362 }
325363
364+ fn find_and_complete_data_columns_by_root_requests ( & mut self ) {
365+ // In the new paradigm, DataColumnsByRoot requests are sent after blocks are received
366+ // We need to complete any pending DataColumnsByRoot requests
367+ let mut data_columns_root_requests = vec ! [ ] ;
368+ while let Ok ( data_columns_request) = self . pop_received_network_event ( |ev| match ev {
369+ NetworkMessage :: SendRequest {
370+ peer_id,
371+ request : RequestType :: DataColumnsByRoot ( _) ,
372+ app_request_id : AppRequestId :: Sync ( SyncRequestId :: DataColumnsByRoot ( id) ) ,
373+ } => Some ( ( * id, * peer_id) ) ,
374+ _ => None ,
375+ } ) {
376+ data_columns_root_requests. push ( data_columns_request) ;
377+ }
378+
379+ // Complete the DataColumnsByRoot requests
380+ for ( id, peer_id) in data_columns_root_requests {
381+ self . log ( & format ! (
382+ "Completing DataColumnsByRoot request {id:?} with empty stream"
383+ ) ) ;
384+ self . send_sync_message ( SyncMessage :: RpcDataColumn {
385+ sync_request_id : SyncRequestId :: DataColumnsByRoot ( id) ,
386+ peer_id,
387+ data_column : None ,
388+ seen_timestamp : D ,
389+ } ) ;
390+ }
391+ }
392+
326393 fn find_and_complete_processing_chain_segment ( & mut self , id : ChainSegmentProcessId ) {
327394 self . pop_received_processor_event ( |ev| {
328395 ( ev. work_type ( ) == WorkType :: ChainSegment ) . then_some ( ( ) )
@@ -366,6 +433,11 @@ impl TestRig {
366433 } ;
367434
368435 self . find_and_complete_processing_chain_segment ( id) ;
436+
437+ // In the new paradigm, DataColumnsByRoot requests are sent after blocks are processed
438+ // We need to complete any pending DataColumnsByRoot requests
439+ self . find_and_complete_data_columns_by_root_requests ( ) ;
440+
369441 if epoch < last_epoch - 1 {
370442 self . assert_state ( RangeSyncType :: Finalized ) ;
371443 } else {
0 commit comments