zebrad/components/
mempool.rs

1//! Zebra mempool.
2//!
3//! A zebrad application component that manages the active collection, reception,
4//! gossip, verification, in-memory storage, eviction, and rejection of unmined Zcash
5//! transactions (those that have not been confirmed in a mined block on the
6//! blockchain).
7//!
8//! Major parts of the mempool include:
9//!  * [Mempool Service][`Mempool`]
10//!    * activates when the syncer is near the chain tip
11//!    * spawns [download and verify tasks][`downloads::Downloads`] for each crawled or gossiped transaction
12//!    * handles in-memory [storage][`storage::Storage`] of unmined transactions
13//!  * [Crawler][`crawler::Crawler`]
14//!    * runs in the background to periodically poll peers for fresh unmined transactions
15//!  * [Queue Checker][`queue_checker::QueueChecker`]
16//!    * runs in the background, polling the mempool to store newly verified transactions
17//!  * [Transaction Gossip Task][`gossip::gossip_mempool_transaction_id`]
18//!    * runs in the background and gossips newly added mempool transactions
19//!      to peers
20
21use std::{
22    collections::HashSet,
23    future::Future,
24    iter,
25    pin::{pin, Pin},
26    task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34    block::{self, Height},
35    chain_sync_status::ChainSyncStatus,
36    chain_tip::ChainTip,
37    transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{Gossip, MempoolChange, MempoolTxSubscriber, Request, Response};
42use zebra_state as zs;
43use zebra_state::{ChainTipChange, TipAction};
44
45use crate::components::sync::SyncStatus;
46
47pub mod config;
48mod crawler;
49pub mod downloads;
50mod error;
51pub mod gossip;
52mod pending_outputs;
53mod queue_checker;
54mod storage;
55
56#[cfg(test)]
57mod tests;
58
59pub use crate::BoxError;
60
61pub use config::Config;
62pub use crawler::Crawler;
63pub use error::MempoolError;
64pub use gossip::gossip_mempool_transaction_id;
65pub use queue_checker::QueueChecker;
66pub use storage::{
67    ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
68};
69
70#[cfg(test)]
71pub use self::tests::UnboxMempoolError;
72
73use downloads::{
74    Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
75    TRANSACTION_VERIFY_TIMEOUT,
76};
77
78type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
79type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
80type TxVerifier = Buffer<
81    BoxService<transaction::Request, transaction::Response, TransactionError>,
82    transaction::Request,
83>;
84type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
85
86/// The state of the mempool.
87///
88/// Indicates whether it is enabled or disabled and, if enabled, contains
89/// the necessary data to run it.
90//
91// Zebra only has one mempool, so the enum variant size difference doesn't matter.
92#[allow(clippy::large_enum_variant)]
93#[derive(Default)]
94enum ActiveState {
95    /// The Mempool is disabled.
96    #[default]
97    Disabled,
98
99    /// The Mempool is enabled.
100    Enabled {
101        /// The Mempool storage itself.
102        ///
103        /// # Correctness
104        ///
105        /// Only components internal to the [`Mempool`] struct are allowed to
106        /// inject transactions into `storage`, as transactions must be verified beforehand.
107        storage: Storage,
108
109        /// The transaction download and verify stream.
110        tx_downloads: Pin<Box<InboundTxDownloads>>,
111
112        /// Last seen chain tip hash that mempool transactions have been verified against.
113        ///
114        /// In some tests, this is initialized to the latest chain tip, then updated in `poll_ready()` before each request.
115        last_seen_tip_hash: block::Hash,
116    },
117}
118
119impl ActiveState {
120    /// Returns the current state, leaving [`Self::Disabled`] in its place.
121    fn take(&mut self) -> Self {
122        std::mem::take(self)
123    }
124
125    /// Returns a list of requests that will retry every stored and pending transaction.
126    fn transaction_retry_requests(&self) -> Vec<Gossip> {
127        match self {
128            ActiveState::Disabled => Vec::new(),
129            ActiveState::Enabled {
130                storage,
131                tx_downloads,
132                ..
133            } => {
134                let mut transactions = Vec::new();
135
136                let storage = storage
137                    .transactions()
138                    .values()
139                    .map(|tx| tx.transaction.clone().into());
140                transactions.extend(storage);
141
142                let pending = tx_downloads.transaction_requests().cloned();
143                transactions.extend(pending);
144
145                transactions
146            }
147        }
148    }
149
150    /// Returns the number of pending transactions waiting for download or verify,
151    /// or zero if the mempool is disabled.
152    #[cfg(feature = "progress-bar")]
153    fn queued_transaction_count(&self) -> usize {
154        match self {
155            ActiveState::Disabled => 0,
156            ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
157        }
158    }
159
160    /// Returns the number of transactions in storage, or zero if the mempool is disabled.
161    #[cfg(feature = "progress-bar")]
162    fn transaction_count(&self) -> usize {
163        match self {
164            ActiveState::Disabled => 0,
165            ActiveState::Enabled { storage, .. } => storage.transaction_count(),
166        }
167    }
168
169    /// Returns the cost of the transactions in the mempool, according to ZIP-401.
170    /// Returns zero if the mempool is disabled.
171    #[cfg(feature = "progress-bar")]
172    fn total_cost(&self) -> u64 {
173        match self {
174            ActiveState::Disabled => 0,
175            ActiveState::Enabled { storage, .. } => storage.total_cost(),
176        }
177    }
178
179    /// Returns the total serialized size of the verified transactions in the set,
180    /// or zero if the mempool is disabled.
181    ///
182    /// See [`Storage::total_serialized_size()`] for details.
183    #[cfg(feature = "progress-bar")]
184    pub fn total_serialized_size(&self) -> usize {
185        match self {
186            ActiveState::Disabled => 0,
187            ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
188        }
189    }
190
191    /// Returns the number of rejected transaction hashes in storage,
192    /// or zero if the mempool is disabled.
193    #[cfg(feature = "progress-bar")]
194    fn rejected_transaction_count(&mut self) -> usize {
195        match self {
196            ActiveState::Disabled => 0,
197            ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
198        }
199    }
200}
201
202/// Mempool async management and query service.
203///
204/// The mempool is the set of all verified transactions that this node is aware
205/// of that have yet to be confirmed by the Zcash network. A transaction is
206/// confirmed when it has been included in a block ('mined').
207pub struct Mempool {
208    /// The configurable options for the mempool, persisted between states.
209    config: Config,
210
211    /// The state of the mempool.
212    active_state: ActiveState,
213
214    /// Allows checking if we are near the tip to enable/disable the mempool.
215    sync_status: SyncStatus,
216
217    /// If the state's best chain tip has reached this height, always enable the mempool.
218    debug_enable_at_height: Option<Height>,
219
220    /// Allows efficient access to the best tip of the blockchain.
221    latest_chain_tip: zs::LatestChainTip,
222
223    /// Allows the detection of newly added chain tip blocks,
224    /// and chain tip resets.
225    chain_tip_change: ChainTipChange,
226
227    /// Handle to the outbound service.
228    /// Used to construct the transaction downloader.
229    outbound: Outbound,
230
231    /// Handle to the state service.
232    /// Used to construct the transaction downloader.
233    state: State,
234
235    /// Handle to the transaction verifier service.
236    /// Used to construct the transaction downloader.
237    tx_verifier: TxVerifier,
238
239    /// Sender part of a gossip transactions channel.
240    /// Used to broadcast transaction ids to peers.
241    transaction_sender: broadcast::Sender<MempoolChange>,
242
243    /// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
244    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
245
246    // Diagnostics
247    //
248    /// Queued transactions pending download or verification transmitter.
249    /// Only displayed after the mempool's first activation.
250    #[cfg(feature = "progress-bar")]
251    queued_count_bar: Option<howudoin::Tx>,
252
253    /// Number of mempool transactions transmitter.
254    /// Only displayed after the mempool's first activation.
255    #[cfg(feature = "progress-bar")]
256    transaction_count_bar: Option<howudoin::Tx>,
257
258    /// Mempool transaction cost transmitter.
259    /// Only displayed after the mempool's first activation.
260    #[cfg(feature = "progress-bar")]
261    transaction_cost_bar: Option<howudoin::Tx>,
262
263    /// Rejected transactions transmitter.
264    /// Only displayed after the mempool's first activation.
265    #[cfg(feature = "progress-bar")]
266    rejected_count_bar: Option<howudoin::Tx>,
267}
268
269impl Mempool {
270    #[allow(clippy::too_many_arguments)]
271    pub(crate) fn new(
272        config: &Config,
273        outbound: Outbound,
274        state: State,
275        tx_verifier: TxVerifier,
276        sync_status: SyncStatus,
277        latest_chain_tip: zs::LatestChainTip,
278        chain_tip_change: ChainTipChange,
279        misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
280    ) -> (Self, MempoolTxSubscriber) {
281        let (transaction_sender, _) =
282            tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
283        let transaction_subscriber = MempoolTxSubscriber::new(transaction_sender.clone());
284
285        let mut service = Mempool {
286            config: config.clone(),
287            active_state: ActiveState::Disabled,
288            sync_status,
289            debug_enable_at_height: config.debug_enable_at_height.map(Height),
290            latest_chain_tip,
291            chain_tip_change,
292            outbound,
293            state,
294            tx_verifier,
295            transaction_sender,
296            misbehavior_sender,
297            #[cfg(feature = "progress-bar")]
298            queued_count_bar: None,
299            #[cfg(feature = "progress-bar")]
300            transaction_count_bar: None,
301            #[cfg(feature = "progress-bar")]
302            transaction_cost_bar: None,
303            #[cfg(feature = "progress-bar")]
304            rejected_count_bar: None,
305        };
306
307        // Make sure `is_enabled` is accurate.
308        // Otherwise, it is only updated in `poll_ready`, right before each service call.
309        service.update_state(None);
310
311        (service, transaction_subscriber)
312    }
313
314    /// Is the mempool enabled by a debug config option?
315    fn is_enabled_by_debug(&self) -> bool {
316        let mut is_debug_enabled = false;
317
318        // optimise non-debug performance
319        if self.debug_enable_at_height.is_none() {
320            return is_debug_enabled;
321        }
322
323        let enable_at_height = self
324            .debug_enable_at_height
325            .expect("unexpected debug_enable_at_height: just checked for None");
326
327        if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
328            is_debug_enabled = best_tip_height >= enable_at_height;
329
330            if is_debug_enabled && !self.is_enabled() {
331                info!(
332                    ?best_tip_height,
333                    ?enable_at_height,
334                    "enabling mempool for debugging"
335                );
336            }
337        }
338
339        is_debug_enabled
340    }
341
342    /// Update the mempool state (enabled / disabled) depending on how close to
343    /// the tip is the synchronization, including side effects to state changes.
344    ///
345    /// Accepts an optional [`TipAction`] for setting the `last_seen_tip_hash` field
346    /// when enabling the mempool state, it will not enable the mempool if this is None.
347    ///
348    /// Returns `true` if the state changed.
349    fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
350        let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
351
352        match (is_close_to_tip, self.is_enabled(), tip_action) {
353            // the active state is up to date, or there is no tip action to activate the mempool
354            (false, false, _) | (true, true, _) | (true, false, None) => return false,
355
356            // Enable state - there should be a chain tip when Zebra is close to the network tip
357            (true, false, Some(tip_action)) => {
358                let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
359
360                info!(?tip_height, "activating mempool: Zebra is close to the tip");
361
362                let tx_downloads = Box::pin(TxDownloads::new(
363                    Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
364                    Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
365                    self.state.clone(),
366                ));
367                self.active_state = ActiveState::Enabled {
368                    storage: storage::Storage::new(&self.config),
369                    tx_downloads,
370                    last_seen_tip_hash,
371                };
372            }
373
374            // Disable state
375            (false, true, _) => {
376                info!(
377                    tip_height = ?self.latest_chain_tip.best_tip_height(),
378                    "deactivating mempool: Zebra is syncing lots of blocks"
379                );
380
381                // This drops the previous ActiveState::Enabled, cancelling its download tasks.
382                // We don't preserve the previous transactions, because we are syncing lots of blocks.
383                self.active_state = ActiveState::Disabled;
384            }
385        };
386
387        true
388    }
389
390    /// Return whether the mempool is enabled or not.
391    pub fn is_enabled(&self) -> bool {
392        match self.active_state {
393            ActiveState::Disabled => false,
394            ActiveState::Enabled { .. } => true,
395        }
396    }
397
398    /// Remove expired transaction ids from a given list of inserted ones.
399    fn remove_expired_from_peer_list(
400        send_to_peers_ids: &HashSet<UnminedTxId>,
401        expired_transactions: &HashSet<UnminedTxId>,
402    ) -> HashSet<UnminedTxId> {
403        send_to_peers_ids
404            .iter()
405            .filter(|id| !expired_transactions.contains(id))
406            .copied()
407            .collect()
408    }
409
410    /// Update metrics for the mempool.
411    fn update_metrics(&mut self) {
412        // Shutdown if needed
413        #[cfg(feature = "progress-bar")]
414        if matches!(howudoin::cancelled(), Some(true)) {
415            self.disable_metrics();
416            return;
417        }
418
419        // Initialize if just activated
420        #[cfg(feature = "progress-bar")]
421        if self.is_enabled()
422            && (self.queued_count_bar.is_none()
423                || self.transaction_count_bar.is_none()
424                || self.transaction_cost_bar.is_none()
425                || self.rejected_count_bar.is_none())
426        {
427            let _max_transaction_count = self.config.tx_cost_limit
428                / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
429
430            let transaction_count_bar = *howudoin::new_root()
431                .label("Mempool Transactions")
432                .set_pos(0u64);
433            // .set_len(max_transaction_count);
434
435            let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
436                .label("Mempool Cost")
437                .set_pos(0u64)
438                // .set_len(self.config.tx_cost_limit)
439                .fmt_as_bytes(true);
440
441            let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
442                .label("Mempool Queue")
443                .set_pos(0u64);
444            // .set_len(
445            //     u64::try_from(downloads::MAX_INBOUND_CONCURRENCY).expect("fits in u64"),
446            // );
447
448            let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
449                .label("Mempool Rejects")
450                .set_pos(0u64);
451            // .set_len(
452            //     u64::try_from(storage::MAX_EVICTION_MEMORY_ENTRIES).expect("fits in u64"),
453            // );
454
455            self.transaction_count_bar = Some(transaction_count_bar);
456            self.transaction_cost_bar = Some(transaction_cost_bar);
457            self.queued_count_bar = Some(queued_count_bar);
458            self.rejected_count_bar = Some(rejected_count_bar);
459        }
460
461        // Update if the mempool has ever been active
462        #[cfg(feature = "progress-bar")]
463        if let (
464            Some(queued_count_bar),
465            Some(transaction_count_bar),
466            Some(transaction_cost_bar),
467            Some(rejected_count_bar),
468        ) = (
469            self.queued_count_bar,
470            self.transaction_count_bar,
471            self.transaction_cost_bar,
472            self.rejected_count_bar,
473        ) {
474            let queued_count = self.active_state.queued_transaction_count();
475            let transaction_count = self.active_state.transaction_count();
476
477            let transaction_cost = self.active_state.total_cost();
478            let transaction_size = self.active_state.total_serialized_size();
479            let transaction_size =
480                indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
481
482            let rejected_count = self.active_state.rejected_transaction_count();
483
484            queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
485
486            transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
487
488            // Display the cost and cost limit, with the actual size as a description.
489            //
490            // Costs can be much higher than the transaction size due to the
491            // MEMPOOL_TRANSACTION_COST_THRESHOLD minimum cost.
492            transaction_cost_bar
493                .set_pos(transaction_cost)
494                .desc(format!("Actual size {transaction_size}"));
495
496            rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
497        }
498    }
499
500    /// Disable metrics for the mempool.
501    fn disable_metrics(&self) {
502        #[cfg(feature = "progress-bar")]
503        {
504            if let Some(bar) = self.queued_count_bar {
505                bar.close()
506            }
507            if let Some(bar) = self.transaction_count_bar {
508                bar.close()
509            }
510            if let Some(bar) = self.transaction_cost_bar {
511                bar.close()
512            }
513            if let Some(bar) = self.rejected_count_bar {
514                bar.close()
515            }
516        }
517    }
518}
519
520impl Service<Request> for Mempool {
521    type Response = Response;
522    type Error = BoxError;
523    type Future =
524        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
525
526    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
527        let tip_action = self.chain_tip_change.last_tip_change();
528
529        // TODO: Consider broadcasting a `MempoolChange` when the mempool is disabled.
530        let is_state_changed = self.update_state(tip_action.as_ref());
531
532        tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");
533
534        // When the mempool is disabled we still return that the service is ready.
535        // Otherwise, callers could block waiting for the mempool to be enabled.
536        if !self.is_enabled() {
537            self.update_metrics();
538
539            return Poll::Ready(Ok(()));
540        }
541
542        // Clear the mempool and cancel downloads if there has been a chain tip reset.
543        //
544        // But if the mempool was just freshly enabled,
545        // skip resetting and removing mined transactions for this tip.
546        if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
547            info!(
548                tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
549                "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
550            );
551
552            let previous_state = self.active_state.take();
553            let tx_retries = previous_state.transaction_retry_requests();
554
555            // Use the same code for dropping and resetting the mempool,
556            // to avoid subtle bugs.
557            //
558            // Drop the current contents of the state,
559            // cancelling any pending download tasks,
560            // and dropping completed verification results.
561            std::mem::drop(previous_state);
562
563            // Re-initialise an empty state.
564            self.update_state(tip_action.as_ref());
565
566            // Re-verify the transactions that were pending or valid at the previous tip.
567            // This saves us the time and data needed to re-download them.
568            if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
569                info!(
570                    transactions = tx_retries.len(),
571                    "re-verifying mempool transactions after a chain fork"
572                );
573
574                for tx in tx_retries {
575                    // This is just an efficiency optimisation, so we don't care if queueing
576                    // transaction requests fails.
577                    let _result = tx_downloads.download_if_needed_and_verify(tx, None);
578                }
579            }
580
581            self.update_metrics();
582
583            return Poll::Ready(Ok(()));
584        }
585
586        if let ActiveState::Enabled {
587            storage,
588            tx_downloads,
589            last_seen_tip_hash,
590        } = &mut self.active_state
591        {
592            // Collect inserted transaction ids.
593            let mut send_to_peers_ids = HashSet::<_>::new();
594            let mut invalidated_ids = HashSet::<_>::new();
595            let mut mined_mempool_ids = HashSet::<_>::new();
596
597            let best_tip_height = self.latest_chain_tip.best_tip_height();
598
599            // Clean up completed download tasks and add to mempool if successful.
600            while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
601                match result {
602                    Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
603                        // # Correctness:
604                        //
605                        // It's okay to use tip height here instead of the tip hash since
606                        // chain_tip_change.last_tip_change() returns a `TipAction::Reset` when
607                        // the best chain changes (which is the only way to stay at the same height), and the
608                        // mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`.
609                        if best_tip_height == expected_tip_height {
610                            let tx_id = tx.transaction.id;
611                            let insert_result =
612                                storage.insert(tx, spent_mempool_outpoints, best_tip_height);
613
614                            tracing::trace!(
615                                ?insert_result,
616                                "got Ok(_) transaction verify, tried to store",
617                            );
618
619                            if let Ok(inserted_id) = insert_result {
620                                // Save transaction ids that we will send to peers
621                                send_to_peers_ids.insert(inserted_id);
622                            } else {
623                                invalidated_ids.insert(tx_id);
624                            }
625
626                            // Send the result to responder channel if one was provided.
627                            if let Some(rsp_tx) = rsp_tx {
628                                let _ = rsp_tx
629                                    .send(insert_result.map(|_| ()).map_err(|err| err.into()));
630                            }
631                        } else {
632                            tracing::trace!("chain grew during tx verification, retrying ..",);
633
634                            // We don't care if re-queueing the transaction request fails.
635                            let _result = tx_downloads
636                                .download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
637                        }
638                    }
639                    Ok(Err((tx_id, error))) => {
640                        if let TransactionDownloadVerifyError::Invalid {
641                            error,
642                            advertiser_addr: Some(advertiser_addr),
643                        } = &error
644                        {
645                            if error.mempool_misbehavior_score() != 0 {
646                                let _ = self.misbehavior_sender.try_send((
647                                    *advertiser_addr,
648                                    error.mempool_misbehavior_score(),
649                                ));
650                            }
651                        };
652
653                        tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");
654
655                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);
656
657                        invalidated_ids.insert(tx_id);
658                        storage.reject_if_needed(tx_id, error);
659                    }
660                    Err(_elapsed) => {
661                        // A timeout happens when the stream hangs waiting for another service,
662                        // so there is no specific transaction ID.
663
664                        // TODO: Return the transaction id that timed out during verification so it can be
665                        //       included in the list of invalidated transactions and change `warn!` to `info!`.
666                        tracing::warn!("mempool transaction failed to verify due to timeout");
667
668                        metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
669                    }
670                };
671            }
672
673            // Handle best chain tip changes
674            if let Some(TipAction::Grow { block }) = tip_action {
675                tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
676                *last_seen_tip_hash = block.hash;
677
678                // Cancel downloads/verifications/storage of transactions
679                // with the same mined IDs as recently mined transactions.
680                let mined_ids = block.transaction_hashes.iter().cloned().collect();
681                tx_downloads.cancel(&mined_ids);
682                storage.clear_mined_dependencies(&mined_ids);
683
684                let storage::RemovedTransactionIds { mined, invalidated } =
685                    storage.reject_and_remove_same_effects(&mined_ids, block.transactions);
686
687                // Clear any transaction rejections if they might have become valid after
688                // the new block was added to the tip.
689                storage.clear_tip_rejections();
690
691                mined_mempool_ids.extend(mined);
692                invalidated_ids.extend(invalidated);
693            }
694
695            // Remove expired transactions from the mempool.
696            //
697            // Lock times never expire, because block times are strictly increasing.
698            // So we don't need to check them here.
699            if let Some(tip_height) = best_tip_height {
700                let expired_transactions = storage.remove_expired_transactions(tip_height);
701                // Remove transactions that are expired from the peers list
702                send_to_peers_ids =
703                    Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
704
705                if !expired_transactions.is_empty() {
706                    tracing::debug!(
707                        ?expired_transactions,
708                        "removed expired transactions from the mempool",
709                    );
710
711                    invalidated_ids.extend(expired_transactions);
712                }
713            }
714
715            // Send transactions that were not rejected nor expired to peers and RPC listeners.
716            if !send_to_peers_ids.is_empty() {
717                tracing::trace!(
718                    ?send_to_peers_ids,
719                    "sending new transactions to peers and RPC listeners"
720                );
721
722                self.transaction_sender
723                    .send(MempoolChange::added(send_to_peers_ids))?;
724            }
725
726            // Send transactions that were rejected to RPC listeners.
727            if !invalidated_ids.is_empty() {
728                tracing::trace!(
729                    ?invalidated_ids,
730                    "sending invalidated transactions to RPC listeners"
731                );
732
733                self.transaction_sender
734                    .send(MempoolChange::invalidated(invalidated_ids))?;
735            }
736
737            // Send transactions that were mined onto the best chain to RPC listeners.
738            if !mined_mempool_ids.is_empty() {
739                tracing::trace!(
740                    ?mined_mempool_ids,
741                    "sending mined transactions to RPC listeners"
742                );
743
744                self.transaction_sender
745                    .send(MempoolChange::mined(mined_mempool_ids))?;
746            }
747        }
748
749        self.update_metrics();
750
751        Poll::Ready(Ok(()))
752    }
753
754    /// Call the mempool service.
755    ///
756    /// Errors indicate that the peer has done something wrong or unexpected,
757    /// and will cause callers to disconnect from the remote peer.
758    #[instrument(name = "mempool", skip(self, req))]
759    fn call(&mut self, req: Request) -> Self::Future {
760        match &mut self.active_state {
761            ActiveState::Enabled {
762                storage,
763                tx_downloads,
764                last_seen_tip_hash,
765            } => match req {
766                // Queries
767                Request::TransactionIds => {
768                    trace!(?req, "got mempool request");
769
770                    let res: HashSet<_> = storage.tx_ids().collect();
771
772                    trace!(?req, res_count = ?res.len(), "answered mempool request");
773
774                    async move { Ok(Response::TransactionIds(res)) }.boxed()
775                }
776
777                Request::TransactionsById(ref ids) => {
778                    trace!(?req, "got mempool request");
779
780                    let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();
781
782                    trace!(?req, res_count = ?res.len(), "answered mempool request");
783
784                    async move { Ok(Response::Transactions(res)) }.boxed()
785                }
786                Request::TransactionsByMinedId(ref ids) => {
787                    trace!(?req, "got mempool request");
788
789                    let res: Vec<_> = storage
790                        .transactions_same_effects(ids.clone())
791                        .cloned()
792                        .collect();
793
794                    trace!(?req, res_count = ?res.len(), "answered mempool request");
795
796                    async move { Ok(Response::Transactions(res)) }.boxed()
797                }
798                Request::TransactionWithDepsByMinedId(tx_id) => {
799                    trace!(?req, "got mempool request");
800
801                    let res = if let Some((transaction, dependencies)) =
802                        storage.transaction_with_deps(tx_id)
803                    {
804                        Ok(Response::TransactionWithDeps {
805                            transaction,
806                            dependencies,
807                        })
808                    } else {
809                        Err("transaction not found in mempool".into())
810                    };
811
812                    trace!(?req, ?res, "answered mempool request");
813
814                    async move { res }.boxed()
815                }
816
817                Request::AwaitOutput(outpoint) => {
818                    trace!(?req, "got mempool request");
819
820                    let response_fut = storage.pending_outputs.queue(outpoint);
821
822                    if let Some(output) = storage.created_output(&outpoint) {
823                        storage.pending_outputs.respond(&outpoint, output)
824                    }
825
826                    trace!("answered mempool request");
827
828                    response_fut.boxed()
829                }
830
831                Request::FullTransactions => {
832                    trace!(?req, "got mempool request");
833
834                    let transactions: Vec<_> = storage.transactions().values().cloned().collect();
835                    let transaction_dependencies = storage.transaction_dependencies().clone();
836
837                    trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");
838
839                    let response = Response::FullTransactions {
840                        transactions,
841                        transaction_dependencies,
842                        last_seen_tip_hash: *last_seen_tip_hash,
843                    };
844
845                    async move { Ok(response) }.boxed()
846                }
847
848                Request::RejectedTransactionIds(ref ids) => {
849                    trace!(?req, "got mempool request");
850
851                    let res = storage.rejected_transactions(ids.clone()).collect();
852
853                    trace!(?req, ?res, "answered mempool request");
854
855                    async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
856                }
857
858                // Queue mempool candidates
859                Request::Queue(gossiped_txs) => {
860                    trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
861
862                    let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
863                        gossiped_txs
864                            .into_iter()
865                            .map(
866                                |gossiped_tx| -> Result<
867                                    oneshot::Receiver<Result<(), BoxError>>,
868                                    MempoolError,
869                                > {
870                                    let (rsp_tx, rsp_rx) = oneshot::channel();
871                                    storage.should_download_or_verify(gossiped_tx.id())?;
872                                    tx_downloads
873                                        .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;
874
875                                    Ok(rsp_rx)
876                                },
877                            )
878                            .map(|result| result.map_err(BoxError::from))
879                            .collect();
880
881                    // We've added transactions to the queue
882                    self.update_metrics();
883
884                    async move { Ok(Response::Queued(rsp)) }.boxed()
885                }
886
887                // Store successfully downloaded and verified transactions in the mempool
888                Request::CheckForVerifiedTransactions => {
889                    trace!(?req, "got mempool request");
890
891                    // all the work for this request is done in poll_ready
892                    async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
893                }
894            },
895            ActiveState::Disabled => {
896                // TODO: add the name of the request, but not the content,
897                //       like the command() or Display impls of network requests
898                trace!("got mempool request while mempool is disabled");
899
900                // We can't return an error since that will cause a disconnection
901                // by the peer connection handler. Therefore, return successful
902                // empty responses.
903
904                let resp = match req {
905                    // Return empty responses for queries.
906                    Request::TransactionIds => Response::TransactionIds(Default::default()),
907
908                    Request::TransactionsById(_) => Response::Transactions(Default::default()),
909                    Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
910                    Request::TransactionWithDepsByMinedId(_) | Request::AwaitOutput(_) => {
911                        return async move {
912                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
913                        }
914                        .boxed()
915                    }
916
917                    Request::FullTransactions => {
918                        return async move {
919                            Err("mempool is not active: wait for Zebra to sync to the tip".into())
920                        }
921                        .boxed()
922                    }
923
924                    Request::RejectedTransactionIds(_) => {
925                        Response::RejectedTransactionIds(Default::default())
926                    }
927
928                    // Don't queue mempool candidates, because there is no queue.
929                    Request::Queue(gossiped_txs) => Response::Queued(
930                        // Special case; we can signal the error inside the response,
931                        // because the inbound service ignores inner errors.
932                        iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
933                            .map(BoxError::from)
934                            .map(Err)
935                            .collect(),
936                    ),
937
938                    // Check if the mempool should be enabled.
939                    // This request makes sure mempools are debug-enabled in the acceptance tests.
940                    Request::CheckForVerifiedTransactions => {
941                        // all the work for this request is done in poll_ready
942                        Response::CheckedForVerifiedTransactions
943                    }
944                };
945
946                async move { Ok(resp) }.boxed()
947            }
948        }
949    }
950}
951
952impl Drop for Mempool {
953    fn drop(&mut self) {
954        self.disable_metrics();
955    }
956}