1//! Transaction Queue.
2//!
3//! All transactions that are sent from RPC methods should be added to this queue for retries.
//! Transactions can fail to be inserted into the mempool immediately for different reasons,
//! like depending on UTXOs that have not been mined yet.
6//!
7//! The [`Queue`] is just an `IndexMap` of transactions with insertion date.
8//! We use this data type because we want the transactions in the queue to be in order.
//! The [`Runner`] component will do the processing in its [`Runner::run()`] method.
1011use std::{collections::HashSet, sync::Arc};
1213use chrono::Duration;
14use indexmap::IndexMap;
15use tokio::{
16 sync::broadcast::{self, error::TryRecvError},
17 time::Instant,
18};
1920use tower::{Service, ServiceExt};
2122use zebra_chain::{
23 block::Height,
24 chain_tip::ChainTip,
25 parameters::{Network, NetworkUpgrade},
26 transaction::{Transaction, UnminedTx, UnminedTxId},
27};
28use zebra_node_services::{
29 mempool::{Gossip, Request, Response},
30 BoxError,
31};
3233use zebra_state::{MinedTx, ReadRequest, ReadResponse};
3435#[cfg(test)]
36mod tests;
/// The approximate target number of blocks a transaction can be in the queue.
///
/// `Runner::remove_expired()` multiplies this by the current target block spacing,
/// and adds a small margin, to get the maximum time a transaction stays queued.
const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5;

/// Size of the queue and channel.
///
/// Used both as the capacity of the broadcast channel created by `Queue::start()`
/// and as the maximum number of entries `Queue::insert()` leaves in the queue.
pub const CHANNEL_AND_QUEUE_CAPACITY: usize = 20;

/// The height to use in spacing calculation if we don't have a chain tip.
///
/// `Runner::run()` falls back to this height when the chain tip reports no best height.
const NO_CHAIN_TIP_HEIGHT: Height = Height(1);
/// The queue is a container of transactions that are going to be
/// sent to the mempool again.
#[derive(Clone, Debug)]
pub struct Queue {
    /// The queued transactions, keyed by unmined transaction id.
    ///
    /// Each entry stores the transaction together with the instant it was inserted.
    transactions: IndexMap<UnminedTxId, (Arc<Transaction>, Instant)>,
}
/// The runner will make the processing of the transactions in the queue.
#[derive(Debug)]
pub struct Runner {
    /// The queue of transactions waiting to be retried.
    queue: Queue,

    /// Receiving half of the broadcast channel that delivers new transactions for the queue.
    receiver: broadcast::Receiver<UnminedTx>,

