zebrad/components/mempool/
queue_checker.rs

1//! Zebra Mempool queue checker.
2//!
3//! The queue checker periodically sends a request to the mempool,
4//! so that newly verified transactions are added to the mempool,
5//! and gossiped to peers.
6//!
7//! The mempool performs these actions on every request,
8//! but we can't guarantee that requests will arrive from peers
9//! on a regular basis.
10//!
11//! Crawler queue requests are also too infrequent,
12//! and they only happen if peers respond within the timeout.
13
14use std::time::Duration;
15
16use tokio::{task::JoinHandle, time::sleep};
17use tower::{BoxError, Service, ServiceExt};
18use tracing_futures::Instrument;
19
20use crate::components::mempool;
21
22/// The delay between queue check events.
23///
24/// This interval is chosen so that there are a significant number of
25/// queue checks in each target block interval.
26///
27/// This allows transactions to propagate across the network for each block,
28/// even if some peers are poorly connected.
29const RATE_LIMIT_DELAY: Duration = Duration::from_secs(5);
30
31/// The mempool queue checker.
32///
33/// The queue checker relies on the mempool to ignore requests when the mempool is inactive.
34pub struct QueueChecker<Mempool> {
35    /// The mempool service that receives crawled transaction IDs.
36    mempool: Mempool,
37}
38
39impl<Mempool> QueueChecker<Mempool>
40where
41    Mempool:
42        Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
43    Mempool::Future: Send,
44{
45    /// Spawn an asynchronous task to run the mempool queue checker.
46    pub fn spawn(mempool: Mempool) -> JoinHandle<Result<(), BoxError>> {
47        let queue_checker = QueueChecker { mempool };
48
49        tokio::spawn(queue_checker.run().in_current_span())
50    }
51
52    /// Periodically check if the mempool has newly verified transactions.
53    ///
54    /// Runs until the mempool returns an error,
55    /// which happens when Zebra is shutting down.
56    pub async fn run(mut self) -> Result<(), BoxError> {
57        info!("initializing mempool queue checker task");
58
59        loop {
60            sleep(RATE_LIMIT_DELAY).await;
61            self.check_queue().await?;
62        }
63    }
64
65    /// Check if the mempool has newly verified transactions.
66    async fn check_queue(&mut self) -> Result<(), BoxError> {
67        debug!("checking for newly verified mempool transactions");
68
69        // Since this is an internal request, we don't expect any errors.
70        // So we propagate any unexpected errors to the task that spawned us.
71        let response = self
72            .mempool
73            .ready()
74            .await?
75            .call(mempool::Request::CheckForVerifiedTransactions)
76            .await?;
77
78        match response {
79            mempool::Response::CheckedForVerifiedTransactions => {}
80            _ => {
81                unreachable!("mempool did not respond with checked queue to mempool queue checker")
82            }
83        };
84
85        Ok(())
86    }
87}