zebrad/components/
mempool.rs

1//! Zebra mempool.
2//!
3//! A zebrad application component that manages the active collection, reception,
4//! gossip, verification, in-memory storage, eviction, and rejection of unmined Zcash
5//! transactions (those that have not been confirmed in a mined block on the
6//! blockchain).
7//!
8//! Major parts of the mempool include:
9//!  * [Mempool Service][`Mempool`]
10//!    * activates when the syncer is near the chain tip
11//!    * spawns [download and verify tasks][`downloads::Downloads`] for each crawled or gossiped transaction
12//!    * handles in-memory [storage][`storage::Storage`] of unmined transactions
13//!  * [Crawler][`crawler::Crawler`]
14//!    * runs in the background to periodically poll peers for fresh unmined transactions
15//!  * [Queue Checker][`queue_checker::QueueChecker`]
16//!    * runs in the background, polling the mempool to store newly verified transactions
17//!  * [Transaction Gossip Task][`gossip::gossip_mempool_transaction_id`]
18//!    * runs in the background and gossips newly added mempool transactions
19//!      to peers
20
21use std::{
22    collections::HashSet,
23    future::Future,
24    iter,
25    pin::{pin, Pin},
26    task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34    block::{self, Height},
35    chain_sync_status::ChainSyncStatus,
36    chain_tip::ChainTip,
37    transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{Gossip, Request, Response};
42use zebra_state as zs;
43use zebra_state::{ChainTipChange, TipAction};
44
45use crate::components::sync::SyncStatus;
46
47pub mod config;
48mod crawler;
49pub mod downloads;
50mod error;
51pub mod gossip;
52mod pending_outputs;
53mod queue_checker;
54mod storage;
55
56#[cfg(test)]
57mod tests;
58
59pub use crate::BoxError;
60
61pub use config::Config;
62pub use crawler::Crawler;
63pub use error::MempoolError;
64pub use gossip::gossip_mempool_transaction_id;
65pub use queue_checker::QueueChecker;
66pub use storage::{
67    ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
68};
69
70#[cfg(test)]
71pub use self::tests::UnboxMempoolError;
72
73use downloads::{
74    Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
75    TRANSACTION_VERIFY_TIMEOUT,
76};
77
/// Buffered, boxed handle to the network service, which accepts [`zn::Request`]s.
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
/// Buffered, boxed handle to the state service, which accepts [`zs::Request`]s.
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
/// Buffered, boxed handle to the transaction verifier service,
/// which accepts [`transaction::Request`]s and fails with a [`TransactionError`].
type TxVerifier = Buffer<
    BoxService<transaction::Request, transaction::Response, TransactionError>,
    transaction::Request,
>;
/// The concrete download-and-verify stream type used by the mempool.
///
/// The outbound and verifier services are each wrapped in a [`Timeout`].
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
85
/// The state of the mempool.
///
/// Indicates whether it is enabled or disabled and, if enabled, contains
/// the necessary data to run it.
///
/// The default state is [`ActiveState::Disabled`].
//
// Zebra only has one mempool, so the enum variant size difference doesn't matter.
#[allow(clippy::large_enum_variant)]
#[derive(Default)]
enum ActiveState {
    /// The Mempool is disabled.
    ///
    /// No storage or download tasks exist in this state.
    #[default]
    Disabled,

    /// The Mempool is enabled.
    Enabled {
        /// The Mempool storage itself.
        ///
        /// # Correctness
        ///
        /// Only components internal to the [`Mempool`] struct are allowed to
        /// inject transactions into `storage`, as transactions must be verified beforehand.
        storage: Storage,

        /// The transaction download and verify stream.
        ///
        /// It is pinned and boxed so it can be polled in place as a stream.
        tx_downloads: Pin<Box<InboundTxDownloads>>,

        /// Last seen chain tip hash that mempool transactions have been verified against.
        ///
        /// Set from the tip action when the mempool is activated,
        /// and updated whenever a new block is added to the tip.
        ///
        /// In some tests, this is initialized to the latest chain tip, then updated in `poll_ready()` before each request.
        last_seen_tip_hash: block::Hash,
    },
}
118
119impl ActiveState {
120    /// Returns the current state, leaving [`Self::Disabled`] in its place.
121    fn take(&mut self) -> Self {
122        std::mem::take(self)
123    }
124
125    /// Returns a list of requests that will retry every stored and pending transaction.
126    fn transaction_retry_requests(&self) -> Vec<Gossip> {
127        match self {
128            ActiveState::Disabled => Vec::new(),
129            ActiveState::Enabled {
130                storage,
131                tx_downloads,
132                ..
133            } => {
134                let mut transactions = Vec::new();
135
136                let storage = storage
137                    .transactions()
138                    .values()
139                    .map(|tx| tx.transaction.clone().into());
140                transactions.extend(storage);
141
142                let pending = tx_downloads.transaction_requests().cloned();
143                transactions.extend(pending);
144
145                transactions
146            }
147        }
148    }
149
150    /// Returns the number of pending transactions waiting for download or verify,
151    /// or zero if the mempool is disabled.
152    #[cfg(feature = "progress-bar")]
153    fn queued_transaction_count(&self) -> usize {
154        match self {
155            ActiveState::Disabled => 0,
156            ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
157        }
158    }
159
160    /// Returns the number of transactions in storage, or zero if the mempool is disabled.
161    #[cfg(feature = "progress-bar")]
162    fn transaction_count(&self) -> usize {
163        match self {
164            ActiveState::Disabled => 0,
165            ActiveState::Enabled { storage, .. } => storage.transaction_count(),
166        }
167    }
168
169    /// Returns the cost of the transactions in the mempool, according to ZIP-401.
170    /// Returns zero if the mempool is disabled.
171    #[cfg(feature = "progress-bar")]
172    fn total_cost(&self) -> u64 {
173        match self {
174            ActiveState::Disabled => 0,
175            ActiveState::Enabled { storage, .. } => storage.total_cost(),
176        }
177    }
178
179    /// Returns the total serialized size of the verified transactions in the set,
180    /// or zero if the mempool is disabled.
181    ///
182    /// See [`Storage::total_serialized_size()`] for details.
183    #[cfg(feature = "progress-bar")]
184    pub fn total_serialized_size(&self) -> usize {
185        match self {
186            ActiveState::Disabled => 0,
187            ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
188        }
189    }
190
191    /// Returns the number of rejected transaction hashes in storage,
192    /// or zero if the mempool is disabled.
193    #[cfg(feature = "progress-bar")]
194    fn rejected_transaction_count(&mut self) -> usize {
195        match self {
196            ActiveState::Disabled => 0,
197            ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
198        }
199    }
200}
201
/// Mempool async management and query service.
///
/// The mempool is the set of all verified transactions that this node is aware
/// of that have yet to be confirmed by the Zcash network. A transaction is
/// confirmed when it has been included in a block ('mined').
///
/// The service starts out disabled, and is enabled or disabled in `poll_ready()`
/// depending on the sync status and the latest chain tip change.
pub struct Mempool {
    /// The configurable options for the mempool, persisted between states.
    ///
    /// Used to create a fresh [`Storage`] each time the mempool is activated.
    config: Config,

    /// The state of the mempool.
    active_state: ActiveState,

    /// Allows checking if we are near the tip to enable/disable the mempool.
    sync_status: SyncStatus,

    /// If the state's best chain tip has reached this height, always enable the mempool.
    ///
    /// `None` means the debug override is not configured.
    debug_enable_at_height: Option<Height>,

    /// Allows efficient access to the best tip of the blockchain.
    latest_chain_tip: zs::LatestChainTip,

    /// Allows the detection of newly added chain tip blocks,
    /// and chain tip resets.
    chain_tip_change: ChainTipChange,

    /// Handle to the outbound service.
    /// Used to construct the transaction downloader.
    outbound: Outbound,

    /// Handle to the state service.
    /// Used to construct the transaction downloader.
    state: State,

    /// Handle to the transaction verifier service.
    /// Used to construct the transaction downloader.
    tx_verifier: TxVerifier,

    /// Sender part of a gossip transactions channel.
    /// Used to broadcast transaction ids to peers.
    ///
    /// The matching receiver is returned by [`Mempool::new`].
    transaction_sender: broadcast::Sender<HashSet<UnminedTxId>>,

    /// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
    ///
    /// Each message is a peer address and a misbehavior score.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,

    // Diagnostics
    //
    /// Queued transactions pending download or verification transmitter.
    /// Only displayed after the mempool's first activation.
    #[cfg(feature = "progress-bar")]
    queued_count_bar: Option<howudoin::Tx>,

    /// Number of mempool transactions transmitter.
    /// Only displayed after the mempool's first activation.
    #[cfg(feature = "progress-bar")]
    transaction_count_bar: Option<howudoin::Tx>,

    /// Mempool transaction cost transmitter.
    /// Only displayed after the mempool's first activation.
    #[cfg(feature = "progress-bar")]
    transaction_cost_bar: Option<howudoin::Tx>,

    /// Rejected transactions transmitter.
    /// Only displayed after the mempool's first activation.
    #[cfg(feature = "progress-bar")]
    rejected_count_bar: Option<howudoin::Tx>,
}
268
269impl Mempool {
270    #[allow(clippy::too_many_arguments)]
271    pub(crate) fn new(
272        config: &Config,
273        outbound: Outbound,
274        state: State,
275        tx_verifier: TxVerifier,
276        sync_status: SyncStatus,
277        latest_chain_tip: zs::LatestChainTip,
278        chain_tip_change: ChainTipChange,
279        misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
280    ) -> (Self, broadcast::Receiver<HashSet<UnminedTxId>>) {
281        let (transaction_sender, transaction_receiver) =
282            tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
283
284        let mut service = Mempool {
285            config: config.clone(),
286            active_state: ActiveState::Disabled,
287            sync_status,
288            debug_enable_at_height: config.debug_enable_at_height.map(Height),
289            latest_chain_tip,
290            chain_tip_change,
291            outbound,
292            state,
293            tx_verifier,
294            transaction_sender,
295            misbehavior_sender,
296            #[cfg(feature = "progress-bar")]
297            queued_count_bar: None,
298            #[cfg(feature = "progress-bar")]
299            transaction_count_bar: None,
300            #[cfg(feature = "progress-bar")]
301            transaction_cost_bar: None,
302            #[cfg(feature = "progress-bar")]
303            rejected_count_bar: None,
304        };
305
306        // Make sure `is_enabled` is accurate.
307        // Otherwise, it is only updated in `poll_ready`, right before each service call.
308        service.update_state(None);
309
310        (service, transaction_receiver)
311    }
312
313    /// Is the mempool enabled by a debug config option?
314    fn is_enabled_by_debug(&self) -> bool {
315        let mut is_debug_enabled = false;
316
317        // optimise non-debug performance
318        if self.debug_enable_at_height.is_none() {
319            return is_debug_enabled;
320        }
321
322        let enable_at_height = self
323            .debug_enable_at_height
324            .expect("unexpected debug_enable_at_height: just checked for None");
325
326        if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
327            is_debug_enabled = best_tip_height >= enable_at_height;
328
329            if is_debug_enabled && !self.is_enabled() {
330                info!(
331                    ?best_tip_height,
332                    ?enable_at_height,
333                    "enabling mempool for debugging"
334                );
335            }
336        }
337
338        is_debug_enabled
339    }
340
341    /// Update the mempool state (enabled / disabled) depending on how close to
342    /// the tip is the synchronization, including side effects to state changes.
343    ///
344    /// Accepts an optional [`TipAction`] for setting the `last_seen_tip_hash` field
345    /// when enabling the mempool state, it will not enable the mempool if this is None.
346    ///
347    /// Returns `true` if the state changed.
348    fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
349        let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
350
351        match (is_close_to_tip, self.is_enabled(), tip_action) {
352            // the active state is up to date, or there is no tip action to activate the mempool
353            (false, false, _) | (true, true, _) | (true, false, None) => return false,
354
355            // Enable state - there should be a chain tip when Zebra is close to the network tip
356            (true, false, Some(tip_action)) => {
357                let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
358
359                info!(?tip_height, "activating mempool: Zebra is close to the tip");
360
361                let tx_downloads = Box::pin(TxDownloads::new(
362                    Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
363                    Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
364                    self.state.clone(),
365                ));
366                self.active_state = ActiveState::Enabled {
367                    storage: storage::Storage::new(&self.config),
368                    tx_downloads,
369                    last_seen_tip_hash,
370                };
371            }
372
373            // Disable state
374            (false, true, _) => {
375                info!(
376                    tip_height = ?self.latest_chain_tip.best_tip_height(),
377                    "deactivating mempool: Zebra is syncing lots of blocks"
378                );
379
380                // This drops the previous ActiveState::Enabled, cancelling its download tasks.
381                // We don't preserve the previous transactions, because we are syncing lots of blocks.
382                self.active_state = ActiveState::Disabled;
383            }
384        };
385
386        true
387    }
388
389    /// Return whether the mempool is enabled or not.
390    pub fn is_enabled(&self) -> bool {
391        match self.active_state {
392            ActiveState::Disabled => false,
393            ActiveState::Enabled { .. } => true,
394        }
395    }
396
397    /// Remove expired transaction ids from a given list of inserted ones.
398    fn remove_expired_from_peer_list(
399        send_to_peers_ids: &HashSet<UnminedTxId>,
400        expired_transactions: &HashSet<zebra_chain::transaction::Hash>,
401    ) -> HashSet<UnminedTxId> {
402        send_to_peers_ids
403            .iter()
404            .filter(|id| !expired_transactions.contains(&id.mined_id()))
405            .copied()
406            .collect()
407    }
408
409    /// Update metrics for the mempool.
410    fn update_metrics(&mut self) {
411        // Shutdown if needed
412        #[cfg(feature = "progress-bar")]
413        if matches!(howudoin::cancelled(), Some(true)) {
414            self.disable_metrics();
415            return;
416        }
417
418        // Initialize if just activated
419        #[cfg(feature = "progress-bar")]
420        if self.is_enabled()
421            && (self.queued_count_bar.is_none()
422                || self.transaction_count_bar.is_none()
423                || self.transaction_cost_bar.is_none()
424                || self.rejected_count_bar.is_none())
425        {
426            let _max_transaction_count = self.config.tx_cost_limit
427                / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
428
429            let transaction_count_bar = *howudoin::new_root()
430                .label("Mempool Transactions")
431                .set_pos(0u64);
432            // .set_len(max_transaction_count);
433
434            let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
435                .label("Mempool Cost")
436                .set_pos(0u64)
437                // .set_len(self.config.tx_cost_limit)
438                .fmt_as_bytes(true);
439
440            let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
441                .label("Mempool Queue")
442                .set_pos(0u64);
443            // .set_len(
444            //     u64::try_from(downloads::MAX_INBOUND_CONCURRENCY).expect("fits in u64"),
445            // );
446
447            let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
448                .label("Mempool Rejects")
449                .set_pos(0u64);
450            // .set_len(
451            //     u64::try_from(storage::MAX_EVICTION_MEMORY_ENTRIES).expect("fits in u64"),
452            // );
453
454            self.transaction_count_bar = Some(transaction_count_bar);
455            self.transaction_cost_bar = Some(transaction_cost_bar);
456            self.queued_count_bar = Some(queued_count_bar);
457            self.rejected_count_bar = Some(rejected_count_bar);
458        }
459
460        // Update if the mempool has ever been active
461        #[cfg(feature = "progress-bar")]
462        if let (
463            Some(queued_count_bar),
464            Some(transaction_count_bar),
465            Some(transaction_cost_bar),
466            Some(rejected_count_bar),
467        ) = (
468            self.queued_count_bar,
469            self.transaction_count_bar,
470            self.transaction_cost_bar,
471            self.rejected_count_bar,
472        ) {
473            let queued_count = self.active_state.queued_transaction_count();
474            let transaction_count = self.active_state.transaction_count();
475
476            let transaction_cost = self.active_state.total_cost();
477            let transaction_size = self.active_state.total_serialized_size();
478            let transaction_size =
479                indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
480
481            let rejected_count = self.active_state.rejected_transaction_count();
482
483            queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
484
485            transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
486
487            // Display the cost and cost limit, with the actual size as a description.
488            //
489            // Costs can be much higher than the transaction size due to the
490            // MEMPOOL_TRANSACTION_COST_THRESHOLD minimum cost.
491            transaction_cost_bar
492                .set_pos(transaction_cost)
493                .desc(format!("Actual size {transaction_size}"));
494
495            rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
496        }
497    }
498
499    /// Disable metrics for the mempool.
500    fn disable_metrics(&self) {
501        #[cfg(feature = "progress-bar")]
502        {
503            if let Some(bar) = self.queued_count_bar {
504                bar.close()
505            }
506            if let Some(bar) = self.transaction_count_bar {
507                bar.close()
508            }
509            if let Some(bar) = self.transaction_cost_bar {
510                bar.close()
511            }
512            if let Some(bar) = self.rejected_count_bar {
513                bar.close()
514            }
515        }
516    }
517}
518
519impl Service<Request> for Mempool {
520    type Response = Response;
521    type Error = BoxError;
522    type Future =
523        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
524
525    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
526        let tip_action = self.chain_tip_change.last_tip_change();
527        let is_state_changed = self.update_state(tip_action.as_ref());
528
529        tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");
530
531        // When the mempool is disabled we still return that the service is ready.
532        // Otherwise, callers could block waiting for the mempool to be enabled.
533        if !self.is_enabled() {
534            self.update_metrics();
535
536            return Poll::Ready(Ok(()));
537        }
538
539        // Clear the mempool and cancel downloads if there has been a chain tip reset.
540        //
541        // But if the mempool was just freshly enabled,
542        // skip resetting and removing mined transactions for this tip.
543        if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
544            info!(
545                tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
546                "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
547            );
548
549            let previous_state = self.active_state.take();
550            let tx_retries = previous_state.transaction_retry_requests();
551
552            // Use the same code for dropping and resetting the mempool,
553            // to avoid subtle bugs.
554            //
555            // Drop the current contents of the state,
556            // cancelling any pending download tasks,
557            // and dropping completed verification results.
558            std::mem::drop(previous_state);
559
560            // Re-initialise an empty state.
561            self.update_state(tip_action.as_ref());
562
563            // Re-verify the transactions that were pending or valid at the previous tip.
564            // This saves us the time and data needed to re-download them.
565            if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
566                info!(
567                    transactions = tx_retries.len(),
568                    "re-verifying mempool transactions after a chain fork"
569                );
570
571                for tx in tx_retries {
572                    // This is just an efficiency optimisation, so we don't care if queueing
573                    // transaction requests fails.
574                    let _result = tx_downloads.download_if_needed_and_verify(tx, None);
575                }
576            }
577
578            self.update_metrics();
579
580            return Poll::Ready(Ok(()));
581        }
582
583        if let ActiveState::Enabled {
584            storage,
585            tx_downloads,
586            last_seen_tip_hash,
587        } = &mut self.active_state
588        {
589            // Collect inserted transaction ids.
590            let mut send_to_peers_ids = HashSet::<_>::new();
591
592            let best_tip_height = self.latest_chain_tip.best_tip_height();
593
594            // Clean up completed download tasks and add to mempool if successful.
595            while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
596                match result {
597                    Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
598                        // # Correctness:
599                        //
600                        // It's okay to use tip height here instead of the tip hash since
601                        // chain_tip_change.last_tip_change() returns a `TipAction::Reset` when
602                        // the best chain changes (which is the only way to stay at the same height), and the
603                        // mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`.
604                        if best_tip_height == expected_tip_height {
605                            let insert_result =
606                                storage.insert(tx, spent_mempool_outpoints, best_tip_height);
607
608                            tracing::trace!(
609                                ?insert_result,
610                                "got Ok(_) transaction verify, tried to store",
611                            );
612
613                            if let Ok(inserted_id) = insert_result {
614                                // Save transaction ids that we will send to peers
615                                send_to_peers_ids.insert(inserted_id);
616                            }
617
618                            // Send the result to responder channel if one was provided.
619                            if let Some(rsp_tx) = rsp_tx {
620                                let _ = rsp_tx
621                                    .send(insert_result.map(|_| ()).map_err(|err| err.into()));
622                            }
623                        } else {
624                            tracing::trace!("chain grew during tx verification, retrying ..",);
625
626                            // We don't care if re-queueing the transaction request fails.
627                            let _result = tx_downloads
628                                .download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
629                        }
630                    }
631                    Ok(Err((tx_id, error))) => {
632                        if let TransactionDownloadVerifyError::Invalid {
633                            error,
634                            advertiser_addr: Some(advertiser_addr),
635                        } = &error
636                        {
637                            if error.mempool_misbehavior_score() != 0 {
638                                let _ = self.misbehavior_sender.try_send((
639                                    *advertiser_addr,
640                                    error.mempool_misbehavior_score(),
641                                ));
642                            }
643                        };
644
645                        tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");
646
647                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);
648
649                        storage.reject_if_needed(tx_id, error);
650                    }
651                    Err(_elapsed) => {
652                        // A timeout happens when the stream hangs waiting for another service,
653                        // so there is no specific transaction ID.
654
655                        tracing::info!("mempool transaction failed to verify due to timeout");
656
657                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
658                    }
659                };
660            }
661
662            // Handle best chain tip changes
663            if let Some(TipAction::Grow { block }) = tip_action {
664                tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
665                *last_seen_tip_hash = block.hash;
666
667                // Cancel downloads/verifications/storage of transactions
668                // with the same mined IDs as recently mined transactions.
669                let mined_ids = block.transaction_hashes.iter().cloned().collect();
670                tx_downloads.cancel(&mined_ids);
671                storage.clear_mined_dependencies(&mined_ids);
672                storage.reject_and_remove_same_effects(&mined_ids, block.transactions);
673
674                // Clear any transaction rejections if they might have become valid after
675                // the new block was added to the tip.
676                storage.clear_tip_rejections();
677            }
678
679            // Remove expired transactions from the mempool.
680            //
681            // Lock times never expire, because block times are strictly increasing.
682            // So we don't need to check them here.
683            if let Some(tip_height) = best_tip_height {
684                let expired_transactions = storage.remove_expired_transactions(tip_height);
685                // Remove transactions that are expired from the peers list
686                send_to_peers_ids =
687                    Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
688
689                if !expired_transactions.is_empty() {
690                    tracing::debug!(
691                        ?expired_transactions,
692                        "removed expired transactions from the mempool",
693                    );
694                }
695            }
696
697            // Send transactions that were not rejected nor expired to peers
698            if !send_to_peers_ids.is_empty() {
699                tracing::trace!(?send_to_peers_ids, "sending new transactions to peers");
700
701                self.transaction_sender.send(send_to_peers_ids)?;
702            }
703        }
704
705        self.update_metrics();
706
707        Poll::Ready(Ok(()))
708    }
709
710    /// Call the mempool service.
711    ///
712    /// Errors indicate that the peer has done something wrong or unexpected,
713    /// and will cause callers to disconnect from the remote peer.
714    #[instrument(name = "mempool", skip(self, req))]
715    fn call(&mut self, req: Request) -> Self::Future {
716        match &mut self.active_state {
717            ActiveState::Enabled {
718                storage,
719                tx_downloads,
720                last_seen_tip_hash,
721            } => match req {
722                // Queries
723                Request::TransactionIds => {
724                    trace!(?req, "got mempool request");
725
726                    let res: HashSet<_> = storage.tx_ids().collect();
727
728                    // This log line is checked by tests,
729                    // because lightwalletd doesn't return mempool transactions at the moment.
730                    //
731                    // TODO: downgrade to trace level when we can check transactions via gRPC
732                    info!(?req, res_count = ?res.len(), "answered mempool request");
733
734                    async move { Ok(Response::TransactionIds(res)) }.boxed()
735                }
736
737                Request::TransactionsById(ref ids) => {
738                    trace!(?req, "got mempool request");
739
740                    let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();
741
742                    trace!(?req, res_count = ?res.len(), "answered mempool request");
743
744                    async move { Ok(Response::Transactions(res)) }.boxed()
745                }
746                Request::TransactionsByMinedId(ref ids) => {
747                    trace!(?req, "got mempool request");
748
749                    let res: Vec<_> = storage
750                        .transactions_same_effects(ids.clone())
751                        .cloned()
752                        .collect();
753
754                    trace!(?req, res_count = ?res.len(), "answered mempool request");
755
756                    async move { Ok(Response::Transactions(res)) }.boxed()
757                }
758                Request::TransactionWithDepsByMinedId(tx_id) => {
759                    trace!(?req, "got mempool request");
760
761                    let res = if let Some((transaction, dependencies)) =
762                        storage.transaction_with_deps(tx_id)
763                    {
764                        Ok(Response::TransactionWithDeps {
765                            transaction,
766                            dependencies,
767                        })
768                    } else {
769                        Err("transaction not found in mempool".into())
770                    };
771
772                    trace!(?req, ?res, "answered mempool request");
773
774                    async move { res }.boxed()
775                }
776
777                Request::AwaitOutput(outpoint) => {
778                    trace!(?req, "got mempool request");
779
780                    let response_fut = storage.pending_outputs.queue(outpoint);
781
782                    if let Some(output) = storage.created_output(&outpoint) {
783                        storage.pending_outputs.respond(&outpoint, output)
784                    }
785
786                    trace!("answered mempool request");
787
788                    response_fut.boxed()
789                }
790
791                Request::FullTransactions => {
792                    trace!(?req, "got mempool request");
793
794                    let transactions: Vec<_> = storage.transactions().values().cloned().collect();
795                    let transaction_dependencies = storage.transaction_dependencies().clone();
796
797                    trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");
798
799                    let response = Response::FullTransactions {
800                        transactions,
801                        transaction_dependencies,
802                        last_seen_tip_hash: *last_seen_tip_hash,
803                    };
804
805                    async move { Ok(response) }.boxed()
806                }
807
808                Request::RejectedTransactionIds(ref ids) => {
809                    trace!(?req, "got mempool request");
810
811                    let res = storage.rejected_transactions(ids.clone()).collect();
812
813                    trace!(?req, ?res, "answered mempool request");
814
815                    async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
816                }
817
818                // Queue mempool candidates
819                Request::Queue(gossiped_txs) => {
820                    trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
821
822                    let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
823                        gossiped_txs
824                            .into_iter()
825                            .map(
826                                |gossiped_tx| -> Result<
827                                    oneshot::Receiver<Result<(), BoxError>>,
828                                    MempoolError,
829                                > {
830                                    let (rsp_tx, rsp_rx) = oneshot::channel();
831                                    storage.should_download_or_verify(gossiped_tx.id())?;
832                                    tx_downloads
833                                        .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;
834
835                                    Ok(rsp_rx)
836                                },
837                            )
838                            .map(|result| result.map_err(BoxError::from))
839                            .collect();
840
841                    // We've added transactions to the queue
842                    self.update_metrics();
843
844                    async move { Ok(Response::Queued(rsp)) }.boxed()
845                }
846
847                // Store successfully downloaded and verified transactions in the mempool
848                Request::CheckForVerifiedTransactions => {
849                    trace!(?req, "got mempool request");
850
851                    // all the work for this request is done in poll_ready
852                    async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
853                }
854            },
855            ActiveState::Disabled => {
856                // TODO: add the name of the request, but not the content,
857                //       like the command() or Display impls of network requests
858                trace!("got mempool request while mempool is disabled");
859
860                // We can't return an error since that will cause a disconnection
861                // by the peer connection handler. Therefore, return successful
862                // empty responses.
863
864                let resp = match req {
865                    // Return empty responses for queries.
866                    Request::TransactionIds => Response::TransactionIds(Default::default()),
867
868                    Request::TransactionsById(_) => Response::Transactions(Default::default()),
869                    Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
870                    Request::TransactionWithDepsByMinedId(_) | Request::AwaitOutput(_) => {
871                        return async move {
872                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
873                        }
874                        .boxed()
875                    }
876
877                    Request::FullTransactions => {
878                        return async move {
879                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
880                        }
881                        .boxed()
882                    }
883
884                    Request::RejectedTransactionIds(_) => {
885                        Response::RejectedTransactionIds(Default::default())
886                    }
887
888                    // Don't queue mempool candidates, because there is no queue.
889                    Request::Queue(gossiped_txs) => Response::Queued(
890                        // Special case; we can signal the error inside the response,
891                        // because the inbound service ignores inner errors.
892                        iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
893                            .map(BoxError::from)
894                            .map(Err)
895                            .collect(),
896                    ),
897
898                    // Check if the mempool should be enabled.
899                    // This request makes sure mempools are debug-enabled in the acceptance tests.
900                    Request::CheckForVerifiedTransactions => {
901                        // all the work for this request is done in poll_ready
902                        Response::CheckedForVerifiedTransactions
903                    }
904                };
905
906                async move { Ok(resp) }.boxed()
907            }
908        }
909    }
910}
911
912impl Drop for Mempool {
913    fn drop(&mut self) {
914        self.disable_metrics();
915    }
916}