1use std::{
22 collections::HashSet,
23 future::Future,
24 iter,
25 pin::{pin, Pin},
26 task::{Context, Poll},
27};
28
29use futures::{future::FutureExt, stream::Stream};
30use tokio::sync::{broadcast, mpsc, oneshot};
31use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
32
33use zebra_chain::{
34 block::{self, Height},
35 chain_sync_status::ChainSyncStatus,
36 chain_tip::ChainTip,
37 transaction::UnminedTxId,
38};
39use zebra_consensus::{error::TransactionError, transaction};
40use zebra_network::{self as zn, PeerSocketAddr};
41use zebra_node_services::mempool::{Gossip, MempoolChange, MempoolTxSubscriber, Request, Response};
42use zebra_state as zs;
43use zebra_state::{ChainTipChange, TipAction};
44
45use crate::components::sync::SyncStatus;
46
47pub mod config;
48mod crawler;
49pub mod downloads;
50mod error;
51pub mod gossip;
52mod pending_outputs;
53mod queue_checker;
54mod storage;
55
56#[cfg(test)]
57mod tests;
58
59pub use crate::BoxError;
60
61pub use config::Config;
62pub use crawler::Crawler;
63pub use error::MempoolError;
64pub use gossip::gossip_mempool_transaction_id;
65pub use queue_checker::QueueChecker;
66pub use storage::{
67 ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
68};
69
70#[cfg(test)]
71pub use self::tests::UnboxMempoolError;
72
73use downloads::{
74 Downloads as TxDownloads, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
75 TRANSACTION_VERIFY_TIMEOUT,
76};
77
78type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
79type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
80type TxVerifier = Buffer<
81 BoxService<transaction::Request, transaction::Response, TransactionError>,
82 transaction::Request,
83>;
84type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
85
86#[allow(clippy::large_enum_variant)]
93#[derive(Default)]
94enum ActiveState {
95 #[default]
97 Disabled,
98
99 Enabled {
101 storage: Storage,
108
109 tx_downloads: Pin<Box<InboundTxDownloads>>,
111
112 last_seen_tip_hash: block::Hash,
116 },
117}
118
119impl ActiveState {
120 fn take(&mut self) -> Self {
122 std::mem::take(self)
123 }
124
125 fn transaction_retry_requests(&self) -> Vec<Gossip> {
127 match self {
128 ActiveState::Disabled => Vec::new(),
129 ActiveState::Enabled {
130 storage,
131 tx_downloads,
132 ..
133 } => {
134 let mut transactions = Vec::new();
135
136 let storage = storage
137 .transactions()
138 .values()
139 .map(|tx| tx.transaction.clone().into());
140 transactions.extend(storage);
141
142 let pending = tx_downloads.transaction_requests().cloned();
143 transactions.extend(pending);
144
145 transactions
146 }
147 }
148 }
149
150 #[cfg(feature = "progress-bar")]
153 fn queued_transaction_count(&self) -> usize {
154 match self {
155 ActiveState::Disabled => 0,
156 ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
157 }
158 }
159
160 #[cfg(feature = "progress-bar")]
162 fn transaction_count(&self) -> usize {
163 match self {
164 ActiveState::Disabled => 0,
165 ActiveState::Enabled { storage, .. } => storage.transaction_count(),
166 }
167 }
168
169 #[cfg(feature = "progress-bar")]
172 fn total_cost(&self) -> u64 {
173 match self {
174 ActiveState::Disabled => 0,
175 ActiveState::Enabled { storage, .. } => storage.total_cost(),
176 }
177 }
178
179 #[cfg(feature = "progress-bar")]
184 pub fn total_serialized_size(&self) -> usize {
185 match self {
186 ActiveState::Disabled => 0,
187 ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
188 }
189 }
190
191 #[cfg(feature = "progress-bar")]
194 fn rejected_transaction_count(&mut self) -> usize {
195 match self {
196 ActiveState::Disabled => 0,
197 ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
198 }
199 }
200}
201
202pub struct Mempool {
208 config: Config,
210
211 active_state: ActiveState,
213
214 sync_status: SyncStatus,
216
217 debug_enable_at_height: Option<Height>,
219
220 latest_chain_tip: zs::LatestChainTip,
222
223 chain_tip_change: ChainTipChange,
226
227 outbound: Outbound,
230
231 state: State,
234
235 tx_verifier: TxVerifier,
238
239 transaction_sender: broadcast::Sender<MempoolChange>,
242
243 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
245
246 #[cfg(feature = "progress-bar")]
251 queued_count_bar: Option<howudoin::Tx>,
252
253 #[cfg(feature = "progress-bar")]
256 transaction_count_bar: Option<howudoin::Tx>,
257
258 #[cfg(feature = "progress-bar")]
261 transaction_cost_bar: Option<howudoin::Tx>,
262
263 #[cfg(feature = "progress-bar")]
266 rejected_count_bar: Option<howudoin::Tx>,
267}
268
269impl Mempool {
270 #[allow(clippy::too_many_arguments)]
271 pub(crate) fn new(
272 config: &Config,
273 outbound: Outbound,
274 state: State,
275 tx_verifier: TxVerifier,
276 sync_status: SyncStatus,
277 latest_chain_tip: zs::LatestChainTip,
278 chain_tip_change: ChainTipChange,
279 misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
280 ) -> (Self, MempoolTxSubscriber) {
281 let (transaction_sender, _) =
282 tokio::sync::broadcast::channel(gossip::MAX_CHANGES_BEFORE_SEND * 2);
283 let transaction_subscriber = MempoolTxSubscriber::new(transaction_sender.clone());
284
285 let mut service = Mempool {
286 config: config.clone(),
287 active_state: ActiveState::Disabled,
288 sync_status,
289 debug_enable_at_height: config.debug_enable_at_height.map(Height),
290 latest_chain_tip,
291 chain_tip_change,
292 outbound,
293 state,
294 tx_verifier,
295 transaction_sender,
296 misbehavior_sender,
297 #[cfg(feature = "progress-bar")]
298 queued_count_bar: None,
299 #[cfg(feature = "progress-bar")]
300 transaction_count_bar: None,
301 #[cfg(feature = "progress-bar")]
302 transaction_cost_bar: None,
303 #[cfg(feature = "progress-bar")]
304 rejected_count_bar: None,
305 };
306
307 service.update_state(None);
310
311 (service, transaction_subscriber)
312 }
313
314 fn is_enabled_by_debug(&self) -> bool {
316 let mut is_debug_enabled = false;
317
318 if self.debug_enable_at_height.is_none() {
320 return is_debug_enabled;
321 }
322
323 let enable_at_height = self
324 .debug_enable_at_height
325 .expect("unexpected debug_enable_at_height: just checked for None");
326
327 if let Some(best_tip_height) = self.latest_chain_tip.best_tip_height() {
328 is_debug_enabled = best_tip_height >= enable_at_height;
329
330 if is_debug_enabled && !self.is_enabled() {
331 info!(
332 ?best_tip_height,
333 ?enable_at_height,
334 "enabling mempool for debugging"
335 );
336 }
337 }
338
339 is_debug_enabled
340 }
341
342 fn update_state(&mut self, tip_action: Option<&TipAction>) -> bool {
350 let is_close_to_tip = self.sync_status.is_close_to_tip() || self.is_enabled_by_debug();
351
352 match (is_close_to_tip, self.is_enabled(), tip_action) {
353 (false, false, _) | (true, true, _) | (true, false, None) => return false,
355
356 (true, false, Some(tip_action)) => {
358 let (last_seen_tip_hash, tip_height) = tip_action.best_tip_hash_and_height();
359
360 info!(?tip_height, "activating mempool: Zebra is close to the tip");
361
362 let tx_downloads = Box::pin(TxDownloads::new(
363 Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
364 Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
365 self.state.clone(),
366 ));
367 self.active_state = ActiveState::Enabled {
368 storage: storage::Storage::new(&self.config),
369 tx_downloads,
370 last_seen_tip_hash,
371 };
372 }
373
374 (false, true, _) => {
376 info!(
377 tip_height = ?self.latest_chain_tip.best_tip_height(),
378 "deactivating mempool: Zebra is syncing lots of blocks"
379 );
380
381 self.active_state = ActiveState::Disabled;
384 }
385 };
386
387 true
388 }
389
390 pub fn is_enabled(&self) -> bool {
392 match self.active_state {
393 ActiveState::Disabled => false,
394 ActiveState::Enabled { .. } => true,
395 }
396 }
397
398 fn remove_expired_from_peer_list(
400 send_to_peers_ids: &HashSet<UnminedTxId>,
401 expired_transactions: &HashSet<UnminedTxId>,
402 ) -> HashSet<UnminedTxId> {
403 send_to_peers_ids
404 .iter()
405 .filter(|id| !expired_transactions.contains(id))
406 .copied()
407 .collect()
408 }
409
410 fn update_metrics(&mut self) {
412 #[cfg(feature = "progress-bar")]
414 if matches!(howudoin::cancelled(), Some(true)) {
415 self.disable_metrics();
416 return;
417 }
418
419 #[cfg(feature = "progress-bar")]
421 if self.is_enabled()
422 && (self.queued_count_bar.is_none()
423 || self.transaction_count_bar.is_none()
424 || self.transaction_cost_bar.is_none()
425 || self.rejected_count_bar.is_none())
426 {
427 let _max_transaction_count = self.config.tx_cost_limit
428 / zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
429
430 let transaction_count_bar = *howudoin::new_root()
431 .label("Mempool Transactions")
432 .set_pos(0u64);
433 let transaction_cost_bar = howudoin::new_with_parent(transaction_count_bar.id())
436 .label("Mempool Cost")
437 .set_pos(0u64)
438 .fmt_as_bytes(true);
440
441 let queued_count_bar = *howudoin::new_with_parent(transaction_cost_bar.id())
442 .label("Mempool Queue")
443 .set_pos(0u64);
444 let rejected_count_bar = *howudoin::new_with_parent(queued_count_bar.id())
449 .label("Mempool Rejects")
450 .set_pos(0u64);
451 self.transaction_count_bar = Some(transaction_count_bar);
456 self.transaction_cost_bar = Some(transaction_cost_bar);
457 self.queued_count_bar = Some(queued_count_bar);
458 self.rejected_count_bar = Some(rejected_count_bar);
459 }
460
461 #[cfg(feature = "progress-bar")]
463 if let (
464 Some(queued_count_bar),
465 Some(transaction_count_bar),
466 Some(transaction_cost_bar),
467 Some(rejected_count_bar),
468 ) = (
469 self.queued_count_bar,
470 self.transaction_count_bar,
471 self.transaction_cost_bar,
472 self.rejected_count_bar,
473 ) {
474 let queued_count = self.active_state.queued_transaction_count();
475 let transaction_count = self.active_state.transaction_count();
476
477 let transaction_cost = self.active_state.total_cost();
478 let transaction_size = self.active_state.total_serialized_size();
479 let transaction_size =
480 indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
481
482 let rejected_count = self.active_state.rejected_transaction_count();
483
484 queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
485
486 transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
487
488 transaction_cost_bar
493 .set_pos(transaction_cost)
494 .desc(format!("Actual size {transaction_size}"));
495
496 rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
497 }
498 }
499
500 fn disable_metrics(&self) {
502 #[cfg(feature = "progress-bar")]
503 {
504 if let Some(bar) = self.queued_count_bar {
505 bar.close()
506 }
507 if let Some(bar) = self.transaction_count_bar {
508 bar.close()
509 }
510 if let Some(bar) = self.transaction_cost_bar {
511 bar.close()
512 }
513 if let Some(bar) = self.rejected_count_bar {
514 bar.close()
515 }
516 }
517 }
518}
519
520impl Service<Request> for Mempool {
521 type Response = Response;
522 type Error = BoxError;
523 type Future =
524 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
525
526 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
527 let tip_action = self.chain_tip_change.last_tip_change();
528
529 let is_state_changed = self.update_state(tip_action.as_ref());
531
532 tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool...");
533
534 if !self.is_enabled() {
537 self.update_metrics();
538
539 return Poll::Ready(Ok(()));
540 }
541
542 if !is_state_changed && matches!(tip_action, Some(TipAction::Reset { .. })) {
547 info!(
548 tip_height = ?tip_action.as_ref().unwrap().best_tip_height(),
549 "resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
550 );
551
552 let previous_state = self.active_state.take();
553 let tx_retries = previous_state.transaction_retry_requests();
554
555 std::mem::drop(previous_state);
562
563 self.update_state(tip_action.as_ref());
565
566 if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
569 info!(
570 transactions = tx_retries.len(),
571 "re-verifying mempool transactions after a chain fork"
572 );
573
574 for tx in tx_retries {
575 let _result = tx_downloads.download_if_needed_and_verify(tx, None);
578 }
579 }
580
581 self.update_metrics();
582
583 return Poll::Ready(Ok(()));
584 }
585
586 if let ActiveState::Enabled {
587 storage,
588 tx_downloads,
589 last_seen_tip_hash,
590 } = &mut self.active_state
591 {
592 let mut send_to_peers_ids = HashSet::<_>::new();
594 let mut invalidated_ids = HashSet::<_>::new();
595 let mut mined_mempool_ids = HashSet::<_>::new();
596
597 let best_tip_height = self.latest_chain_tip.best_tip_height();
598
599 while let Poll::Ready(Some(result)) = pin!(&mut *tx_downloads).poll_next(cx) {
601 match result {
602 Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height, rsp_tx))) => {
603 if best_tip_height == expected_tip_height {
610 let tx_id = tx.transaction.id;
611 let insert_result =
612 storage.insert(tx, spent_mempool_outpoints, best_tip_height);
613
614 tracing::trace!(
615 ?insert_result,
616 "got Ok(_) transaction verify, tried to store",
617 );
618
619 if let Ok(inserted_id) = insert_result {
620 send_to_peers_ids.insert(inserted_id);
622 } else {
623 invalidated_ids.insert(tx_id);
624 }
625
626 if let Some(rsp_tx) = rsp_tx {
628 let _ = rsp_tx
629 .send(insert_result.map(|_| ()).map_err(|err| err.into()));
630 }
631 } else {
632 tracing::trace!("chain grew during tx verification, retrying ..",);
633
634 let _result = tx_downloads
636 .download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
637 }
638 }
639 Ok(Err((tx_id, error))) => {
640 if let TransactionDownloadVerifyError::Invalid {
641 error,
642 advertiser_addr: Some(advertiser_addr),
643 } = &error
644 {
645 if error.mempool_misbehavior_score() != 0 {
646 let _ = self.misbehavior_sender.try_send((
647 *advertiser_addr,
648 error.mempool_misbehavior_score(),
649 ));
650 }
651 };
652
653 tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");
654
655 metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);
656
657 invalidated_ids.insert(tx_id);
658 storage.reject_if_needed(tx_id, error);
659 }
660 Err(_elapsed) => {
661 tracing::warn!("mempool transaction failed to verify due to timeout");
667
668 metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);
669 }
670 };
671 }
672
673 if let Some(TipAction::Grow { block }) = tip_action {
675 tracing::trace!(block_height = ?block.height, "handling blocks added to tip");
676 *last_seen_tip_hash = block.hash;
677
678 let mined_ids = block.transaction_hashes.iter().cloned().collect();
681 tx_downloads.cancel(&mined_ids);
682 storage.clear_mined_dependencies(&mined_ids);
683
684 let storage::RemovedTransactionIds { mined, invalidated } =
685 storage.reject_and_remove_same_effects(&mined_ids, block.transactions);
686
687 storage.clear_tip_rejections();
690
691 mined_mempool_ids.extend(mined);
692 invalidated_ids.extend(invalidated);
693 }
694
695 if let Some(tip_height) = best_tip_height {
700 let expired_transactions = storage.remove_expired_transactions(tip_height);
701 send_to_peers_ids =
703 Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
704
705 if !expired_transactions.is_empty() {
706 tracing::debug!(
707 ?expired_transactions,
708 "removed expired transactions from the mempool",
709 );
710
711 invalidated_ids.extend(expired_transactions);
712 }
713 }
714
715 if !send_to_peers_ids.is_empty() {
717 tracing::trace!(
718 ?send_to_peers_ids,
719 "sending new transactions to peers and RPC listeners"
720 );
721
722 self.transaction_sender
723 .send(MempoolChange::added(send_to_peers_ids))?;
724 }
725
726 if !invalidated_ids.is_empty() {
728 tracing::trace!(
729 ?invalidated_ids,
730 "sending invalidated transactions to RPC listeners"
731 );
732
733 self.transaction_sender
734 .send(MempoolChange::invalidated(invalidated_ids))?;
735 }
736
737 if !mined_mempool_ids.is_empty() {
739 tracing::trace!(
740 ?mined_mempool_ids,
741 "sending mined transactions to RPC listeners"
742 );
743
744 self.transaction_sender
745 .send(MempoolChange::mined(mined_mempool_ids))?;
746 }
747 }
748
749 self.update_metrics();
750
751 Poll::Ready(Ok(()))
752 }
753
754 #[instrument(name = "mempool", skip(self, req))]
759 fn call(&mut self, req: Request) -> Self::Future {
760 match &mut self.active_state {
761 ActiveState::Enabled {
762 storage,
763 tx_downloads,
764 last_seen_tip_hash,
765 } => match req {
766 Request::TransactionIds => {
768 trace!(?req, "got mempool request");
769
770 let res: HashSet<_> = storage.tx_ids().collect();
771
772 trace!(?req, res_count = ?res.len(), "answered mempool request");
773
774 async move { Ok(Response::TransactionIds(res)) }.boxed()
775 }
776
777 Request::TransactionsById(ref ids) => {
778 trace!(?req, "got mempool request");
779
780 let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect();
781
782 trace!(?req, res_count = ?res.len(), "answered mempool request");
783
784 async move { Ok(Response::Transactions(res)) }.boxed()
785 }
786 Request::TransactionsByMinedId(ref ids) => {
787 trace!(?req, "got mempool request");
788
789 let res: Vec<_> = storage
790 .transactions_same_effects(ids.clone())
791 .cloned()
792 .collect();
793
794 trace!(?req, res_count = ?res.len(), "answered mempool request");
795
796 async move { Ok(Response::Transactions(res)) }.boxed()
797 }
798 Request::TransactionWithDepsByMinedId(tx_id) => {
799 trace!(?req, "got mempool request");
800
801 let res = if let Some((transaction, dependencies)) =
802 storage.transaction_with_deps(tx_id)
803 {
804 Ok(Response::TransactionWithDeps {
805 transaction,
806 dependencies,
807 })
808 } else {
809 Err("transaction not found in mempool".into())
810 };
811
812 trace!(?req, ?res, "answered mempool request");
813
814 async move { res }.boxed()
815 }
816
817 Request::AwaitOutput(outpoint) => {
818 trace!(?req, "got mempool request");
819
820 let response_fut = storage.pending_outputs.queue(outpoint);
821
822 if let Some(output) = storage.created_output(&outpoint) {
823 storage.pending_outputs.respond(&outpoint, output)
824 }
825
826 trace!("answered mempool request");
827
828 response_fut.boxed()
829 }
830
831 Request::FullTransactions => {
832 trace!(?req, "got mempool request");
833
834 let transactions: Vec<_> = storage.transactions().values().cloned().collect();
835 let transaction_dependencies = storage.transaction_dependencies().clone();
836
837 trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");
838
839 let response = Response::FullTransactions {
840 transactions,
841 transaction_dependencies,
842 last_seen_tip_hash: *last_seen_tip_hash,
843 };
844
845 async move { Ok(response) }.boxed()
846 }
847
848 Request::RejectedTransactionIds(ref ids) => {
849 trace!(?req, "got mempool request");
850
851 let res = storage.rejected_transactions(ids.clone()).collect();
852
853 trace!(?req, ?res, "answered mempool request");
854
855 async move { Ok(Response::RejectedTransactionIds(res)) }.boxed()
856 }
857
858 Request::Queue(gossiped_txs) => {
860 trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
861
862 let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
863 gossiped_txs
864 .into_iter()
865 .map(
866 |gossiped_tx| -> Result<
867 oneshot::Receiver<Result<(), BoxError>>,
868 MempoolError,
869 > {
870 let (rsp_tx, rsp_rx) = oneshot::channel();
871 storage.should_download_or_verify(gossiped_tx.id())?;
872 tx_downloads
873 .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;
874
875 Ok(rsp_rx)
876 },
877 )
878 .map(|result| result.map_err(BoxError::from))
879 .collect();
880
881 self.update_metrics();
883
884 async move { Ok(Response::Queued(rsp)) }.boxed()
885 }
886
887 Request::CheckForVerifiedTransactions => {
889 trace!(?req, "got mempool request");
890
891 async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed()
893 }
894 },
895 ActiveState::Disabled => {
896 trace!("got mempool request while mempool is disabled");
899
900 let resp = match req {
905 Request::TransactionIds => Response::TransactionIds(Default::default()),
907
908 Request::TransactionsById(_) => Response::Transactions(Default::default()),
909 Request::TransactionsByMinedId(_) => Response::Transactions(Default::default()),
910 Request::TransactionWithDepsByMinedId(_) | Request::AwaitOutput(_) => {
911 return async move {
912 Err("mempool is not active: wait for Zebra to sync to the tip".into())
913 }
914 .boxed()
915 }
916
917 Request::FullTransactions => {
918 return async move {
919 Err("mempool is not active: wait for Zebra to sync to the tip".into())
920 }
921 .boxed()
922 }
923
924 Request::RejectedTransactionIds(_) => {
925 Response::RejectedTransactionIds(Default::default())
926 }
927
928 Request::Queue(gossiped_txs) => Response::Queued(
930 iter::repeat_n(MempoolError::Disabled, gossiped_txs.len())
933 .map(BoxError::from)
934 .map(Err)
935 .collect(),
936 ),
937
938 Request::CheckForVerifiedTransactions => {
941 Response::CheckedForVerifiedTransactions
943 }
944 };
945
946 async move { Ok(resp) }.boxed()
947 }
948 }
949 }
950}
951
952impl Drop for Mempool {
953 fn drop(&mut self) {
954 self.disable_metrics();
955 }
956}