Skip to content

Commit f99fef7

Browse files
committed
gossip light client data durnig sync committee contributions
1 parent 29b5614 commit f99fef7

File tree

5 files changed

+114
-54
lines changed

5 files changed

+114
-54
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2187,6 +2187,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
21872187
})
21882188
}
21892189

2190+
pub fn get_latest_finality_update(&self) -> Option<LightClientFinalityUpdate<T::EthSpec>> {
2191+
self.light_client_server_cache.get_latest_finality_update()
2192+
}
2193+
2194+
pub fn get_latest_optimistic_update(&self) -> Option<LightClientOptimisticUpdate<T::EthSpec>> {
2195+
self.light_client_server_cache
2196+
.get_latest_optimistic_update()
2197+
}
2198+
21902199
/// Accepts some 'LightClientFinalityUpdate' from the network and attempts to verify it
21912200
pub fn verify_finality_update_for_gossip(
21922201
self: &Arc<Self>,

beacon_node/beacon_chain/src/light_client_server_cache.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ pub struct LightClientServerCache<T: BeaconChainTypes> {
4040
latest_written_current_sync_committee: RwLock<Option<Arc<SyncCommittee<T::EthSpec>>>>,
4141
/// Caches state proofs by block root
4242
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData<T::EthSpec>>>,
43+
/// Tracks the latest broadcasted finality update
44+
latest_broadcasted_finality_update: RwLock<Option<LightClientFinalityUpdate<T::EthSpec>>>,
45+
/// Tracks the latest broadcasted optimistic update
46+
latest_broadcasted_optimistic_update: RwLock<Option<LightClientOptimisticUpdate<T::EthSpec>>>,
4347
}
4448

4549
impl<T: BeaconChainTypes> LightClientServerCache<T> {
@@ -49,6 +53,8 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
4953
latest_optimistic_update: None.into(),
5054
latest_light_client_update: None.into(),
5155
latest_written_current_sync_committee: None.into(),
56+
latest_broadcasted_finality_update: None.into(),
57+
latest_broadcasted_optimistic_update: None.into(),
5258
prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(),
5359
}
5460
}
@@ -333,10 +339,66 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
333339
Ok(new_value)
334340
}
335341

342+
pub fn should_broadcast_latest_finality_update(
343+
&self,
344+
) -> Option<LightClientFinalityUpdate<T::EthSpec>> {
345+
if let Some(latest_finality_update) = self.get_latest_finality_update() {
346+
let latest_broadcasted_finality_update = self
347+
.latest_broadcasted_finality_update
348+
.read()
349+
.clone()
350+
.take();
351+
match latest_broadcasted_finality_update {
352+
Some(latest_broadcasted_finality_update) => {
353+
if latest_broadcasted_finality_update != latest_finality_update {
354+
*self.latest_broadcasted_finality_update.write() =
355+
Some(latest_finality_update.clone());
356+
return Some(latest_finality_update);
357+
}
358+
}
359+
None => {
360+
*self.latest_broadcasted_finality_update.write() =
361+
Some(latest_finality_update.clone());
362+
return Some(latest_finality_update);
363+
}
364+
}
365+
}
366+
367+
return None;
368+
}
369+
336370
pub fn get_latest_finality_update(&self) -> Option<LightClientFinalityUpdate<T::EthSpec>> {
337371
self.latest_finality_update.read().clone()
338372
}
339373

374+
/// Checks if we've already broadcasted the latest optimistic update.
375+
/// If we haven't, update the `latest_broadcasted_optimistic_update` cache
376+
/// and return the latest optimistic update for broadcasting
377+
pub fn should_broadcast_latest_optimistic_update(
378+
&self,
379+
) -> Option<LightClientOptimisticUpdate<T::EthSpec>> {
380+
if let Some(latest_optimistic_update) = self.get_latest_optimistic_update() {
381+
let latest_broadcasted_optimistic_update =
382+
self.latest_optimistic_update.read().clone().take();
383+
match latest_broadcasted_optimistic_update {
384+
Some(latest_broadcasted_optimistic_update) => {
385+
if latest_broadcasted_optimistic_update != latest_optimistic_update {
386+
*self.latest_broadcasted_optimistic_update.write() =
387+
Some(latest_optimistic_update.clone());
388+
return Some(latest_optimistic_update);
389+
}
390+
}
391+
None => {
392+
*self.latest_broadcasted_optimistic_update.write() =
393+
Some(latest_optimistic_update.clone());
394+
return Some(latest_optimistic_update);
395+
}
396+
}
397+
}
398+
399+
return None;
400+
}
401+
340402
pub fn get_latest_optimistic_update(&self) -> Option<LightClientOptimisticUpdate<T::EthSpec>> {
341403
self.latest_optimistic_update.read().clone()
342404
}

