1use std::{
22 collections::HashSet,
23 future::Future,
24 iter,
25 pin::{pin, Pin},
26 task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34 block::{self, Height},
35 chain_sync_status::ChainSyncStatus,
36 chain_tip::ChainTip,
37 transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{Gossip, Request, Response};
42use zebra_state as zs;
43use zebra_state::{ChainTipChange, TipAction};
44
45use crate::components::sync::SyncStatus;
46
47pub mod config;
48mod crawler;
49pub mod downloads;
50mod error;
51pub mod gossip;
52mod pending_outputs;
53mod queue_checker;
54mod storage;
55
56#[cfg(test)]
57mod tests;
58
59pub use crate::BoxError;
60
61pub use config::Config;
62pub use crawler::Crawler;
63pub use error::MempoolError;
64pub use gossip::gossip_mempool_transaction_id;
65pub use queue_checker::QueueChecker;
66pub use storage::{
67 ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
68};
69
70#[cfg(test)]
71pub use self::tests::UnboxMempoolError;
72
73use downloads::{
74 Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
75 TRANSACTION_VERIFY_TIMEOUT,
76};
77
/// A buffered, boxed service handle that accepts [`zn::Request`]s (`zebra_network`).
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
/// A buffered, boxed service handle that accepts [`zs::Request`]s (`zebra_state`).
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
/// A buffered, boxed service handle that accepts transaction verification requests
/// and fails with a [`TransactionError`].
type TxVerifier = Buffer<
    BoxService<transaction::Request, transaction::Response, TransactionError>,
    transaction::Request,
>;
/// The transaction download-and-verify type used by the mempool: the network and
/// verifier services are each wrapped in a [`Timeout`], the state service is not.
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
85
/// The mempool's active state: either disabled, or enabled together with
/// everything that only exists while the mempool is running.
//
// NOTE(review): the `large_enum_variant` lint is silenced because `Enabled` is much
// bigger than `Disabled`; presumably only one `ActiveState` exists per mempool, so
// boxing the variant would not pay off — confirm before changing.
#[allow(clippy::large_enum_variant)]
#[derive(Default)]
enum ActiveState {
    /// The mempool is disabled. This is the default state.
    #[default]
    Disabled,

    /// The mempool is enabled.
    Enabled {
        /// The storage for transactions held by the mempool, along with its
        /// rejection lists and pending outputs.
        storage: Storage,

        /// The pinned, boxed stream of transaction download and verify tasks.
        tx_downloads: Pin<Box<InboundTxDownloads>>,

        /// The hash of the most recent chain tip block handled by the mempool.
        ///
        /// Set when the mempool is activated, and updated whenever a
        /// `TipAction::Grow` is processed.
        last_seen_tip_hash: block::Hash,
    },
}
118
119impl ActiveState {
120 fn take(&mut self) -> Self {
122 std::mem::take(self)
123 }
124
125 fn transaction_retry_requests(&self) -> Vec<Gossip> {
127 match self {
128 ActiveState::Disabled => Vec::new(),
129 ActiveState::Enabled {
130 storage,
131 tx_downloads,
132 ..
133 } => {
134 let mut transactions = Vec::new();
135
136 let storage = storage
137 .transactions()
138 .values()
139 .map(|tx| tx.transaction.clone().into());
140 transactions.extend(storage);
141
142 let pending = tx_downloads.transaction_requests().cloned();
143 transactions.extend(pending);
144
145 transactions
146 }
147 }
148 }
149
150 #[cfg(feature = "progress-bar")]
153 fn queued_transaction_count(&self) -> usize {
154 match self {
155 ActiveState::Disabled => 0,
156 ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
157 }
158 }
159
160 #[cfg(feature = "progress-bar")]
162 fn transaction_count(&self) -> usize {
163 match self {
164 ActiveState::Disabled => 0,
165 ActiveState::Enabled { storage, .. } => storage.transaction_count(),
166 }
167 }
168
169 #[cfg(feature = "progress-bar")]
172 fn total_cost(&self) -> u64 {
173 match self {
174 ActiveState::Disabled => 0,
175 ActiveState::Enabled { storage, .. } => storage.total_cost(),
176 }
177 }
178
179 #[cfg(feature = "progress-bar")]
184 pub fn total_serialized_size(&self) -> usize {
185 match self {
186 ActiveState::Disabled => 0,
187 ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
188 }
189 }
190
191 #[cfg(feature = "progress-bar")]
194 fn rejected_transaction_count(&mut self) -> usize {
195 match self {
196 ActiveState::Disabled => 0,
197 ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
198 }
199 }
200}
201
/// The mempool service.
///
/// Holds the handles to the other services it talks to, plus the
/// [`ActiveState`] that only exists while the mempool is enabled.
pub struct Mempool {
    /// The mempool configuration, cloned from the config passed to [`Mempool::new`].
    config: Config,

    /// Whether the mempool is enabled, and its storage and downloads if it is.
    active_state: ActiveState,

    /// Queried for `is_close_to_tip()` when deciding whether to enable the mempool.
    sync_status: SyncStatus,

    /// If set, the mempool is also treated as close to the tip once the best
    /// tip height is greater than or equal to this height.
    debug_enable_at_height: Option<Height>,

    /// Used to read the current best chain tip height.
    latest_chain_tip: zs::LatestChainTip,

    /// Source of the most recent chain tip change, read on every `poll_ready`.
    chain_tip_change: ChainTipChange,

    /// Handle to the network service. Cloned into the transaction downloader
    /// when the mempool is activated.
    outbound: Outbound,

    /// Handle to the state service. Cloned into the transaction downloader
    /// when the mempool is activated.
    state: State,

    /// Handle to the transaction verifier service. Cloned into the transaction
    /// downloader when the mempool is activated.
    tx_verifier: TxVerifier,

    /// Broadcast sender for the IDs of transactions newly inserted into the
    /// mempool, so they can be sent to peers.
    transaction_sender: broadcast::Sender<HashSet<UnminedTxId>>,

    /// Sender for `(peer address, misbehavior score)` pairs, used when a peer
    /// advertised a transaction that failed verification with a non-zero score.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,

    /// Progress bar for the number of queued (in-flight) transactions.
    #[cfg(feature = "progress-bar")]
    queued_count_bar: Option<howudoin::Tx>,

    /// Progress bar for the number of transactions in the mempool.
    #[cfg(feature = "progress-bar")]
    transaction_count_bar: Option<howudoin::Tx>,

    /// Progress bar for the total cost of the transactions in the mempool.
    #[cfg(feature = "progress-bar")]
    transaction_cost_bar: Option<howudoin::Tx>,

    /// Progress bar for the number of rejected transactions.
    #[cfg(feature = "progress-bar")]
    rejected_count_bar: Option<howudoin::Tx>,
}
268
269impl Mempool {
270 #[allow(clippy::too_many_arguments)]
271 pub(crate) fn new(
272 config: &Config,
273 outbound: Outbound,
274 state: State,
275 tx_verifier: TxVerifier,
276 sync_status: SyncStatus,
277 latest_chain_tip: zs::LatestChainTip,
278 chain_tip_change: ChainTipChange,
279 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
280 ) -> (Self, broadcast::Receiver<HashSet<UnminedTxId>>) {
281 let (transaction_sender, transaction_receiver) =
282 tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
283
284 let mut service = Mempool {
285 config: config.clone(),
286 active_state: ActiveState::Disabled,
287 sync_status,
288 debug_enable_at_height: config.debug_enable_at_height.map(Height),
289 latest_chain_tip,
290 chain_tip_change,
291 outbound,
292 state,
293 tx_verifier,
294 transaction_sender,
295 misbehavior_sender,
296 #[cfg(feature = "progress-bar")]
297 queued_count_bar: None,
298 #[cfg(feature = "progress-bar")]
299 transaction_count_bar: None,
300 #[cfg(feature = "progress-bar")]
301 transaction_cost_bar: None,
302 #[cfg(feature = "progress-bar")]
303 rejected_count_bar: None,
304 };
305
306 service.update_state(None);
309
310 (service, transaction_receiver)
311 }
312
313 fn is_enabled_by_debug(&self) -> bool {
315 let mut is_debug_enabled = false;
316
317 if self.debug_enable_at_height.is_none() {
319 return is_debug_enabled;
320 }
321
322 let enable_at_height = self
323 .debug_enable_at_height
324 .expect("unexpected debug_enable_at_height: just checked for None");
325
326 if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
327 is_debug_enabled = best_tip_height >= enable_at_height;
328
329 if is_debug_enabled && !self.is_enabled() {
330 info!(
331 ?best_tip_height,
332 ?enable_at_height,
333 "enabling mempool for debugging"
334 );
335 }
336 }
337
338 is_debug_enabled
339 }
340
341 fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
349 let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
350
351 match (is_close_to_tip, self.is_enabled(), tip_action) {
352 (false, false, _) | (true, true, _) | (true, false, None) => return false,
354
355 (true, false, Some(tip_action)) => {
357 let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
358
359 info!(?tip_height, "activating mempool: Zebra is close to the tip");
360
361 let tx_downloads = Box::pin(TxDownloads::new(
362 Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
363 Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
364 self.state.clone(),
365 ));
366 self.active_state = ActiveState::Enabled {
367 storage: storage::Storage::new(&self.config),
368 tx_downloads,
369 last_seen_tip_hash,
370 };
371 }
372
373 (false, true, _) => {
375 info!(
376 tip_height = ?self.latest_chain_tip.best_tip_height(),
377 "deactivating mempool: Zebra is syncing lots of blocks"
378 );
379
380 self.active_state = ActiveState::Disabled;
383 }
384 };
385
386 true
387 }
388
389 pub fn is_enabled(&self) -> bool {
391 match self.active_state {
392 ActiveState::Disabled => false,
393 ActiveState::Enabled { .. } => true,
394 }
395 }
396
397 fn remove_expired_from_peer_list(
399 send_to_peers_ids: &HashSet<UnminedTxId>,
400 expired_transactions: &HashSet<zebra_chain::transaction::Hash>,
401 ) -> HashSet<UnminedTxId> {
402 send_to_peers_ids
403 .iter()
404 .filter(|id| !expired_transactions.contains(&id.mined_id()))
405 .copied()
406 .collect()
407 }
408
409 fn update_metrics(&mut self) {
411 #[cfg(feature = "progress-bar")]
413 if matches!(howudoin::cancelled(), Some(true)) {
414 self.disable_metrics();
415 return;
416 }
417
418 #[cfg(feature = "progress-bar")]
420 if self.is_enabled()
421 && (self.queued_count_bar.is_none()
422 || self.transaction_count_bar.is_none()
423 || self.transaction_cost_bar.is_none()
424 || self.rejected_count_bar.is_none())
425 {
426 let _max_transaction_count = self.config.tx_cost_limit
427 / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
428
429 let transaction_count_bar = *howudoin::new_root()
430 .label("Mempool Transactions")
431 .set_pos(0u64);
432 let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
435 .label("Mempool Cost")
436 .set_pos(0u64)
437 .fmt_as_bytes(true);
439
440 let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
441 .label("Mempool Queue")
442 .set_pos(0u64);
443 let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
448 .label("Mempool Rejects")
449 .set_pos(0u64);
450 self.transaction_count_bar = Some(transaction_count_bar);
455 self.transaction_cost_bar = Some(transaction_cost_bar);
456 self.queued_count_bar = Some(queued_count_bar);
457 self.rejected_count_bar = Some(rejected_count_bar);
458 }
459
460 #[cfg(feature = "progress-bar")]
462 if let (
463 Some(queued_count_bar),
464 Some(transaction_count_bar),
465 Some(transaction_cost_bar),
466 Some(rejected_count_bar),
467 ) = (
468 self.queued_count_bar,
469 self.transaction_count_bar,
470 self.transaction_cost_bar,
471 self.rejected_count_bar,
472 ) {
473 let queued_count = self.active_state.queued_transaction_count();
474 let transaction_count = self.active_state.transaction_count();
475
476 let transaction_cost = self.active_state.total_cost();
477 let transaction_size = self.active_state.total_serialized_size();
478 let transaction_size =
479 indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
480
481 let rejected_count = self.active_state.rejected_transaction_count();
482
483 queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
484
485 transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
486
487 transaction_cost_bar
492 .set_pos(transaction_cost)
493 .desc(format!("Actual size {transaction_size}"));
494
495 rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
496 }
497 }
498
499 fn disable_metrics(&self) {
501 #[cfg(feature = "progress-bar")]
502 {
503 if let Some(bar) = self.queued_count_bar {
504 bar.close()
505 }
506 if let Some(bar) = self.transaction_count_bar {
507 bar.close()
508 }
509 if let Some(bar) = self.transaction_cost_bar {
510 bar.close()
511 }
512 if let Some(bar) = self.rejected_count_bar {
513 bar.close()
514 }
515 }
516 }
517}
518
519impl Service<Request> for Mempool {
520 type Response = Response;
521 type Error = BoxError;
522 type Future =
523 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
524
525 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
526 let tip_action = self.chain_tip_change.last_tip_change();
527 let is_state_changed = self.update_state(tip_action.as_ref());
528
529 tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");
530
531 if !self.is_enabled() {
534 self.update_metrics();
535
536 return Poll::Ready(Ok(()));
537 }
538
539 if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
544 info!(
545 tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
546 "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
547 );
548
549 let previous_state = self.active_state.take();
550 let tx_retries = previous_state.transaction_retry_requests();
551
552 std::mem::drop(previous_state);
559
560 self.update_state(tip_action.as_ref());
562
563 if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
566 info!(
567 transactions = tx_retries.len(),
568 "re-verifying mempool transactions after a chain fork"
569 );
570
571 for tx in tx_retries {
572 let _result = tx_downloads.download_if_needed_and_verify(tx, None);
575 }
576 }
577
578 self.update_metrics();
579
580 return Poll::Ready(Ok(()));
581 }
582
583 if let ActiveState::Enabled {
584 storage,
585 tx_downloads,
586 last_seen_tip_hash,
587 } = &mut self.active_state
588 {
589 let mut send_to_peers_ids = HashSet::<_>::new();
591
592 let best_tip_height = self.latest_chain_tip.best_tip_height();
593
594 while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
596 match result {
597 Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
598 if best_tip_height == expected_tip_height {
605 let insert_result =
606 storage.insert(tx, spent_mempool_outpoints, best_tip_height);
607
608 tracing::trace!(
609 ?insert_result,
610 "got Ok(_) transaction verify, tried to store",
611 );
612
613 if let Ok(inserted_id) = insert_result {
614 send_to_peers_ids.insert(inserted_id);
616 }
617
618 if let Some(rsp_tx) = rsp_tx {
620 let _ = rsp_tx
621 .send(insert_result.map(|_| ()).map_err(|err| err.into()));
622 }
623 } else {
624 tracing::trace!("chain grew during tx verification, retrying ..",);
625
626 let _result = tx_downloads
628 .download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
629 }
630 }
631 Ok(Err((tx_id, error))) => {
632 if let TransactionDownloadVerifyError::Invalid {
633 error,
634 advertiser_addr: Some(advertiser_addr),
635 } = &error
636 {
637 if error.mempool_misbehavior_score() != 0 {
638 let _ = self.misbehavior_sender.try_send((
639 *advertiser_addr,
640 error.mempool_misbehavior_score(),
641 ));
642 }
643 };
644
645 tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");
646
647 metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);
648
649 storage.reject_if_needed(tx_id, error);
650 }
651 Err(_elapsed) => {
652 tracing::info!("mempool transaction failed to verify due to timeout");
656
657 metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
658 }
659 };
660 }
661
662 if let Some(TipAction::Grow { block }) = tip_action {
664 tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
665 *last_seen_tip_hash = block.hash;
666
667 let mined_ids = block.transaction_hashes.iter().cloned().collect();
670 tx_downloads.cancel(&mined_ids);
671 storage.clear_mined_dependencies(&mined_ids);
672 storage.reject_and_remove_same_effects(&mined_ids, block.transactions);
673
674 storage.clear_tip_rejections();
677 }
678
679 if let Some(tip_height) = best_tip_height {
684 let expired_transactions = storage.remove_expired_transactions(tip_height);
685 send_to_peers_ids =
687 Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
688
689 if !expired_transactions.is_empty() {
690 tracing::debug!(
691 ?expired_transactions,
692 "removed expired transactions from the mempool",
693 );
694 }
695 }
696
697 if !send_to_peers_ids.is_empty() {
699 tracing::trace!(?send_to_peers_ids, "sending new transactions to peers");
700
701 self.transaction_sender.send(send_to_peers_ids)?;
702 }
703 }
704
705 self.update_metrics();
706
707 Poll::Ready(Ok(()))
708 }
709
710 #[instrument(name = "mempool", skip(self, req))]
715 fn call(&mut self, req: Request) -> Self::Future {
716 match &mut self.active_state {
717 ActiveState::Enabled {
718 storage,
719 tx_downloads,
720 last_seen_tip_hash,
721 } => match req {
722 Request::TransactionIds => {
724 trace!(?req, "got mempool request");
725
726 let res: HashSet<_> = storage.tx_ids().collect();
727
728 info!(?req, res_count = ?res.len(), "answered mempool request");
733
734 async move { Ok(Response::TransactionIds(res)) }.boxed()
735 }
736
737 Request::TransactionsById(ref ids) => {
738 trace!(?req, "got mempool request");
739
740 let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();
741
742 trace!(?req, res_count = ?res.len(), "answered mempool request");
743
744 async move { Ok(Response::Transactions(res)) }.boxed()
745 }
746 Request::TransactionsByMinedId(ref ids) => {
747 trace!(?req, "got mempool request");
748
749 let res: Vec<_> = storage
750 .transactions_same_effects(ids.clone())
751 .cloned()
752 .collect();
753
754 trace!(?req, res_count = ?res.len(), "answered mempool request");
755
756 async move { Ok(Response::Transactions(res)) }.boxed()
757 }
758 Request::TransactionWithDepsByMinedId(tx_id) => {
759 trace!(?req, "got mempool request");
760
761 let res = if let Some((transaction, dependencies)) =
762 storage.transaction_with_deps(tx_id)
763 {
764 Ok(Response::TransactionWithDeps {
765 transaction,
766 dependencies,
767 })
768 } else {
769 Err("transaction not found in mempool".into())
770 };
771
772 trace!(?req, ?res, "answered mempool request");
773
774 async move { res }.boxed()
775 }
776
777 Request::AwaitOutput(outpoint) => {
778 trace!(?req, "got mempool request");
779
780 let response_fut = storage.pending_outputs.queue(outpoint);
781
782 if let Some(output) = storage.created_output(&outpoint) {
783 storage.pending_outputs.respond(&outpoint, output)
784 }
785
786 trace!("answered mempool request");
787
788 response_fut.boxed()
789 }
790
791 Request::FullTransactions => {
792 trace!(?req, "got mempool request");
793
794 let transactions: Vec<_> = storage.transactions().values().cloned().collect();
795 let transaction_dependencies = storage.transaction_dependencies().clone();
796
797 trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");
798
799 let response = Response::FullTransactions {
800 transactions,
801 transaction_dependencies,
802 last_seen_tip_hash: *last_seen_tip_hash,
803 };
804
805 async move { Ok(response) }.boxed()
806 }
807
808 Request::RejectedTransactionIds(ref ids) => {
809 trace!(?req, "got mempool request");
810
811 let res = storage.rejected_transactions(ids.clone()).collect();
812
813 trace!(?req, ?res, "answered mempool request");
814
815 async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
816 }
817
818 Request::Queue(gossiped_txs) => {
820 trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
821
822 let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
823 gossiped_txs
824 .into_iter()
825 .map(
826 |gossiped_tx| -> Result<
827 oneshot::Receiver<Result<(), BoxError>>,
828 MempoolError,
829 > {
830 let (rsp_tx, rsp_rx) = oneshot::channel();
831 storage.should_download_or_verify(gossiped_tx.id())?;
832 tx_downloads
833 .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;
834
835 Ok(rsp_rx)
836 },
837 )
838 .map(|result| result.map_err(BoxError::from))
839 .collect();
840
841 self.update_metrics();
843
844 async move { Ok(Response::Queued(rsp)) }.boxed()
845 }
846
847 Request::CheckForVerifiedTransactions => {
849 trace!(?req, "got mempool request");
850
851 async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
853 }
854 },
855 ActiveState::Disabled => {
856 trace!("got mempool request while mempool is disabled");
859
860 let resp = match req {
865 Request::TransactionIds => Response::TransactionIds(Default::default()),
867
868 Request::TransactionsById(_) => Response::Transactions(Default::default()),
869 Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
870 Request::TransactionWithDepsByMinedId(_) | Request::AwaitOutput(_) => {
871 return async move {
872 Err("mempool is not active: wait for Zebra to sync to the tip".into())
873 }
874 .boxed()
875 }
876
877 Request::FullTransactions => {
878 return async move {
879 Err("mempool is not active: wait for Zebra to sync to the tip".into())
880 }
881 .boxed()
882 }
883
884 Request::RejectedTransactionIds(_) => {
885 Response::RejectedTransactionIds(Default::default())
886 }
887
888 Request::Queue(gossiped_txs) => Response::Queued(
890 iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
893 .map(BoxError::from)
894 .map(Err)
895 .collect(),
896 ),
897
898 Request::CheckForVerifiedTransactions => {
901 Response::CheckedForVerifiedTransactions
903 }
904 };
905
906 async move { Ok(resp) }.boxed()
907 }
908 }
909 }
910}
911
/// Closes any open progress bars when the mempool is dropped.
impl Drop for Mempool {
    fn drop(&mut self) {
        // Does nothing unless the `progress-bar` feature is enabled.
        self.disable_metrics();
    }
}