Skip to content

Commit b3b3756

Browse files
committed
Fix tests
1 parent e3aed89 commit b3b3756

File tree

1 file changed

+78
-6
lines changed
  • beacon_node/network/src/sync/tests

1 file changed

+78
-6
lines changed

beacon_node/network/src/sync/tests/range.rs

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lighthouse_network::rpc::methods::{
1616
};
1717
use lighthouse_network::service::api_types::{
1818
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
19-
SyncRequestId,
19+
DataColumnsByRootRequestId, SyncRequestId,
2020
};
2121
use lighthouse_network::{PeerId, SyncInfo};
2222
use std::time::Duration;
@@ -36,6 +36,7 @@ enum ByRangeDataRequestIds {
3636
PreDeneb,
3737
PrePeerDAS(BlobsByRangeRequestId, PeerId),
3838
PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>),
39+
PostPeerDASByRoot(Vec<(DataColumnsByRootRequestId, PeerId)>),
3940
}
4041

4142
/// Sync tests are usually written in the form:
@@ -233,7 +234,8 @@ impl TestRig {
233234
});
234235

235236
let by_range_data_requests = if self.after_fulu() {
236-
let mut data_columns_requests = vec![];
237+
// First check for DataColumnsByRange requests (old paradigm)
238+
let mut data_columns_range_requests = vec![];
237239
while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev {
238240
NetworkMessage::SendRequest {
239241
peer_id,
@@ -245,12 +247,34 @@ impl TestRig {
245247
} if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)),
246248
_ => None,
247249
}) {
248-
data_columns_requests.push(data_columns_request);
250+
data_columns_range_requests.push(data_columns_request);
249251
}
250-
if data_columns_requests.is_empty() {
251-
panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}");
252+
253+
// If we found range requests, use the `ByRangeRequestType::BlocksAndColumns` paradigm
254+
if !data_columns_range_requests.is_empty() {
255+
ByRangeDataRequestIds::PostPeerDAS(data_columns_range_requests)
256+
} else {
257+
// Try to find the byroot requests associated with the `ByRangeRequestType::BlocksAndColumnsSeparate`
258+
let mut data_columns_root_requests = vec![];
259+
while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev {
260+
NetworkMessage::SendRequest {
261+
peer_id,
262+
request: RequestType::DataColumnsByRoot(_),
263+
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
264+
} => Some((*id, *peer_id)),
265+
_ => None,
266+
}) {
267+
data_columns_root_requests.push(data_columns_request);
268+
}
269+
270+
if !data_columns_root_requests.is_empty() {
271+
ByRangeDataRequestIds::PostPeerDASByRoot(data_columns_root_requests)
272+
} else {
273+
// No data column requests found - this is expected for the new paradigm
274+
// since DataColumnsByRoot requests are sent after blocks are received
275+
ByRangeDataRequestIds::PostPeerDASByRoot(vec![])
276+
}
252277
}
253-
ByRangeDataRequestIds::PostPeerDAS(data_columns_requests)
254278
} else if self.after_deneb() {
255279
let (id, peer) = self
256280
.pop_received_network_event(|ev| match ev {
@@ -318,11 +342,54 @@ impl TestRig {
318342
});
319343
}
320344
}
345+
ByRangeDataRequestIds::PostPeerDASByRoot(data_column_req_ids) => {
346+
// Complete the DataColumnsByRoot requests with stream termination
347+
for (id, peer_id) in data_column_req_ids {
348+
self.log(&format!(
349+
"Completing DataColumnsByRoot request {id:?} with empty stream"
350+
));
351+
self.send_sync_message(SyncMessage::RpcDataColumn {
352+
sync_request_id: SyncRequestId::DataColumnsByRoot(id),
353+
peer_id,
354+
data_column: None,
355+
seen_timestamp: D,
356+
});
357+
}
358+
}
321359
}
322360

323361
blocks_req_id.parent_request_id.requester
324362
}
325363

364+
fn find_and_complete_data_columns_by_root_requests(&mut self) {
365+
// In the new paradigm, DataColumnsByRoot requests are sent after blocks are received
366+
// We need to complete any pending DataColumnsByRoot requests
367+
let mut data_columns_root_requests = vec![];
368+
while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev {
369+
NetworkMessage::SendRequest {
370+
peer_id,
371+
request: RequestType::DataColumnsByRoot(_),
372+
app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRoot(id)),
373+
} => Some((*id, *peer_id)),
374+
_ => None,
375+
}) {
376+
data_columns_root_requests.push(data_columns_request);
377+
}
378+
379+
// Complete the DataColumnsByRoot requests
380+
for (id, peer_id) in data_columns_root_requests {
381+
self.log(&format!(
382+
"Completing DataColumnsByRoot request {id:?} with empty stream"
383+
));
384+
self.send_sync_message(SyncMessage::RpcDataColumn {
385+
sync_request_id: SyncRequestId::DataColumnsByRoot(id),
386+
peer_id,
387+
data_column: None,
388+
seen_timestamp: D,
389+
});
390+
}
391+
}
392+
326393
fn find_and_complete_processing_chain_segment(&mut self, id: ChainSegmentProcessId) {
327394
self.pop_received_processor_event(|ev| {
328395
(ev.work_type() == WorkType::ChainSegment).then_some(())
@@ -366,6 +433,11 @@ impl TestRig {
366433
};
367434

368435
self.find_and_complete_processing_chain_segment(id);
436+
437+
// In the new paradigm, DataColumnsByRoot requests are sent after blocks are processed
438+
// We need to complete any pending DataColumnsByRoot requests
439+
self.find_and_complete_data_columns_by_root_requests();
440+
369441
if epoch < last_epoch - 1 {
370442
self.assert_state(RangeSyncType::Finalized);
371443
} else {

0 commit comments

Comments
 (0)