zebrad/components/mempool/pending_outputs.rs
1//! Pending [`transparent::Output`] tracker for [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput).
2
3use std::{collections::HashMap, future::Future};
4
5use tokio::sync::broadcast;
6
7use tower::BoxError;
8use zebra_chain::transparent;
9
10use zebra_node_services::mempool::Response;
11
12/// Pending [`transparent::Output`] tracker for handling the mempool's
13/// [`AwaitOutput` requests](zebra_node_services::mempool::Request::AwaitOutput).
14#[derive(Debug, Default)]
15pub struct PendingOutputs(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Output>>);
16
17impl PendingOutputs {
18 /// Returns a future that will resolve to the `transparent::Output` pointed
19 /// to by the given `transparent::OutPoint` when it is available.
20 pub fn queue(
21 &mut self,
22 outpoint: transparent::OutPoint,
23 ) -> impl Future<Output = Result<Response, BoxError>> {
24 let mut receiver = self
25 .0
26 .entry(outpoint)
27 .or_insert_with(|| {
28 let (sender, _) = broadcast::channel(1);
29 sender
30 })
31 .subscribe();
32
33 async move {
34 receiver
35 .recv()
36 .await
37 .map(Response::UnspentOutput)
38 .map_err(BoxError::from)
39 }
40 }
41
42 /// Notify all requests waiting for the [`transparent::Output`] pointed to by
43 /// the given [`transparent::OutPoint`] that the [`transparent::Output`] has
44 /// arrived.
45 #[inline]
46 pub fn respond(&mut self, outpoint: &transparent::OutPoint, output: transparent::Output) {
47 if let Some(sender) = self.0.remove(outpoint) {
48 // Adding the outpoint as a field lets us cross-reference
49 // with the trace of the verification that made the request.
50 tracing::trace!(?outpoint, "found pending mempool output");
51 let _ = sender.send(output);
52 }
53 }
54
55 /// Scan the set of waiting Output requests for channels where all receivers
56 /// have been dropped and remove the corresponding sender.
57 pub fn prune(&mut self) {
58 self.0.retain(|_, chan| chan.receiver_count() > 0);
59 }
60
61 /// Clears the inner [`HashMap`] of queued pending output requests.
62 pub fn clear(&mut self) {
63 self.0.clear();
64 }
65}