zebra_state/service/
pending_utxos.rs1use std::{collections::HashMap, future::Future};
4
5use tokio::sync::broadcast;
6
7use zebra_chain::transparent;
8
9use crate::{BoxError, Response};
10
11#[derive(Debug, Default)]
12pub struct PendingUtxos(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Utxo>>);
13
14impl PendingUtxos {
15 pub fn queue(
18 &mut self,
19 outpoint: transparent::OutPoint,
20 ) -> impl Future<Output = Result<Response, BoxError>> {
21 let mut receiver = self
22 .0
23 .entry(outpoint)
24 .or_insert_with(|| {
25 let (sender, _) = broadcast::channel(1);
26 sender
27 })
28 .subscribe();
29
30 async move {
31 receiver
32 .recv()
33 .await
34 .map(Response::Utxo)
35 .map_err(BoxError::from)
36 }
37 }
38
39 #[inline]
43 pub fn respond(&mut self, outpoint: &transparent::OutPoint, utxo: transparent::Utxo) {
44 if let Some(sender) = self.0.remove(outpoint) {
45 tracing::trace!(?outpoint, "found pending UTXO");
48 let _ = sender.send(utxo);
49 }
50 }
51
52 pub fn check_against_ordered(
55 &mut self,
56 ordered_utxos: &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
57 ) {
58 for (outpoint, ordered_utxo) in ordered_utxos.iter() {
59 self.respond(outpoint, ordered_utxo.utxo.clone())
60 }
61 }
62
63 pub fn prune(&mut self) {
66 self.0.retain(|_, chan| chan.receiver_count() > 0);
67 }
68
69 pub fn len(&self) -> usize {
71 self.0.len()
72 }
73}