1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//! Pending [`transparent::Output`] tracker for [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput).

use std::{collections::HashMap, future::Future};

use tokio::sync::broadcast;

use tower::BoxError;
use zebra_chain::transparent;

use zebra_node_services::mempool::Response;

/// Pending [`transparent::Output`] tracker for handling the mempool's
/// [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput).
#[derive(Debug, Default)]
pub struct PendingOutputs(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Output>>);

impl PendingOutputs {
    /// Returns a future that will resolve to the `transparent::Output` pointed
    /// to by the given `transparent::OutPoint` when it is available.
    pub fn queue(
        &mut self,
        outpoint: transparent::OutPoint,
    ) -> impl Future<Output = Result<Response, BoxError>> {
        let mut receiver = self
            .0
            .entry(outpoint)
            .or_insert_with(|| {
                let (sender, _) = broadcast::channel(1);
                sender
            })
            .subscribe();

        async move {
            receiver
                .recv()
                .await
                .map(Response::UnspentOutput)
                .map_err(BoxError::from)
        }
    }

    /// Notify all requests waiting for the [`transparent::Output`] pointed to by
    /// the given [`transparent::OutPoint`] that the [`transparent::Output`] has
    /// arrived.
    #[inline]
    pub fn respond(&mut self, outpoint: &transparent::OutPoint, output: transparent::Output) {
        if let Some(sender) = self.0.remove(outpoint) {
            // Adding the outpoint as a field lets us cross-reference
            // with the trace of the verification that made the request.
            tracing::trace!(?outpoint, "found pending mempool output");
            let _ = sender.send(output);
        }
    }

    /// Scan the set of waiting Output requests for channels where all receivers
    /// have been dropped and remove the corresponding sender.
    pub fn prune(&mut self) {
        self.0.retain(|_, chan| chan.receiver_count() > 0);
    }

    /// Clears the inner [`HashMap`] of queued pending output requests.
    pub fn clear(&mut self) {
        self.0.clear();
    }
}