1use std::{collections::HashSet, sync::Arc};
12
13use chrono::Duration;
14use indexmap::IndexMap;
15use tokio::{
16 sync::broadcast::{self, error::TryRecvError},
17 time::Instant,
18};
19
20use tower::{Service, ServiceExt};
21
22use zebra_chain::{
23 block::Height,
24 chain_tip::ChainTip,
25 parameters::{Network, NetworkUpgrade},
26 transaction::{Transaction, UnminedTx, UnminedTxId},
27};
28use zebra_node_services::{
29 mempool::{Gossip, Request, Response},
30 BoxError,
31};
32
33use zebra_state::{MinedTx, ReadRequest, ReadResponse};
34
35#[cfg(test)]
36mod tests;
37
/// Number of target block spacings a transaction may stay in the queue.
///
/// `Runner::remove_expired` multiplies this by the current target spacing
/// (in seconds) to obtain the expiry window of a queued transaction.
const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5;
40
/// Capacity shared by the broadcast channel and the transaction queue.
///
/// `Queue::start` creates the broadcast channel with this capacity, and
/// `Queue::insert` evicts the first entry once the queue grows past it.
pub const CHANNEL_AND_QUEUE_CAPACITY: usize = 20;
43
44const NO_CHAIN_TIP_HEIGHT: Height = Height(1);
46
47#[derive(Clone, Debug)]
48pub struct Queue {
51 transactions: IndexMap<UnminedTxId, (Arc<Transaction>, Instant)>,
52}
53
54#[derive(Debug)]
55pub struct Runner {
57 queue: Queue,
58 receiver: broadcast::Receiver<UnminedTx>,
59 tip_height: Height,
60}
61
62impl Queue {
63 pub fn start() -> (Runner, broadcast::Sender<UnminedTx>) {
65 let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY);
66
67 let queue = Queue {
68 transactions: IndexMap::new(),
69 };
70
71 let runner = Runner {
72 queue,
73 receiver,
74 tip_height: Height(0),
75 };
76
77 (runner, sender)
78 }
79
80 pub fn transactions(&self) -> IndexMap<UnminedTxId, (Arc<Transaction>, Instant)> {
82 self.transactions.clone()
83 }
84
85 pub fn insert(&mut self, unmined_tx: UnminedTx) {
87 self.transactions
88 .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now()));
89
90 if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY {
92 self.remove_first();
93 }
94 }
95
96 pub fn remove(&mut self, unmined_id: UnminedTxId) {
98 self.transactions.swap_remove(&unmined_id);
99 }
100
101 pub fn remove_first(&mut self) {
103 self.transactions.shift_remove_index(0);
104 }
105}
106
107impl Runner {
108 fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
110 let transactions = self.queue.transactions();
111 transactions.iter().map(|t| *t.0).collect()
112 }
113
114 fn transactions_as_vec(&self) -> Vec<Arc<Transaction>> {
116 let transactions = self.queue.transactions();
117 transactions.iter().map(|t| t.1 .0.clone()).collect()
118 }
119
120 pub fn update_tip_height(&mut self, height: Height) {
122 self.tip_height = height;
123 }
124
125 pub async fn run<Mempool, State, Tip>(
138 mut self,
139 mempool: Mempool,
140 state: State,
141 tip: Tip,
142 network: Network,
143 ) where
144 Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
145 State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
146 + Clone
147 + Send
148 + Sync
149 + 'static,
150 Tip: ChainTip + Clone + Send + Sync + 'static,
151 {
152 loop {
153 let tip_height = match tip.best_tip_height() {
155 Some(height) => height,
156 _ => NO_CHAIN_TIP_HEIGHT,
157 };
158
159 let spacing = NetworkUpgrade::target_spacing_for_height(&network, tip_height);
161
162 tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
164
165 loop {
167 let tx = match self.receiver.try_recv() {
168 Ok(tx) => tx,
169 Err(TryRecvError::Empty) => break,
170 Err(TryRecvError::Lagged(skipped_count)) => {
171 tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions");
172 continue;
173 }
174 Err(TryRecvError::Closed) => {
175 tracing::info!(
176 "sendrawtransaction queue was closed: is Zebra shutting down?"
177 );
178 return;
179 }
180 };
181
182 self.queue.insert(tx.clone());
183 }
184
185 if tip_height != self.tip_height {
188 self.update_tip_height(tip_height);
190
191 if !self.queue.transactions().is_empty() {
192 self.remove_expired(spacing);
194
195 let in_mempool =
197 Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await;
198 self.remove_committed(in_mempool);
199
200 let in_state =
202 Self::check_state(state.clone(), self.transactions_as_hash_set()).await;
203 self.remove_committed(in_state);
204
205 let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await;
207 }
208 }
209 }
210 }
211
212 fn remove_expired(&mut self, spacing: Duration) {
214 let extra_time = Duration::seconds(5);
217
218 let duration_to_expire =
219 Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time;
220 let transactions = self.queue.transactions();
221 let now = Instant::now();
222
223 for tx in transactions.iter() {
224 let tx_time =
225 tx.1 .1
226 .checked_add(
227 duration_to_expire
228 .to_std()
229 .expect("should never be less than zero"),
230 )
231 .expect("this is low numbers, should always be inside bounds");
232
233 if now > tx_time {
234 self.queue.remove(*tx.0);
235 }
236 }
237 }
238
239 fn remove_committed(&mut self, to_remove: HashSet<UnminedTxId>) {
241 for r in to_remove {
242 self.queue.remove(r);
243 }
244 }
245
246 async fn check_mempool<Mempool>(
250 mempool: Mempool,
251 transactions: HashSet<UnminedTxId>,
252 ) -> HashSet<UnminedTxId>
253 where
254 Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
255 {
256 let mut response = HashSet::new();
257
258 if !transactions.is_empty() {
259 let request = Request::TransactionsById(transactions);
260
261 let mempool_response = mempool.oneshot(request).await;
263 if let Ok(Response::Transactions(txs)) = mempool_response {
264 for tx in txs {
265 response.insert(tx.id);
266 }
267 }
268 }
269
270 response
271 }
272
273 async fn check_state<State>(
277 state: State,
278 transactions: HashSet<UnminedTxId>,
279 ) -> HashSet<UnminedTxId>
280 where
281 State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
282 + Clone
283 + Send
284 + Sync
285 + 'static,
286 {
287 let mut response = HashSet::new();
288
289 for t in transactions {
290 let request = ReadRequest::Transaction(t.mined_id());
291
292 let state_response = state.clone().oneshot(request).await;
294 if let Ok(ReadResponse::Transaction(Some(MinedTx { tx, .. }))) = state_response {
295 response.insert(tx.unmined_id());
296 }
297 }
298
299 response
300 }
301
302 async fn retry<Mempool>(
306 mempool: Mempool,
307 transactions: Vec<Arc<Transaction>>,
308 ) -> HashSet<UnminedTxId>
309 where
310 Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
311 {
312 let mut retried = HashSet::new();
313
314 for tx in transactions {
315 let unmined = UnminedTx::from(tx);
316 let gossip = Gossip::Tx(unmined.clone());
317 let request = Request::Queue(vec![gossip]);
318
319 let _ = mempool.clone().oneshot(request).await;
321
322 retried.insert(unmined.id);
325 }
326 retried
327 }
328}