@@ -2,6 +2,8 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
22use beacon_processor:: work_reprocessing_queue:: ReprocessQueueMessage ;
33use futures:: channel:: mpsc:: Receiver ;
44use futures:: StreamExt ;
5+ use lighthouse_network:: PubsubMessage ;
6+ use network:: { NetworkMessage , NetworkSenders } ;
57use slog:: { error, Logger } ;
68use tokio:: sync:: mpsc:: Sender ;
79
@@ -11,10 +13,11 @@ use tokio::sync::mpsc::Sender;
1113// take a few milliseconds. 32 is a small enough arbitrary number.
1214pub ( crate ) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY : usize = 32 ;
1315
14- pub async fn compute_light_client_updates < T : BeaconChainTypes > (
16+ pub async fn compute_and_gossip_light_client_updates < T : BeaconChainTypes > (
1517 chain : & BeaconChain < T > ,
1618 mut light_client_server_rv : Receiver < LightClientProducerEvent < T :: EthSpec > > ,
1719 reprocess_tx : Sender < ReprocessQueueMessage > ,
20+ network_senders : Option < NetworkSenders < T :: EthSpec > > ,
1821 log : & Logger ,
1922) {
2023 // Should only receive events for recent blocks, import_block filters by blocks close to clock.
@@ -25,12 +28,53 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
2528 while let Some ( event) = light_client_server_rv. next ( ) . await {
2629 let parent_root = event. 0 ;
2730
28- chain
29- . recompute_and_cache_light_client_updates ( event)
30- . unwrap_or_else ( |e| {
31- error ! ( log, "error computing light_client updates {:?}" , e) ;
32- } ) ;
33-
31+ match chain. recompute_and_cache_light_client_updates ( event) {
32+ Err ( e) => error ! ( log, "error computing light_client updates {:?}" , e) ,
33+ Ok ( ( ) ) => {
34+ // gossip the latest computed light client updates
35+ if let (
36+ Some ( network_tx) ,
37+ Some ( latest_finality_update) ,
38+ Some ( latest_optimistic_update) ,
39+ ) = (
40+ network_senders. as_ref ( ) . map ( |n| n. network_send ( ) ) ,
41+ chain. light_client_server_cache . get_latest_finality_update ( ) ,
42+ chain
43+ . light_client_server_cache
44+ . get_latest_optimistic_update ( ) ,
45+ ) {
46+ let _ = network_tx
47+ . send ( NetworkMessage :: Publish {
48+ messages : vec ! [ PubsubMessage :: LightClientFinalityUpdate ( Box :: new(
49+ latest_finality_update,
50+ ) ) ] ,
51+ } )
52+ . inspect_err ( |e| {
53+ error ! ( log, "error publishing light_client finality update {:?}" , e)
54+ } ) ;
55+ let _ = network_tx
56+ . send ( NetworkMessage :: Publish {
57+ messages : vec ! [ PubsubMessage :: LightClientOptimisticUpdate ( Box :: new(
58+ latest_optimistic_update,
59+ ) ) ] ,
60+ } )
61+ . inspect_err ( |e| {
62+ error ! (
63+ log,
64+ "error publishing light_client optimistic update {:?}" , e
65+ )
66+ } ) ;
67+ } else {
68+ error ! (
69+ log,
70+ "couldn't publish light_client updates" ;
71+ "network_senders_avail" => network_senders. is_some( ) ,
72+ "finality_update_avail" => chain. light_client_server_cache. get_latest_finality_update( ) . is_some( ) ,
73+ "optimistic_update_avail" => chain. light_client_server_cache. get_latest_optimistic_update( ) . is_some( ) ,
74+ )
75+ }
76+ }
77+ } ;
3478 let msg = ReprocessQueueMessage :: NewLightClientOptimisticUpdate { parent_root } ;
3579 if reprocess_tx. try_send ( msg) . is_err ( ) {
3680 error ! ( log, "Failed to inform light client update" ; "parent_root" => %parent_root)
0 commit comments