1//! A task that gossips newly verified [`block::Hash`]es to peers.
2//!
3//! [`block::Hash`]: zebra_chain::block::Hash
45use futures::TryFutureExt;
6use thiserror::Error;
7use tokio::sync::watch;
8use tower::{timeout::Timeout, Service, ServiceExt};
9use tracing::Instrument;
1011use zebra_chain::block;
12use zebra_network as zn;
13use zebra_state::ChainTipChange;
1415use crate::{
16 components::sync::{SyncStatus, PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
17 BoxError,
18};
1920use BlockGossipError::*;
/// Errors that can occur when gossiping committed blocks
///
/// Each variant wraps the underlying error from the step of the block gossip
/// task that failed.
#[derive(Error, Debug)]
pub enum BlockGossipError {
    /// Waiting for the next chain tip change returned an error.
    #[error("chain tip sender was dropped")]
    TipChange(watch::error::RecvError),

    /// Waiting for the sync status to report that we are close to the tip
    /// returned an error.
    #[error("sync status sender was dropped")]
    SyncStatus(watch::error::RecvError),

    /// The peer set service returned an error while we were waiting for it
    /// to become ready for a broadcast request.
    #[error("permanent peer set failure")]
    PeerSetReadiness(zn::BoxError),
}
3435/// Run continuously, gossiping newly verified [`block::Hash`]es to peers.
36///
37/// Once the state has reached the chain tip, broadcast the [`block::Hash`]es
38/// of newly verified blocks to all ready peers.
39///
40/// Blocks are only gossiped if they are:
41/// - on the best chain, and
42/// - the most recent block verified since the last gossip.
43///
44/// In particular, if a lot of blocks are committed at the same time,
45/// gossips will be disabled or skipped until the state reaches the latest tip.
46///
47/// [`block::Hash`]: zebra_chain::block::Hash
48pub async fn gossip_best_tip_block_hashes<ZN>(
49 sync_status: SyncStatus,
50mut chain_state: ChainTipChange,
51 broadcast_network: ZN,
52mut mined_block_receiver: Option<watch::Receiver<(block::Hash, block::Height)>>,
53) -> Result<(), BlockGossipError>
54where
55ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
56 ZN::Future: Send,
57{
58info!("initializing block gossip task");
5960// use the same timeout as tips requests,
61 // so broadcasts don't delay the syncer too long
62let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
6364loop {
65let mut sync_status = sync_status.clone();
66let mut chain_tip = chain_state.clone();
67let tip_change_close_to_network_tip_fut = async move {
68// wait for at least one tip change, to make sure we have a new block hash to broadcast
69let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?;
7071// wait until we're close to the tip, because broadcasts are only useful for nodes near the tip
72 // (if they're a long way from the tip, they use the syncer and block locators), unless a mined block
73 // hash is received before `wait_until_close_to_tip()` is ready.
74sync_status
75 .wait_until_close_to_tip()
76 .map_err(SyncStatus)
77 .await?;
7879// get the latest tip change when close to tip - it might be different to the change we awaited,
80 // because the syncer might take a long time to reach the tip
81let best_tip = chain_tip
82 .last_tip_change()
83 .unwrap_or(tip_action)
84 .best_tip_hash_and_height();
8586Ok((best_tip, "sending committed block broadcast", chain_tip))
87 }
88 .in_current_span();
8990let ((hash, height), log_msg, updated_chain_state) = if let Some(mined_block_receiver) =
91 mined_block_receiver.as_mut()
92 {
93tokio::select! {
94 tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => {
95 mined_block_receiver.mark_unchanged();
96 tip_change_close_to_network_tip?
97},
9899Ok(_) = mined_block_receiver.changed() => {
100// we have a new block to broadcast from the `submitblock `RPC method, get block data and release the channel.
101(*mined_block_receiver.borrow_and_update(), "sending mined block broadcast", chain_state)
102 }
103 }
104 } else {
105 tip_change_close_to_network_tip_fut.await?
106};
107108 chain_state = updated_chain_state;
109110// block broadcasts inform other nodes about new blocks,
111 // so our internal Grow or Reset state doesn't matter to them
112let request = zn::Request::AdvertiseBlock(hash);
113114info!(?height, ?request, log_msg);
115// broadcast requests don't return errors, and we'd just want to ignore them anyway
116let _ = broadcast_network
117 .ready()
118 .await
119.map_err(PeerSetReadiness)?
120.call(request)
121 .await;
122123// wait for at least the network timeout between gossips
124 //
125 // in practice, we expect blocks to arrive approximately every 75 seconds,
126 // so waiting 6 seconds won't make much difference
127tokio::time::sleep(PEER_GOSSIP_DELAY).await;
128 }
129}