    /// The tip height stored by the last call to `update_tip_height()`.
    tip_height: Height,
}
6162impl Queue {
63/// Start a new queue
64pub fn start() -> (Runner, broadcast::Sender<UnminedTx>) {
65let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY);
6667let queue = Queue {
68 transactions: IndexMap::new(),
69 };
7071let runner = Runner {
72 queue,
73 receiver,
74 tip_height: Height(0),
75 };
7677 (runner, sender)
78 }
7980/// Get the transactions in the queue.
81pub fn transactions(&self) -> IndexMap<UnminedTxId, (Arc<Transaction>, Instant)> {
82self.transactions.clone()
83 }
8485/// Insert a transaction to the queue.
86pub fn insert(&mut self, unmined_tx: UnminedTx) {
87self.transactions
88 .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now()));
8990// remove if queue is over capacity
91if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY {
92self.remove_first();
93 }
94 }
9596/// Remove a transaction from the queue.
97pub fn remove(&mut self, unmined_id: UnminedTxId) {
98self.transactions.swap_remove(&unmined_id);
99 }
100101/// Remove the oldest transaction from the queue.
102pub fn remove_first(&mut self) {
103self.transactions.shift_remove_index(0);
104 }
105}
106107impl Runner {
108/// Get the queue transactions as a `HashSet` of unmined ids.
109fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
110let transactions = self.queue.transactions();
111 transactions.iter().map(|t| *t.0).collect()
112 }
113114/// Get the queue transactions as a `Vec` of transactions.
115fn transactions_as_vec(&self) -> Vec<Arc<Transaction>> {
116let transactions = self.queue.transactions();
117 transactions.iter().map(|t| t.1 .0.clone()).collect()
118 }
119120/// Update the `tip_height` field with a new height.
121pub fn update_tip_height(&mut self, height: Height) {
122self.tip_height = height;
123 }
124125/// Retry sending to mempool if needed.
126 ///
127 /// Creates a loop that will run each time a new block is mined.
128 /// In this loop, get the transactions that are in the queue and:
129 /// - Check if they are now in the mempool and if so, delete the transaction from the queue.
130 /// - Check if the transaction is now part of a block in the state and if so,
131 /// delete the transaction from the queue.
132 /// - With the transactions left in the queue, retry sending them to the mempool ignoring
133 /// the result of this operation.
134 ///
135 /// Additionally, each iteration of the above loop, will receive and insert to the queue
136 /// transactions that are pending in the channel.
137pub async fn run<Mempool, State, Tip>(
138mut self,
139 mempool: Mempool,
140 state: State,
141 tip: Tip,
142 network: Network,
143 ) where
144Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
145 State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
146 + Clone
147 + Send
148 + Sync
149 + 'static,
150 Tip: ChainTip + Clone + Send + Sync + 'static,
151 {
152loop {
153// if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing
154let tip_height = match tip.best_tip_height() {
155Some(height) => height,
156_ => NO_CHAIN_TIP_HEIGHT,
157 };
158159// get spacing between blocks
160let spacing = NetworkUpgrade::target_spacing_for_height(&network, tip_height);
161162// sleep until the next block
163tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
164165// get transactions from the channel
166loop {
167let tx = match self.receiver.try_recv() {
168Ok(tx) => tx,
169Err(TryRecvError::Empty) => break,
170Err(TryRecvError::Lagged(skipped_count)) => {
171tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions");
172continue;
173 }
174Err(TryRecvError::Closed) => {
175tracing::info!(
176"sendrawtransaction queue was closed: is Zebra shutting down?"
177);
178return;
179 }
180 };
181182self.queue.insert(tx.clone());
183 }
184185// skip some work if stored tip height is the same as the one arriving
186 // TODO: check tip block hashes instead, so we always retry when there is a chain fork (these are rare)
187if tip_height != self.tip_height {
188// update the chain tip
189self.update_tip_height(tip_height);
190191if !self.queue.transactions().is_empty() {
192// remove what is expired
193self.remove_expired(spacing);
194195// remove if any of the queued transactions is now in the mempool
196let in_mempool =
197Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await;
198self.remove_committed(in_mempool);
199200// remove if any of the queued transactions is now in the state
201let in_state =
202Self::check_state(state.clone(), self.transactions_as_hash_set()).await;
203self.remove_committed(in_state);
204205// retry what is left in the queue
206let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await;
207 }
208 }
209 }
210 }
211212/// Remove transactions that are expired according to number of blocks and current spacing between blocks.
213fn remove_expired(&mut self, spacing: Duration) {
214// Have some extra time to make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE`
215 // times, as the main loop also takes some time to run.
216let extra_time = Duration::seconds(5);
217218let duration_to_expire =
219 Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time;
220let transactions = self.queue.transactions();
221let now = Instant::now();
222223for tx in transactions.iter() {
224let tx_time =
225 tx.1 .1
226.checked_add(
227 duration_to_expire
228 .to_std()
229 .expect("should never be less than zero"),
230 )
231 .expect("this is low numbers, should always be inside bounds");
232233if now > tx_time {
234self.queue.remove(*tx.0);
235 }
236 }
237 }
238239/// Remove transactions from the queue that had been inserted to the state or the mempool.
240fn remove_committed(&mut self, to_remove: HashSet<UnminedTxId>) {
241for r in to_remove {
242self.queue.remove(r);
243 }
244 }
245246/// Check the mempool for given transactions.
247 ///
248 /// Returns transactions that are in the mempool.
249async fn check_mempool<Mempool>(
250 mempool: Mempool,
251 transactions: HashSet<UnminedTxId>,
252 ) -> HashSet<UnminedTxId>
253where
254Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
255 {
256let mut response = HashSet::new();
257258if !transactions.is_empty() {
259let request = Request::TransactionsById(transactions);
260261// ignore any error coming from the mempool
262let mempool_response = mempool.oneshot(request).await;
263if let Ok(Response::Transactions(txs)) = mempool_response {
264for tx in txs {
265 response.insert(tx.id);
266 }
267 }
268 }
269270 response
271 }
272273/// Check the state for given transactions.
274 ///
275 /// Returns transactions that are in the state.
276async fn check_state<State>(
277 state: State,
278 transactions: HashSet<UnminedTxId>,
279 ) -> HashSet<UnminedTxId>
280where
281State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
282 + Clone
283 + Send
284 + Sync
285 + 'static,
286 {
287let mut response = HashSet::new();
288289for t in transactions {
290let request = ReadRequest::Transaction(t.mined_id());
291292// ignore any error coming from the state
293let state_response = state.clone().oneshot(request).await;
294if let Ok(ReadResponse::Transaction(Some(MinedTx { tx, .. }))) = state_response {
295 response.insert(tx.unmined_id());
296 }
297 }
298299 response
300 }
301302/// Retry sending given transactions to mempool.
303 ///
304 /// Returns the transaction ids that were retried.
305async fn retry<Mempool>(
306 mempool: Mempool,
307 transactions: Vec<Arc<Transaction>>,
308 ) -> HashSet<UnminedTxId>
309where
310Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
311 {
312let mut retried = HashSet::new();
313314for tx in transactions {
315let unmined = UnminedTx::from(tx);
316let gossip = Gossip::Tx(unmined.clone());
317let request = Request::Queue(vec![gossip]);
318319// Send to mempool and ignore any error
320let _ = mempool.clone().oneshot(request).await;
321322// return what we retried but don't delete from the queue,
323 // we might retry again in a next call.
324retried.insert(unmined.id);
325 }
326 retried
327 }
328}