zebrad/components/mempool/crawler.rs
//! Zebra Mempool crawler.
//!
//! The [`Crawler`] periodically requests transactions from peers in order to populate the mempool.
//!
//! Crawling only happens when the local node has synchronized the chain to be close to its tip. If
//! synchronization is still happening at a fast rate, the crawler will stay disabled until it
//! slows down.
//!
//! Once enabled, the crawler will periodically request transactions from [`FANOUT`] peers chosen
//! from the `peer_set` specified when it started. These crawl iterations occur at most once per
//! [`RATE_LIMIT_DELAY`]. The received transaction IDs are forwarded to the `mempool` service so
//! that they can be downloaded and included in the mempool.
//!
//! # Example
//!
//! ```
//! use zebrad::components::mempool;
//! #
//! # use zebra_chain::parameters::Network;
//! # use zebra_state::ChainTipSender;
//! # use zebra_test::mock_service::MockService;
//! # use zebrad::components::sync::SyncStatus;
//! #
//! # let runtime = tokio::runtime::Builder::new_current_thread()
//! #     .enable_all()
//! #     .build()
//! #     .expect("Failed to create Tokio runtime");
//! # let _guard = runtime.enter();
//! #
//! # let peer_set_service = MockService::build().for_unit_tests();
//! # let mempool_service = MockService::build().for_unit_tests();
//! # let (sync_status, _) = SyncStatus::new();
//! # let (_, _, chain_tip_change) = ChainTipSender::new(None, &Network::Mainnet);
//!
//! let crawler_task = mempool::Crawler::spawn(
//!     &mempool::Config::default(),
//!     peer_set_service,
//!     mempool_service,
//!     sync_status,
//!     chain_tip_change,
//! );
//!
//! # // Won't actually crawl because the sender endpoint of `sync_status` was dropped immediately
//! # // when it was created.
//! # runtime.block_on(async move {
//! crawler_task.await;
//! # });
//! ```

use std::{collections::HashSet, time::Duration};

use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{
    sync::watch,
    task::JoinHandle,
    time::{sleep, timeout},
};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::{block::Height, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::ChainTipChange;

use crate::components::{
    mempool::{self, Config},
    sync::SyncStatus,
};

#[cfg(test)]
mod tests;

/// The number of peers to request transactions from per crawl event.
const FANOUT: usize = 3;

/// The delay between crawl events.
///
/// This should be less than the target block interval,
/// so that we crawl peer mempools at least once per block.
///
/// Using a prime number makes sure that mempool crawler fanouts
/// don't synchronise with other crawls.
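///
/// For reference, the post-Blossom target block spacing is 75 seconds, so a 73 second
/// delay crawls peer mempools roughly once per block.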
pub const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

/// The time to wait for a peer response.
///
/// # Correctness
///
/// If this timeout is removed or set too high, the crawler may hang waiting for a peer to respond.
///
/// If this timeout is set too low, the crawler may fail to populate the mempool.
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// The mempool transaction crawler.
pub struct Crawler<PeerSet, Mempool> {
    /// The network peer set to crawl.
    peer_set: Timeout<PeerSet>,

    /// The mempool service that receives crawled transaction IDs.
    mempool: Mempool,

    /// Allows checking if we are near the tip to enable/disable the mempool crawler.
    sync_status: SyncStatus,

    /// Notifies the crawler when the best chain tip height changes.
    chain_tip_change: ChainTipChange,

    /// If the state's best chain tip has reached this height, always enable the mempool crawler.
    debug_enable_at_height: Option<Height>,
}

impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
where
    PeerSet:
        Service<zn::Request, Response = zn::Response, Error = BoxError> + Clone + Send + 'static,
    PeerSet::Future: Send,
    Mempool:
        Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
    Mempool::Future: Send,
{
    /// Spawn an asynchronous task to run the mempool crawler.
    pub fn spawn(
        config: &Config,
        peer_set: PeerSet,
        mempool: Mempool,
        sync_status: SyncStatus,
        chain_tip_change: ChainTipChange,
    ) -> JoinHandle<Result<(), BoxError>> {
        let crawler = Crawler {
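            // The `Timeout` middleware bounds each peer set request by `PEER_RESPONSE_TIMEOUT`.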
            peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
            mempool,
            sync_status,
            chain_tip_change,
            debug_enable_at_height: config.debug_enable_at_height.map(Height),
        };

        tokio::spawn(crawler.run().in_current_span())
    }

    /// Waits until the mempool crawler is enabled by a debug config option.
    ///
    /// Returns an error if communication with the state is lost.
    async fn wait_until_enabled_by_debug(&mut self) -> Result<(), watch::error::RecvError> {
        // optimise non-debug performance
        if self.debug_enable_at_height.is_none() {
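            // `future::pending()` never resolves, so when no debug height is configured,
            // this method never completes and never enables the crawler on its own.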
            return future::pending().await;
        }

        let enable_at_height = self
            .debug_enable_at_height
            .expect("unexpected debug_enable_at_height: just checked for None");

        loop {
            let best_tip_height = self
                .chain_tip_change
                .wait_for_tip_change()
                .await?
                .best_tip_height();

            if best_tip_height >= enable_at_height {
                return Ok(());
            }
        }
    }

    /// Waits until the mempool crawler is enabled.
    ///
    /// Returns an error if communication with the syncer or state is lost.
    async fn wait_until_enabled(&mut self) -> Result<(), watch::error::RecvError> {
        let mut sync_status = self.sync_status.clone();
        let tip_future = sync_status.wait_until_close_to_tip();
        let debug_future = self.wait_until_enabled_by_debug();

        pin_mut!(tip_future);
        pin_mut!(debug_future);

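        // Wait for whichever condition is reached first. `factor_first` applies because
        // both futures return the same `Result` type; the still-pending future is
        // returned as `_unready_future` and ignored.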
        let (result, _unready_future) = future::select(tip_future, debug_future)
            .await
            .factor_first();

        result
    }

    /// Periodically crawl peers for transactions to include in the mempool.
    ///
    /// Runs until the [`SyncStatus`] loses its connection to the chain syncer, which happens when
    /// Zebra is shutting down.
    pub async fn run(mut self) -> Result<(), BoxError> {
        // This log is verbose during tests.
        #[cfg(not(test))]
        info!("initializing mempool crawler task");
        #[cfg(test)]
        debug!("initializing mempool crawler task");

        loop {
            self.wait_until_enabled().await?;

            // Avoid hangs when the peer service is not ready, or due to bugs in async code.
            timeout(RATE_LIMIT_DELAY, self.crawl_transactions())
                .await
                .unwrap_or_else(|timeout| {
                    // Temporary errors just get logged and ignored.
                    info!("mempool crawl timed out: {timeout:?}");
                    Ok(())
                })?;
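
            // Rate-limit crawl iterations to at most one per `RATE_LIMIT_DELAY`.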
            sleep(RATE_LIMIT_DELAY).await;
        }
    }

    /// Crawl peers for transactions.
    ///
    /// Concurrently request [`FANOUT`] peers for transactions to include in the mempool.
    async fn crawl_transactions(&mut self) -> Result<(), BoxError> {
        let peer_set = self.peer_set.clone();

        trace!("Crawling for mempool transactions");

        let mut requests = FuturesUnordered::new();
        // get readiness for one peer at a time, to avoid peer set contention
        for attempt in 0..FANOUT {
            if attempt > 0 {
                // Let other tasks run, so we're more likely to choose a different peer.
                //
                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                tokio::task::yield_now().await;
            }

            let mut peer_set = peer_set.clone();
            // end the task on permanent peer set errors
            let peer_set = peer_set.ready().await?;

            requests.push(peer_set.call(zn::Request::MempoolTransactionIds));
        }

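        // `FuturesUnordered` yields responses in completion order, not request order.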
        while let Some(result) = requests.next().await {
            // log individual response errors
            match result {
                Ok(response) => self.handle_response(response).await?,
                Err(error) => debug!("Failed to crawl peer for mempool transactions: {}", error),
            }
        }

        Ok(())
    }

    /// Handle a peer's response to the crawler's request for transactions.
    async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> {
        let transaction_ids: HashSet<_> = match response {
            zn::Response::TransactionIds(ids) => ids.into_iter().collect(),
            _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
        };

        trace!(
            "Mempool crawler received {} transaction IDs",
            transaction_ids.len()
        );

        if !transaction_ids.is_empty() {
            self.queue_transactions(transaction_ids).await?;
        }

        Ok(())
    }

    /// Forward the crawled transaction IDs to the mempool transaction downloader.
    async fn queue_transactions(
        &mut self,
        transaction_ids: HashSet<UnminedTxId>,
    ) -> Result<(), BoxError> {
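        // Wrap each ID as `Gossip::Id`, so the mempool knows it only has the ID and must
        // download the full transaction before it can be added to the mempool.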
        let transaction_ids = transaction_ids.into_iter().map(Gossip::Id).collect();

        let call_result = self
            .mempool
            .ready()
            .await?
            .call(mempool::Request::Queue(transaction_ids))
            .await;

        let queue_errors = match call_result {
            Ok(mempool::Response::Queued(queue_results)) => {
                queue_results.into_iter().filter_map(Result::err)
            }
            Ok(_) => unreachable!("Mempool did not respond with queue results to mempool crawler"),
            Err(call_error) => {
                debug!("Ignoring unexpected peer behavior: {}", call_error);
                return Ok(());
            }
        };

        for error in queue_errors {
            debug!("Failed to download a crawled transaction: {}", error);
        }

        Ok(())
    }
}