zebrad/components/mempool/
pending_outputs.rs

1//! Pending [`transparent::Output`] tracker for [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput).
2
3use std::{collections::HashMap, future::Future};
4
5use tokio::sync::broadcast;
6
7use tower::BoxError;
8use zebra_chain::transparent;
9
10use zebra_node_services::mempool::Response;
11
12/// Pending [`transparent::Output`] tracker for handling the mempool's
13/// [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput).
14#[derive(Debug, Default)]
15pub struct PendingOutputs(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Output>>);
16
17impl PendingOutputs {
18    /// Returns a future that will resolve to the `transparent::Output` pointed
19    /// to by the given `transparent::OutPoint` when it is available.
20    pub fn queue(
21        &mut self,
22        outpoint: transparent::OutPoint,
23    ) -> impl Future<Output = Result<Response, BoxError>> {
24        let mut receiver = self
25            .0
26            .entry(outpoint)
27            .or_insert_with(|| {
28                let (sender, _) = broadcast::channel(1);
29                sender
30            })
31            .subscribe();
32
33        async move {
34            receiver
35                .recv()
36                .await
37                .map(Response::UnspentOutput)
38                .map_err(BoxError::from)
39        }
40    }
41
42    /// Notify all requests waiting for the [`transparent::Output`] pointed to by
43    /// the given [`transparent::OutPoint`] that the [`transparent::Output`] has
44    /// arrived.
45    #[inline]
46    pub fn respond(&mut self, outpoint: &transparent::OutPoint, output: transparent::Output) {
47        if let Some(sender) = self.0.remove(outpoint) {
48            // Adding the outpoint as a field lets us cross-reference
49            // with the trace of the verification that made the request.
50            tracing::trace!(?outpoint, "found pending mempool output");
51            let _ = sender.send(output);
52        }
53    }
54
55    /// Scan the set of waiting Output requests for channels where all receivers
56    /// have been dropped and remove the corresponding sender.
57    pub fn prune(&mut self) {
58        self.0.retain(|_, chan| chan.receiver_count() > 0);
59    }
60
61    /// Clears the inner [`HashMap`] of queued pending output requests.
62    pub fn clear(&mut self) {
63        self.0.clear();
64    }
65}