zebra_state/service/
pending_utxos.rs

1//! Pending UTXO tracker for [`AwaitUtxo` requests](crate::Request::AwaitUtxo).
2
3use std::{collections::HashMap, future::Future};
4
5use tokio::sync::broadcast;
6
7use zebra_chain::transparent;
8
9use crate::{BoxError, Response};
10
11#[derive(Debug, Default)]
12pub struct PendingUtxos(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Utxo>>);
13
14impl PendingUtxos {
15    /// Returns a future that will resolve to the `transparent::Output` pointed
16    /// to by the given `transparent::OutPoint` when it is available.
17    pub fn queue(
18        &mut self,
19        outpoint: transparent::OutPoint,
20    ) -> impl Future<Output = Result<Response, BoxError>> {
21        let mut receiver = self
22            .0
23            .entry(outpoint)
24            .or_insert_with(|| {
25                let (sender, _) = broadcast::channel(1);
26                sender
27            })
28            .subscribe();
29
30        async move {
31            receiver
32                .recv()
33                .await
34                .map(Response::Utxo)
35                .map_err(BoxError::from)
36        }
37    }
38
39    /// Notify all requests waiting for the [`transparent::Utxo`] pointed to by
40    /// the given [`transparent::OutPoint`] that the [`transparent::Utxo`] has
41    /// arrived.
42    #[inline]
43    pub fn respond(&mut self, outpoint: &transparent::OutPoint, utxo: transparent::Utxo) {
44        if let Some(sender) = self.0.remove(outpoint) {
45            // Adding the outpoint as a field lets us cross-reference
46            // with the trace of the verification that made the request.
47            tracing::trace!(?outpoint, "found pending UTXO");
48            let _ = sender.send(utxo);
49        }
50    }
51
52    /// Check the list of pending UTXO requests against the supplied
53    /// [`transparent::OrderedUtxo`] index.
54    pub fn check_against_ordered(
55        &mut self,
56        ordered_utxos: &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
57    ) {
58        for (outpoint, ordered_utxo) in ordered_utxos.iter() {
59            self.respond(outpoint, ordered_utxo.utxo.clone())
60        }
61    }
62
63    /// Scan the set of waiting utxo requests for channels where all receivers
64    /// have been dropped and remove the corresponding sender.
65    pub fn prune(&mut self) {
66        self.0.retain(|_, chan| chan.receiver_count() > 0);
67    }
68
69    /// Returns the number of utxos that are being waited on.
70    pub fn len(&self) -> usize {
71        self.0.len()
72    }
73}