zebrad/components/sync/
gossip.rs1use futures::TryFutureExt;
6use thiserror::Error;
7use tokio::sync::watch;
8use tower::{timeout::Timeout, Service, ServiceExt};
9use tracing::Instrument;
10
11use zebra_chain::block;
12use zebra_network as zn;
13use zebra_state::ChainTipChange;
14
15use crate::{
16 components::sync::{SyncStatus, PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
17 BoxError,
18};
19
20use BlockGossipError::*;
21
22#[derive(Error, Debug)]
24pub enum BlockGossipError {
25 #[error("chain tip sender was dropped")]
26 TipChange(watch::error::RecvError),
27
28 #[error("sync status sender was dropped")]
29 SyncStatus(watch::error::RecvError),
30
31 #[error("permanent peer set failure")]
32 PeerSetReadiness(zn::BoxError),
33}
34
35pub async fn gossip_best_tip_block_hashes<ZN>(
49 sync_status: SyncStatus,
50 mut chain_state: ChainTipChange,
51 broadcast_network: ZN,
52 mut mined_block_receiver: Option<watch::Receiver<(block::Hash, block::Height)>>,
53) -> Result<(), BlockGossipError>
54where
55 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
56 ZN::Future: Send,
57{
58 info!("initializing block gossip task");
59
60 let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
63
64 loop {
65 let mut sync_status = sync_status.clone();
66 let mut chain_tip = chain_state.clone();
67 let tip_change_close_to_network_tip_fut = async move {
68 let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?;
70
71 sync_status
75 .wait_until_close_to_tip()
76 .map_err(SyncStatus)
77 .await?;
78
79 let best_tip = chain_tip
82 .last_tip_change()
83 .unwrap_or(tip_action)
84 .best_tip_hash_and_height();
85
86 Ok((best_tip, "sending committed block broadcast", chain_tip))
87 }
88 .in_current_span();
89
90 let ((hash, height), log_msg, updated_chain_state) = if let Some(mined_block_receiver) =
91 mined_block_receiver.as_mut()
92 {
93 tokio::select! {
94 tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => {
95 mined_block_receiver.mark_unchanged();
96 tip_change_close_to_network_tip?
97 },
98
99 Ok(_) = mined_block_receiver.changed() => {
100 (*mined_block_receiver.borrow_and_update(), "sending mined block broadcast", chain_state)
102 }
103 }
104 } else {
105 tip_change_close_to_network_tip_fut.await?
106 };
107
108 chain_state = updated_chain_state;
109
110 let request = zn::Request::AdvertiseBlock(hash);
113
114 info!(?height, ?request, log_msg);
115 let _ = broadcast_network
117 .ready()
118 .await
119 .map_err(PeerSetReadiness)?
120 .call(request)
121 .await;
122
123 tokio::time::sleep(PEER_GOSSIP_DELAY).await;
128 }
129}