zebrad/components/mempool/crawler.rs

//! Zebra Mempool crawler.
//!
//! The [`Crawler`] periodically requests transactions from peers in order to populate the mempool.
//!
//! Crawling only happens when the local node has synchronized the chain to be close to its tip. If
//! synchronization is still happening at a fast rate, the crawler will stay disabled until it
//! slows down.
//!
//! Once enabled, the crawler periodically requests transactions from [`FANOUT`] peers in the
//! `peer_set` specified when it started. These crawl iterations occur at most once per
//! [`RATE_LIMIT_DELAY`]. The received transaction IDs are forwarded to the `mempool` service so
//! that they can be downloaded and included in the mempool.
//!
//! # Example
//!
//! ```
//! use zebrad::components::mempool;
//! #
//! # use zebra_chain::parameters::Network;
//! # use zebra_state::ChainTipSender;
//! # use zebra_test::mock_service::MockService;
//! # use zebrad::components::sync::SyncStatus;
//! #
//! # let runtime = tokio::runtime::Builder::new_current_thread()
//! #     .enable_all()
//! #     .build()
//! #     .expect("Failed to create Tokio runtime");
//! # let _guard = runtime.enter();
//! #
//! # let peer_set_service = MockService::build().for_unit_tests();
//! # let mempool_service = MockService::build().for_unit_tests();
//! # let (sync_status, _) = SyncStatus::new();
//! # let (_, _, chain_tip_change) = ChainTipSender::new(None, &Network::Mainnet);
//!
//! let crawler_task = mempool::Crawler::spawn(
//!     &mempool::Config::default(),
//!     peer_set_service,
//!     mempool_service,
//!     sync_status,
//!     chain_tip_change,
//! );
//!
//! # // Won't actually crawl because the sender endpoint of `sync_status` was dropped immediately
//! # // when it was created.
//! # runtime.block_on(async move {
//! crawler_task.await;
//! # });
//! ```

use std::{collections::HashSet, time::Duration};

use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{
    sync::watch,
    task::JoinHandle,
    time::{sleep, timeout},
};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::{block::Height, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::ChainTipChange;

use crate::components::{
    mempool::{self, Config},
    sync::SyncStatus,
};

#[cfg(test)]
mod tests;

/// The number of peers to request transactions from per crawl event.
const FANOUT: usize = 3;

/// The delay between crawl events.
///
/// This should be less than the target block interval,
/// so that we crawl peer mempools at least once per block.
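/// (Zcash's target block spacing has been 75 seconds since the Blossom upgrade,
/// so 73 seconds fits just under one block interval.)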
///
/// Using a prime number makes sure that mempool crawler fanouts
/// don't synchronise with other crawls.
pub const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

/// The time to wait for a peer response.
///
/// # Correctness
///
/// If this timeout is removed or set too high, the crawler may hang waiting for a peer to respond.
///
/// If this timeout is set too low, the crawler may fail to populate the mempool.
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// The mempool transaction crawler.
pub struct Crawler<PeerSet, Mempool> {
    /// The network peer set to crawl.
    peer_set: Timeout<PeerSet>,

    /// The mempool service that receives crawled transaction IDs.
    mempool: Mempool,

    /// Allows checking if we are near the tip to enable/disable the mempool crawler.
    sync_status: SyncStatus,

    /// Notifies the crawler when the best chain tip height changes.
    chain_tip_change: ChainTipChange,

    /// If the state's best chain tip has reached this height, always enable the mempool crawler.
    debug_enable_at_height: Option<Height>,
}

impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
where
    PeerSet:
        Service<zn::Request, Response = zn::Response, Error = BoxError> + Clone + Send + 'static,
    PeerSet::Future: Send,
    Mempool:
        Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
    Mempool::Future: Send,
{
    /// Spawn an asynchronous task to run the mempool crawler.
    pub fn spawn(
        config: &Config,
        peer_set: PeerSet,
        mempool: Mempool,
        sync_status: SyncStatus,
        chain_tip_change: ChainTipChange,
    ) -> JoinHandle<Result<(), BoxError>> {
        let crawler = Crawler {
            peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
            mempool,
            sync_status,
            chain_tip_change,
            debug_enable_at_height: config.debug_enable_at_height.map(Height),
        };

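        // Instrument the task with the caller's tracing span, so the crawler's
        // logs keep their component context.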
        tokio::spawn(crawler.run().in_current_span())
    }

    /// Waits until the mempool crawler is enabled by a debug config option.
    ///
    /// Returns an error if communication with the state is lost.
    async fn wait_until_enabled_by_debug(&mut self) -> Result<(), watch::error::RecvError> {
        // optimise non-debug performance
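        // (`future::pending()` never resolves, so when the debug height is not
        // configured, only the sync status can enable the crawler)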
        if self.debug_enable_at_height.is_none() {
            return future::pending().await;
        }

        let enable_at_height = self
            .debug_enable_at_height
            .expect("unexpected debug_enable_at_height: just checked for None");

        loop {
            let best_tip_height = self
                .chain_tip_change
                .wait_for_tip_change()
                .await?
                .best_tip_height();

            if best_tip_height >= enable_at_height {
                return Ok(());
            }
        }
    }

    /// Waits until the mempool crawler is enabled.
    ///
    /// Returns an error if communication with the syncer or state is lost.
    async fn wait_until_enabled(&mut self) -> Result<(), watch::error::RecvError> {
        let mut sync_status = self.sync_status.clone();
        let tip_future = sync_status.wait_until_close_to_tip();
        let debug_future = self.wait_until_enabled_by_debug();

        pin_mut!(tip_future);
        pin_mut!(debug_future);

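        // Wait for whichever enabling condition is satisfied first; the other
        // future is dropped when this method returns.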
        let (result, _unready_future) = future::select(tip_future, debug_future)
            .await
            .factor_first();

        result
    }

    /// Periodically crawl peers for transactions to include in the mempool.
    ///
    /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
    /// Zebra is shutting down.
    pub async fn run(mut self) -> Result<(), BoxError> {
        // This log is verbose during tests.
        #[cfg(not(test))]
        info!("initializing mempool crawler task");
        #[cfg(test)]
        debug!("initializing mempool crawler task");

        loop {
            self.wait_until_enabled().await?;
            // Avoid hangs when the peer service is not ready, or due to bugs in async code.
            timeout(RATE_LIMIT_DELAY, self.crawl_transactions())
                .await
                .unwrap_or_else(|timeout| {
                    // Temporary errors just get logged and ignored.
                    info!("mempool crawl timed out: {timeout:?}");
                    Ok(())
                })?;
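            // Sleeping after the crawl keeps consecutive crawl starts at least
            // `RATE_LIMIT_DELAY` apart.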
            sleep(RATE_LIMIT_DELAY).await;
        }
    }

    /// Crawl peers for transactions.
    ///
    /// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
    async fn crawl_transactions(&mut self) -> Result<(), BoxError> {
        let peer_set = self.peer_set.clone();

        trace!("Crawling for mempool transactions");

        let mut requests = FuturesUnordered::new();
        // get readiness for one peer at a time, to avoid peer set contention
        for attempt in 0..FANOUT {
            if attempt > 0 {
                // Let other tasks run, so we're more likely to choose a different peer.
                //
                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                tokio::task::yield_now().await;
            }

            let mut peer_set = peer_set.clone();
            // end the task on permanent peer set errors
            let peer_set = peer_set.ready().await?;

            requests.push(peer_set.call(zn::Request::MempoolTransactionIds));
        }
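        // `FuturesUnordered` yields responses in completion order, so a slow
        // peer doesn't delay processing of the other responses.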

        while let Some(result) = requests.next().await {
            // log individual response errors
            match result {
                Ok(response) => self.handle_response(response).await?,
                Err(error) => debug!("Failed to crawl peer for mempool transactions: {}", error),
            }
        }

        Ok(())
    }

    /// Handle a peer's response to the crawler's request for transactions.
    async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> {
        let transaction_ids: HashSet<_> = match response {
            zn::Response::TransactionIds(ids) => ids.into_iter().collect(),
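            // A `MempoolTransactionIds` request should only produce a
            // `TransactionIds` response, so any other variant is a bug.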
            _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
        };

        trace!(
            "Mempool crawler received {} transaction IDs",
            transaction_ids.len()
        );

        if !transaction_ids.is_empty() {
            self.queue_transactions(transaction_ids).await?;
        }

        Ok(())
    }

    /// Forward the crawled transaction IDs to the mempool transaction downloader.
    async fn queue_transactions(
        &mut self,
        transaction_ids: HashSet<UnminedTxId>,
    ) -> Result<(), BoxError> {
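        // Wrap each crawled ID as a `Gossip::Id`, so the mempool downloads and
        // verifies the full transaction before adding it.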
        let transaction_ids = transaction_ids.into_iter().map(Gossip::Id).collect();

        let call_result = self
            .mempool
            .ready()
            .await?
            .call(mempool::Request::Queue(transaction_ids))
            .await;

        let queue_errors = match call_result {
            Ok(mempool::Response::Queued(queue_results)) => {
                queue_results.into_iter().filter_map(Result::err)
            }
            Ok(_) => unreachable!("Mempool did not respond with queue results to mempool crawler"),
            Err(call_error) => {
                debug!("Ignoring unexpected peer behavior: {}", call_error);
                return Ok(());
            }
        };

        for error in queue_errors {
            debug!("Failed to download a crawled transaction: {}", error);
        }

        Ok(())
    }
}