Skip to content

Commit ee50628

Browse files
committed
Replace syncing chain child spans with events
1 parent fa3051a commit ee50628

File tree

1 file changed

+23
-12
lines changed
  • beacon_node/network/src/sync/range_sync

1 file changed

+23
-12
lines changed

beacon_node/network/src/sync/range_sync/chain.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
192192

193193
/// Removes a peer from the chain.
194194
/// If the peer has active batches, those are considered failed and re-requested.
195-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?peer_id))]
196195
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
196+
let _guard = self.span.clone().entered();
197+
debug!(peer = %peer_id, "Removing peer from chain");
197198
self.peers.remove(peer_id);
198199

199200
if self.peers.is_empty() {
@@ -213,7 +214,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
213214

214215
/// A block has been received for a batch on this chain.
215216
/// If the block correctly completes the batch it will be processed if possible.
216-
#[instrument(parent = &self.span, level="debug", skip_all)]
217217
pub fn on_block_response(
218218
&mut self,
219219
network: &mut SyncNetworkContext<T>,
@@ -222,6 +222,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
222222
request_id: Id,
223223
blocks: Vec<RpcBlock<T::EthSpec>>,
224224
) -> ProcessingResult {
225+
let _guard = self.span.clone().entered();
226+
debug!(peer = %peer_id, ?batch_id, blocks = blocks.len(), "RPC blocks received");
225227
// check if we have this batch
226228
let batch = match self.batches.get_mut(&batch_id) {
227229
None => {
@@ -260,7 +262,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
260262

261263
/// Processes the batch with the given id.
262264
/// The batch must exist and be ready for processing
263-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?batch_id))]
264265
fn process_batch(
265266
&mut self,
266267
network: &mut SyncNetworkContext<T>,
@@ -308,7 +309,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
308309
}
309310

310311
/// Processes the next ready batch, prioritizing optimistic batches over the processing target.
311-
#[instrument(parent = &self.span, level="debug", skip_all)]
312312
fn process_completed_batches(
313313
&mut self,
314314
network: &mut SyncNetworkContext<T>,
@@ -416,13 +416,14 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
416416

417417
/// The block processor has completed processing a batch. This function handles the result
418418
/// of the batch processor.
419-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?batch_id, ?result))]
420419
pub fn on_batch_process_result(
421420
&mut self,
422421
network: &mut SyncNetworkContext<T>,
423422
batch_id: BatchId,
424423
result: &BatchProcessResult,
425424
) -> ProcessingResult {
425+
let _guard = self.span.clone().entered();
426+
debug!(%batch_id, ?result, "Batch processing result");
426427
// the first two cases are possible if the chain advances while waiting for a processing
427428
// result
428429
let batch_state = self.visualize_batch_state();
@@ -761,20 +762,26 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
761762
}
762763

763764
pub fn stop_syncing(&mut self) {
765+
debug!(parent: &self.span, "Stopping syncing");
764766
self.state = ChainSyncingState::Stopped;
765767
}
766768

767769
/// Either a new chain, or an old one with a peer list
768770
/// This chain has been requested to start syncing.
769771
///
770772
/// This could be new chain, or an old chain that is being resumed.
771-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?local_finalized_epoch, ?optimistic_start_epoch))]
772773
pub fn start_syncing(
773774
&mut self,
774775
network: &mut SyncNetworkContext<T>,
775776
local_finalized_epoch: Epoch,
776777
optimistic_start_epoch: Epoch,
777778
) -> ProcessingResult {
779+
let _guard = self.span.clone().entered();
780+
debug!(
781+
?local_finalized_epoch,
782+
?optimistic_start_epoch,
783+
"Start syncing chain"
784+
);
778785
// to avoid dropping local progress, we advance the chain wrt its batch boundaries. This
779786
let align = |epoch| {
780787
// start_epoch + (number of batches in between)*length_of_batch
@@ -807,20 +814,20 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
807814
/// Add a peer to the chain.
808815
///
809816
/// If the chain is active, this starts requesting batches from this peer.
810-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?peer_id))]
811817
pub fn add_peer(
812818
&mut self,
813819
network: &mut SyncNetworkContext<T>,
814820
peer_id: PeerId,
815821
) -> ProcessingResult {
822+
let _guard = self.span.clone().entered();
823+
debug!(peer_id = %peer_id, "Adding peer to chain");
816824
self.peers.insert(peer_id);
817825
self.request_batches(network)
818826
}
819827

