zebrad/components/mempool/queue_checker.rs
1//! Zebra Mempool queue checker.
2//!
3//! The queue checker periodically sends a request to the mempool,
4//! so that newly verified transactions are added to the mempool,
5//! and gossiped to peers.
6//!
7//! The mempool performs these actions on every request,
8//! but we can't guarantee that requests will arrive from peers
9//! on a regular basis.
10//!
11//! Crawler queue requests are also too infrequent,
12//! and they only happen if peers respond within the timeout.
13
14use std::time::Duration;
15
16use tokio::{task::JoinHandle, time::sleep};
17use tower::{BoxError, Service, ServiceExt};
18use tracing_futures::Instrument;
19
20use crate::components::mempool;
21
22/// The delay between queue check events.
23///
24/// This interval is chosen so that there are a significant number of
25/// queue checks in each target block interval.
26///
27/// This allows transactions to propagate across the network for each block,
28/// even if some peers are poorly connected.
29const RATE_LIMIT_DELAY: Duration = Duration::from_secs(5);
30
31/// The mempool queue checker.
32///
33/// The queue checker relies on the mempool to ignore requests when the mempool is inactive.
34pub struct QueueChecker<Mempool> {
35 /// The mempool service that receives crawled transaction IDs.
36 mempool: Mempool,
37}
38
39impl<Mempool> QueueChecker<Mempool>
40where
41 Mempool:
42 Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
43 Mempool::Future: Send,
44{
45 /// Spawn an asynchronous task to run the mempool queue checker.
46 pub fn spawn(mempool: Mempool) -> JoinHandle<Result<(), BoxError>> {
47 let queue_checker = QueueChecker { mempool };
48
49 tokio::spawn(queue_checker.run().in_current_span())
50 }
51
52 /// Periodically check if the mempool has newly verified transactions.
53 ///
54 /// Runs until the mempool returns an error,
55 /// which happens when Zebra is shutting down.
56 pub async fn run(mut self) -> Result<(), BoxError> {
57 info!("initializing mempool queue checker task");
58
59 loop {
60 sleep(RATE_LIMIT_DELAY).await;
61 self.check_queue().await?;
62 }
63 }
64
65 /// Check if the mempool has newly verified transactions.
66 async fn check_queue(&mut self) -> Result<(), BoxError> {
67 debug!("checking for newly verified mempool transactions");
68
69 // Since this is an internal request, we don't expect any errors.
70 // So we propagate any unexpected errors to the task that spawned us.
71 let response = self
72 .mempool
73 .ready()
74 .await?
75 .call(mempool::Request::CheckForVerifiedTransactions)
76 .await?;
77
78 match response {
79 mempool::Response::CheckedForVerifiedTransactions => {}
80 _ => {
81 unreachable!("mempool did not respond with checked queue to mempool queue checker")
82 }
83 };
84
85 Ok(())
86 }
87}