1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
//! Pending UTXO tracker for [`AwaitUtxo` requests](crate::Request::AwaitUtxo).
use std::{collections::HashMap, future::Future};
use tokio::sync::broadcast;
use zebra_chain::transparent;
use crate::{BoxError, Response};
#[derive(Debug, Default)]
pub struct PendingUtxos(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Utxo>>);
impl PendingUtxos {
/// Returns a future that will resolve to the `transparent::Output` pointed
/// to by the given `transparent::OutPoint` when it is available.
pub fn queue(
&mut self,
outpoint: transparent::OutPoint,
) -> impl Future<Output = Result<Response, BoxError>> {
let mut receiver = self
.0
.entry(outpoint)
.or_insert_with(|| {
let (sender, _) = broadcast::channel(1);
sender
})
.subscribe();
async move {
receiver
.recv()
.await
.map(Response::Utxo)
.map_err(BoxError::from)
}
}
/// Notify all requests waiting for the [`transparent::Utxo`] pointed to by
/// the given [`transparent::OutPoint`] that the [`transparent::Utxo`] has
/// arrived.
#[inline]
pub fn respond(&mut self, outpoint: &transparent::OutPoint, utxo: transparent::Utxo) {
if let Some(sender) = self.0.remove(outpoint) {
// Adding the outpoint as a field lets us cross-reference
// with the trace of the verification that made the request.
tracing::trace!(?outpoint, "found pending UTXO");
let _ = sender.send(utxo);
}
}
/// Check the list of pending UTXO requests against the supplied
/// [`transparent::OrderedUtxo`] index.
pub fn check_against_ordered(
&mut self,
ordered_utxos: &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
) {
for (outpoint, ordered_utxo) in ordered_utxos.iter() {
self.respond(outpoint, ordered_utxo.utxo.clone())
}
}
/// Scan the set of waiting utxo requests for channels where all receivers
/// have been dropped and remove the corresponding sender.
pub fn prune(&mut self) {
self.0.retain(|_, chan| chan.receiver_count() > 0);
}
/// Returns the number of utxos that are being waited on.
pub fn len(&self) -> usize {
self.0.len()
}
}