820828
/// An RPC error has occurred.
821829
///
822830
/// If the batch exists it is re-requested.
823-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?batch_id, ?peer_id, ?request_id))]
824831
pub fn inject_error(
825832
&mut self,
826833
network: &mut SyncNetworkContext<T>,
@@ -829,6 +836,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
829836
request_id: Id,
830837
err: RpcResponseError,
831838
) -> ProcessingResult {
839+
let _guard = self.span.clone().entered();
840+
debug!(%peer_id, ?batch_id, ?request_id, "An RPC error has occurred");
832841
let batch_state = self.visualize_batch_state();
833842
if let Some(batch) = self.batches.get_mut(&batch_id) {
834843
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
@@ -916,12 +925,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
916925
}
917926

918927
/// Requests the batch assigned to the given id from a given peer.
919-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?batch_id))]
920928
pub fn send_batch(
921929
&mut self,
922930
network: &mut SyncNetworkContext<T>,
923931
batch_id: BatchId,
924932
) -> ProcessingResult {
933+
let _guard = self.span.clone().entered();
934+
debug!(batch_epoch = %batch_id, "Requesting batch");
925935
let batch_state = self.visualize_batch_state();
926936
if let Some(batch) = self.batches.get_mut(&batch_id) {
927937
let (request, batch_type) = batch.to_blocks_by_range_request();
@@ -992,7 +1002,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
9921002
}
9931003

9941004
/// Retries partial column requests within the batch by creating new requests for the failed columns.
995-
#[instrument(parent = &self.span, level="debug", skip_all, fields(?batch_id, ?id, ?failed_columns))]
9961005
fn retry_partial_batch(
9971006
&mut self,
9981007
network: &mut SyncNetworkContext<T>,
@@ -1001,6 +1010,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
10011010
failed_columns: HashSet<ColumnIndex>,
10021011
mut failed_peers: HashSet<PeerId>,
10031012
) -> ProcessingResult {
1013+
let _guard = self.span.clone().entered();
1014+
debug!(%batch_id, %id, ?failed_columns, "Retrying partial batch");
10041015
if let Some(batch) = self.batches.get_mut(&batch_id) {
10051016
failed_peers.extend(&batch.failed_peers());
10061017
let req = batch.to_blocks_by_range_request().0;
@@ -1045,11 +1056,12 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
10451056

10461057
/// Kickstarts the chain by sending for processing batches that are ready and requesting more
10471058
/// batches if needed.
1048-
#[instrument(parent = &self.span, level="debug", skip_all)]
10491059
pub fn resume(
10501060
&mut self,
10511061
network: &mut SyncNetworkContext<T>,
10521062
) -> Result<KeepChain, RemoveChain> {
1063+
let _guard = self.span.clone().entered();
1064+
debug!("Resuming chain");
10531065
// Request more batches if needed.
10541066
self.request_batches(network)?;
10551067
// If there is any batch ready for processing, send it.
@@ -1058,7 +1070,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
10581070

10591071
/// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer
10601072
/// pool and left over batches until the batch buffer is reached or all peers are exhausted.
1061-
#[instrument(parent = &self.span, level="debug", skip_all)]
10621073
fn request_batches(&mut self, network: &mut SyncNetworkContext<T>) -> ProcessingResult {
10631074
if !matches!(self.state, ChainSyncingState::Syncing) {
10641075
return Ok(KeepChain);

0 commit comments

Comments
 (0)