zebra_rpc/
queue.rs

1//! Transaction Queue.
2//!
3//! All transactions that are sent from RPC methods should be added to this queue for retries.
4//! Transactions can fail to be inserted to the mempool immediately by different reasons,
5//! like having not mined utxos.
6//!
7//! The [`Queue`] is just an `IndexMap` of transactions with insertion date.
8//! We use this data type because we want the transactions in the queue to be in order.
9//! The [`Runner`] component will do the processing in it's [`Runner::run()`] method.
10
11use std::{collections::HashSet, sync::Arc};
12
13use chrono::Duration;
14use indexmap::IndexMap;
15use tokio::{
16    sync::broadcast::{self, error::TryRecvError},
17    time::Instant,
18};
19
20use tower::{Service, ServiceExt};
21
22use zebra_chain::{
23    block::Height,
24    chain_tip::ChainTip,
25    parameters::{Network, NetworkUpgrade},
26    transaction::{Transaction, UnminedTx, UnminedTxId},
27};
28use zebra_node_services::{
29    mempool::{Gossip, Request, Response},
30    BoxError,
31};
32
33use zebra_state::{MinedTx, ReadRequest, ReadResponse};
34
35#[cfg(test)]
36mod tests;
37
38/// The approximate target number of blocks a transaction can be in the queue.
39const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5;
40
41/// Size of the queue and channel.
42pub const CHANNEL_AND_QUEUE_CAPACITY: usize = 20;
43
44/// The height to use in spacing calculation if we don't have a chain tip.
45const NO_CHAIN_TIP_HEIGHT: Height = Height(1);
46
47#[derive(Clone, Debug)]
48/// The queue is a container of transactions that are going to be
49/// sent to the mempool again.
50pub struct Queue {
51    transactions: IndexMap<UnminedTxId, (Arc<Transaction>, Instant)>,
52}
53
54#[derive(Debug)]
55/// The runner will make the processing of the transactions in the queue.
56pub struct Runner {
57    queue: Queue,
58    receiver: broadcast::Receiver<UnminedTx>,
59    tip_height: Height,
60}
61
62impl Queue {
63    /// Start a new queue
64    pub fn start() -> (Runner, broadcast::Sender<UnminedTx>) {
65        let (sender, receiver) = broadcast::channel(CHANNEL_AND_QUEUE_CAPACITY);
66
67        let queue = Queue {
68            transactions: IndexMap::new(),
69        };
70
71        let runner = Runner {
72            queue,
73            receiver,
74            tip_height: Height(0),
75        };
76
77        (runner, sender)
78    }
79
80    /// Get the transactions in the queue.
81    pub fn transactions(&self) -> IndexMap<UnminedTxId, (Arc<Transaction>, Instant)> {
82        self.transactions.clone()
83    }
84
85    /// Insert a transaction to the queue.
86    pub fn insert(&mut self, unmined_tx: UnminedTx) {
87        self.transactions
88            .insert(unmined_tx.id, (unmined_tx.transaction, Instant::now()));
89
90        // remove if queue is over capacity
91        if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY {
92            self.remove_first();
93        }
94    }
95
96    /// Remove a transaction from the queue.
97    pub fn remove(&mut self, unmined_id: UnminedTxId) {
98        self.transactions.swap_remove(&unmined_id);
99    }
100
101    /// Remove the oldest transaction from the queue.
102    pub fn remove_first(&mut self) {
103        self.transactions.shift_remove_index(0);
104    }
105}
106
107impl Runner {
108    /// Get the queue transactions as a `HashSet` of unmined ids.
109    fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
110        let transactions = self.queue.transactions();
111        transactions.iter().map(|t| *t.0).collect()
112    }
113
114    /// Get the queue transactions as a `Vec` of transactions.
115    fn transactions_as_vec(&self) -> Vec<Arc<Transaction>> {
116        let transactions = self.queue.transactions();
117        transactions.iter().map(|t| t.1 .0.clone()).collect()
118    }
119
120    /// Update the `tip_height` field with a new height.
121    pub fn update_tip_height(&mut self, height: Height) {
122        self.tip_height = height;
123    }
124
125    /// Retry sending to mempool if needed.
126    ///
127    /// Creates a loop that will run each time a new block is mined.
128    /// In this loop, get the transactions that are in the queue and:
129    /// - Check if they are now in the mempool and if so, delete the transaction from the queue.
130    /// - Check if the transaction is now part of a block in the state and if so,
131    ///   delete the transaction from the queue.
132    /// - With the transactions left in the queue, retry sending them to the mempool ignoring
133    ///   the result of this operation.
134    ///
135    /// Additionally, each iteration of the above loop, will receive and insert to the queue
136    /// transactions that are pending in the channel.
137    pub async fn run<Mempool, State, Tip>(
138        mut self,
139        mempool: Mempool,
140        state: State,
141        tip: Tip,
142        network: Network,
143    ) where
144        Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
145        State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
146            + Clone
147            + Send
148            + Sync
149            + 'static,
150        Tip: ChainTip + Clone + Send + Sync + 'static,
151    {
152        loop {
153            // if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing
154            let tip_height = match tip.best_tip_height() {
155                Some(height) => height,
156                _ => NO_CHAIN_TIP_HEIGHT,
157            };
158
159            // get spacing between blocks
160            let spacing = NetworkUpgrade::target_spacing_for_height(&network, tip_height);
161
162            // sleep until the next block
163            tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
164
165            // get transactions from the channel
166            loop {
167                let tx = match self.receiver.try_recv() {
168                    Ok(tx) => tx,
169                    Err(TryRecvError::Empty) => break,
170                    Err(TryRecvError::Lagged(skipped_count)) => {
171                        tracing::info!("sendrawtransaction queue was full: skipped {skipped_count} transactions");
172                        continue;
173                    }
174                    Err(TryRecvError::Closed) => {
175                        tracing::info!(
176                            "sendrawtransaction queue was closed: is Zebra shutting down?"
177                        );
178                        return;
179                    }
180                };
181
182                self.queue.insert(tx.clone());
183            }
184
185            // skip some work if stored tip height is the same as the one arriving
186            // TODO: check tip block hashes instead, so we always retry when there is a chain fork (these are rare)
187            if tip_height != self.tip_height {
188                // update the chain tip
189                self.update_tip_height(tip_height);
190
191                if !self.queue.transactions().is_empty() {
192                    // remove what is expired
193                    self.remove_expired(spacing);
194
195                    // remove if any of the queued transactions is now in the mempool
196                    let in_mempool =
197                        Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await;
198                    self.remove_committed(in_mempool);
199
200                    // remove if any of the queued transactions is now in the state
201                    let in_state =
202                        Self::check_state(state.clone(), self.transactions_as_hash_set()).await;
203                    self.remove_committed(in_state);
204
205                    // retry what is left in the queue
206                    let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await;
207                }
208            }
209        }
210    }
211
212    /// Remove transactions that are expired according to number of blocks and current spacing between blocks.
213    fn remove_expired(&mut self, spacing: Duration) {
214        // Have some extra time to make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE`
215        // times, as the main loop also takes some time to run.
216        let extra_time = Duration::seconds(5);
217
218        let duration_to_expire =
219            Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time;
220        let transactions = self.queue.transactions();
221        let now = Instant::now();
222
223        for tx in transactions.iter() {
224            let tx_time =
225                tx.1 .1
226                    .checked_add(
227                        duration_to_expire
228                            .to_std()
229                            .expect("should never be less than zero"),
230                    )
231                    .expect("this is low numbers, should always be inside bounds");
232
233            if now > tx_time {
234                self.queue.remove(*tx.0);
235            }
236        }
237    }
238
239    /// Remove transactions from the queue that had been inserted to the state or the mempool.
240    fn remove_committed(&mut self, to_remove: HashSet<UnminedTxId>) {
241        for r in to_remove {
242            self.queue.remove(r);
243        }
244    }
245
246    /// Check the mempool for given transactions.
247    ///
248    /// Returns transactions that are in the mempool.
249    async fn check_mempool<Mempool>(
250        mempool: Mempool,
251        transactions: HashSet<UnminedTxId>,
252    ) -> HashSet<UnminedTxId>
253    where
254        Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
255    {
256        let mut response = HashSet::new();
257
258        if !transactions.is_empty() {
259            let request = Request::TransactionsById(transactions);
260
261            // ignore any error coming from the mempool
262            let mempool_response = mempool.oneshot(request).await;
263            if let Ok(Response::Transactions(txs)) = mempool_response {
264                for tx in txs {
265                    response.insert(tx.id);
266                }
267            }
268        }
269
270        response
271    }
272
273    /// Check the state for given transactions.
274    ///
275    /// Returns transactions that are in the state.
276    async fn check_state<State>(
277        state: State,
278        transactions: HashSet<UnminedTxId>,
279    ) -> HashSet<UnminedTxId>
280    where
281        State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
282            + Clone
283            + Send
284            + Sync
285            + 'static,
286    {
287        let mut response = HashSet::new();
288
289        for t in transactions {
290            let request = ReadRequest::Transaction(t.mined_id());
291
292            // ignore any error coming from the state
293            let state_response = state.clone().oneshot(request).await;
294            if let Ok(ReadResponse::Transaction(Some(MinedTx { tx, .. }))) = state_response {
295                response.insert(tx.unmined_id());
296            }
297        }
298
299        response
300    }
301
302    /// Retry sending given transactions to mempool.
303    ///
304    /// Returns the transaction ids that were retried.
305    async fn retry<Mempool>(
306        mempool: Mempool,
307        transactions: Vec<Arc<Transaction>>,
308    ) -> HashSet<UnminedTxId>
309    where
310        Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
311    {
312        let mut retried = HashSet::new();
313
314        for tx in transactions {
315            let unmined = UnminedTx::from(tx);
316            let gossip = Gossip::Tx(unmined.clone());
317            let request = Request::Queue(vec![gossip]);
318
319            // Send to mempool and ignore any error
320            let _ = mempool.clone().oneshot(request).await;
321
322            // return what we retried but don't delete from the queue,
323            // we might retry again in a next call.
324            retried.insert(unmined.id);
325        }
326        retried
327    }
328}