zebrad/components/sync/
gossip.rs

1//! A task that gossips newly verified [`block::Hash`]es to peers.
2//!
3//! [`block::Hash`]: zebra_chain::block::Hash
4
5use futures::TryFutureExt;
6use thiserror::Error;
7use tokio::sync::watch;
8use tower::{timeout::Timeout, Service, ServiceExt};
9use tracing::Instrument;
10
11use zebra_chain::block;
12use zebra_network as zn;
13use zebra_state::ChainTipChange;
14
15use crate::{
16    components::sync::{SyncStatus, PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
17    BoxError,
18};
19
20use BlockGossipError::*;
21
22/// Errors that can occur when gossiping committed blocks
23#[derive(Error, Debug)]
24pub enum BlockGossipError {
25    #[error("chain tip sender was dropped")]
26    TipChange(watch::error::RecvError),
27
28    #[error("sync status sender was dropped")]
29    SyncStatus(watch::error::RecvError),
30
31    #[error("permanent peer set failure")]
32    PeerSetReadiness(zn::BoxError),
33}
34
35/// Run continuously, gossiping newly verified [`block::Hash`]es to peers.
36///
37/// Once the state has reached the chain tip, broadcast the [`block::Hash`]es
38/// of newly verified blocks to all ready peers.
39///
40/// Blocks are only gossiped if they are:
41/// - on the best chain, and
42/// - the most recent block verified since the last gossip.
43///
44/// In particular, if a lot of blocks are committed at the same time,
45/// gossips will be disabled or skipped until the state reaches the latest tip.
46///
47/// [`block::Hash`]: zebra_chain::block::Hash
48pub async fn gossip_best_tip_block_hashes<ZN>(
49    sync_status: SyncStatus,
50    mut chain_state: ChainTipChange,
51    broadcast_network: ZN,
52    mut mined_block_receiver: Option<watch::Receiver<(block::Hash, block::Height)>>,
53) -> Result<(), BlockGossipError>
54where
55    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
56    ZN::Future: Send,
57{
58    info!("initializing block gossip task");
59
60    // use the same timeout as tips requests,
61    // so broadcasts don't delay the syncer too long
62    let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
63
64    loop {
65        let mut sync_status = sync_status.clone();
66        let mut chain_tip = chain_state.clone();
67        let tip_change_close_to_network_tip_fut = async move {
68            // wait for at least one tip change, to make sure we have a new block hash to broadcast
69            let tip_action = chain_tip.wait_for_tip_change().await.map_err(TipChange)?;
70
71            // wait until we're close to the tip, because broadcasts are only useful for nodes near the tip
72            // (if they're a long way from the tip, they use the syncer and block locators), unless a mined block
73            // hash is received before `wait_until_close_to_tip()` is ready.
74            sync_status
75                .wait_until_close_to_tip()
76                .map_err(SyncStatus)
77                .await?;
78
79            // get the latest tip change when close to tip - it might be different to the change we awaited,
80            // because the syncer might take a long time to reach the tip
81            let best_tip = chain_tip
82                .last_tip_change()
83                .unwrap_or(tip_action)
84                .best_tip_hash_and_height();
85
86            Ok((best_tip, "sending committed block broadcast", chain_tip))
87        }
88        .in_current_span();
89
90        let ((hash, height), log_msg, updated_chain_state) = if let Some(mined_block_receiver) =
91            mined_block_receiver.as_mut()
92        {
93            tokio::select! {
94                tip_change_close_to_network_tip = tip_change_close_to_network_tip_fut => {
95                    mined_block_receiver.mark_unchanged();
96                    tip_change_close_to_network_tip?
97                },
98
99                Ok(_) = mined_block_receiver.changed() => {
100                    // we have a new block to broadcast from the `submitblock `RPC method, get block data and release the channel.
101                   (*mined_block_receiver.borrow_and_update(), "sending mined block broadcast", chain_state)
102                }
103            }
104        } else {
105            tip_change_close_to_network_tip_fut.await?
106        };
107
108        chain_state = updated_chain_state;
109
110        // block broadcasts inform other nodes about new blocks,
111        // so our internal Grow or Reset state doesn't matter to them
112        let request = zn::Request::AdvertiseBlock(hash);
113
114        info!(?height, ?request, log_msg);
115        // broadcast requests don't return errors, and we'd just want to ignore them anyway
116        let _ = broadcast_network
117            .ready()
118            .await
119            .map_err(PeerSetReadiness)?
120            .call(request)
121            .await;
122
123        // wait for at least the network timeout between gossips
124        //
125        // in practice, we expect blocks to arrive approximately every 75 seconds,
126        // so waiting 6 seconds won't make much difference
127        tokio::time::sleep(PEER_GOSSIP_DELAY).await;
128    }
129}