zebrad/components/mempool/queue_checker.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
//! Zebra Mempool queue checker.
//!
//! The queue checker periodically sends a request to the mempool,
//! so that newly verified transactions are added to the mempool,
//! and gossiped to peers.
//!
//! The mempool performs these actions on every request,
//! but we can't guarantee that requests will arrive from peers
//! on a regular basis.
//!
//! Crawler queue requests are also too infrequent,
//! and they only happen if peers respond within the timeout.
use std::time::Duration;
use tokio::{task::JoinHandle, time::sleep};
use tower::{BoxError, Service, ServiceExt};
use tracing_futures::Instrument;
use crate::components::mempool;
/// The delay between queue check events.
///
/// This interval is chosen so that there are a significant number of
/// queue checks in each target block interval.
///
/// This allows transactions to propagate across the network for each block,
/// even if some peers are poorly connected.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(5);
/// The mempool queue checker.
///
/// The queue checker relies on the mempool to ignore requests when the mempool is inactive.
pub struct QueueChecker<Mempool> {
/// The mempool service that receives crawled transaction IDs.
mempool: Mempool,
}
impl<Mempool> QueueChecker<Mempool>
where
Mempool:
Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
Mempool::Future: Send,
{
/// Spawn an asynchronous task to run the mempool queue checker.
pub fn spawn(mempool: Mempool) -> JoinHandle<Result<(), BoxError>> {
let queue_checker = QueueChecker { mempool };
tokio::spawn(queue_checker.run().in_current_span())
}
/// Periodically check if the mempool has newly verified transactions.
///
/// Runs until the mempool returns an error,
/// which happens when Zebra is shutting down.
pub async fn run(mut self) -> Result<(), BoxError> {
info!("initializing mempool queue checker task");
loop {
sleep(RATE_LIMIT_DELAY).await;
self.check_queue().await?;
}
}
/// Check if the mempool has newly verified transactions.
async fn check_queue(&mut self) -> Result<(), BoxError> {
debug!("checking for newly verified mempool transactions");
// Since this is an internal request, we don't expect any errors.
// So we propagate any unexpected errors to the task that spawned us.
let response = self
.mempool
.ready()
.await?
.call(mempool::Request::CheckForVerifiedTransactions)
.await?;
match response {
mempool::Response::CheckedForVerifiedTransactions => {}
_ => {
unreachable!("mempool did not respond with checked queue to mempool queue checker")
}
};
Ok(())
}
}