1//! Pending UTXO tracker for [`AwaitUtxo` requests](crate::Request::AwaitUtxo).
23use std::{collections::HashMap, future::Future};
45use tokio::sync::broadcast;
67use zebra_chain::transparent;
89use crate::{BoxError, Response};
1011#[derive(Debug, Default)]
12pub struct PendingUtxos(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Utxo>>);
1314impl PendingUtxos {
15/// Returns a future that will resolve to the `transparent::Output` pointed
16 /// to by the given `transparent::OutPoint` when it is available.
17pub fn queue(
18&mut self,
19 outpoint: transparent::OutPoint,
20 ) -> impl Future<Output = Result<Response, BoxError>> {
21let mut receiver = self
22.0
23.entry(outpoint)
24 .or_insert_with(|| {
25let (sender, _) = broadcast::channel(1);
26 sender
27 })
28 .subscribe();
2930async move {
31 receiver
32 .recv()
33 .await
34.map(Response::Utxo)
35 .map_err(BoxError::from)
36 }
37 }
3839/// Notify all requests waiting for the [`transparent::Utxo`] pointed to by
40 /// the given [`transparent::OutPoint`] that the [`transparent::Utxo`] has
41 /// arrived.
42#[inline]
43pub fn respond(&mut self, outpoint: &transparent::OutPoint, utxo: transparent::Utxo) {
44if let Some(sender) = self.0.remove(outpoint) {
45// Adding the outpoint as a field lets us cross-reference
46 // with the trace of the verification that made the request.
47tracing::trace!(?outpoint, "found pending UTXO");
48let _ = sender.send(utxo);
49 }
50 }
5152/// Check the list of pending UTXO requests against the supplied
53 /// [`transparent::OrderedUtxo`] index.
54pub fn check_against_ordered(
55&mut self,
56 ordered_utxos: &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
57 ) {
58for (outpoint, ordered_utxo) in ordered_utxos.iter() {
59self.respond(outpoint, ordered_utxo.utxo.clone())
60 }
61 }
6263/// Scan the set of waiting utxo requests for channels where all receivers
64 /// have been dropped and remove the corresponding sender.
65pub fn prune(&mut self) {
66self.0.retain(|_, chan| chan.receiver_count() > 0);
67 }
6869/// Returns the number of utxos that are being waited on.
70pub fn len(&self) -> usize {
71self.0.len()
72 }
73}