beacon_node/client/src/builder.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::compute_light_client_updates::{
2-
compute_and_gossip_light_client_updates, LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY,
2+
compute_light_client_updates, LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY,
33
};
44
use crate::config::{ClientGenesis, Config as ClientConfig};
55
use crate::notifier::spawn_notifier;
@@ -957,11 +957,10 @@ where
957957
let log = light_client_update_context.log().clone();
958958
light_client_update_context.executor.spawn(
959959
async move {
960-
compute_and_gossip_light_client_updates(
960+
compute_light_client_updates(
961961
&inner_chain,
962962
light_client_server_rv,
963963
beacon_processor_channels.work_reprocessing_tx,
964-
self.network_senders.clone(),
965964
&log,
966965
)
967966
.await

beacon_node/client/src/compute_light_client_updates.rs

Lines changed: 7 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
22
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
33
use futures::channel::mpsc::Receiver;
44
use futures::StreamExt;
5-
use lighthouse_network::PubsubMessage;
6-
use network::{NetworkMessage, NetworkSenders};
75
use slog::{error, Logger};
86
use tokio::sync::mpsc::Sender;
97

@@ -13,11 +11,10 @@ use tokio::sync::mpsc::Sender;
1311
// take a few milliseconds. 32 is a small enough arbitrary number.
1412
pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
1513

16-
pub async fn compute_and_gossip_light_client_updates<T: BeaconChainTypes>(
14+
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
1715
chain: &BeaconChain<T>,
1816
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
1917
reprocess_tx: Sender<ReprocessQueueMessage>,
20-
network_senders: Option<NetworkSenders<T::EthSpec>>,
2118
log: &Logger,
2219
) {
2320
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
@@ -28,53 +25,12 @@ pub async fn compute_and_gossip_light_client_updates<T: BeaconChainTypes>(
2825
while let Some(event) = light_client_server_rv.next().await {
2926
let parent_root = event.0;
3027

31-
match chain.recompute_and_cache_light_client_updates(event) {
32-
Err(e) => error!(log, "error computing light_client updates {:?}", e),
33-
Ok(()) => {
34-
// gossip the latest computed light client updates
35-
if let (
36-
Some(network_tx),
37-
Some(latest_finality_update),
38-
Some(latest_optimistic_update),
39-
) = (
40-
network_senders.as_ref().map(|n| n.network_send()),
41-
chain.light_client_server_cache.get_latest_finality_update(),
42-
chain
43-
.light_client_server_cache
44-
.get_latest_optimistic_update(),
45-
) {
46-
let _ = network_tx
47-
.send(NetworkMessage::Publish {
48-
messages: vec![PubsubMessage::LightClientFinalityUpdate(Box::new(
49-
latest_finality_update,
50-
))],
51-
})
52-
.inspect_err(|e| {
53-
error!(log, "error publishing light_client finality update {:?}", e)
54-
});
55-
let _ = network_tx
56-
.send(NetworkMessage::Publish {
57-
messages: vec![PubsubMessage::LightClientOptimisticUpdate(Box::new(
58-
latest_optimistic_update,
59-
))],
60-
})
61-
.inspect_err(|e| {
62-
error!(
63-
log,
64-
"error publishing light_client optimistic update {:?}", e
65-
)
66-
});
67-
} else {
68-
error!(
69-
log,
70-
"couldn't publish light_client updates";
71-
"network_senders_avail" => network_senders.is_some(),
72-
"finality_update_avail" => chain.light_client_server_cache.get_latest_finality_update().is_some(),
73-
"optimistic_update_avail" => chain.light_client_server_cache.get_latest_optimistic_update().is_some(),
74-
)
75-
}
76-
}
77-
};
28+
chain
29+
.recompute_and_cache_light_client_updates(event)
30+
.unwrap_or_else(|e| {
31+
error!(log, "error computing light_client updates {:?}", e);
32+
});
33+
7834
let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
7935
if reprocess_tx.try_send(msg).is_err() {
8036
error!(log, "Failed to inform light client update"; "parent_root" => %parent_root)

beacon_node/http_api/src/sync_committees.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,40 @@ pub fn process_signed_contribution_and_proofs<T: BeaconChainTypes>(
319319

320320
let seen_timestamp = timestamp_now();
321321

322+
if let Some(latest_optimistic_update) = chain
323+
.light_client_server_cache
324+
.should_broadcast_latest_optimistic_update()
325+
{
326+
let _ = publish_pubsub_message(
327+
&network_tx,
328+
PubsubMessage::LightClientOptimisticUpdate(Box::new(latest_optimistic_update)),
329+
)
330+
.inspect_err(|e| {
331+
error!(
332+
log,
333+
"Unable to broadcast latest light client optimistic update";
334+
"error" => ?e,
335+
);
336+
});
337+
};
338+
339+
if let Some(latest_finality_update) = chain
340+
.light_client_server_cache
341+
.should_broadcast_latest_finality_update()
342+
{
343+
let _ = publish_pubsub_message(
344+
&network_tx,
345+
PubsubMessage::LightClientFinalityUpdate(Box::new(latest_finality_update)),
346+
)
347+
.inspect_err(|e| {
348+
error!(
349+
log,
350+
"Unable to broadcast latest light client finality update";
351+
"error" => ?e,
352+
);
353+
});
354+
};
355+
322356
// Verify contributions & broadcast to the network.
323357
for (index, contribution) in signed_contribution_and_proofs.into_iter().enumerate() {
324358
let aggregator_index = contribution.message.aggregator_index;

0 commit comments

Comments
 (0)