zebrad/components/mempool/gossip.rs
1//! A task that gossips any [`zebra_chain::transaction::UnminedTxId`] that enters the mempool to peers.
2//!
//! This module contains a single function, [`gossip_mempool_transaction_id`], that waits for
//! mempool change events received in a channel and broadcasts the added transaction IDs to peers.
5
6use tokio::sync::broadcast::{
7 self,
8 error::{RecvError, TryRecvError},
9};
10use tower::{timeout::Timeout, Service, ServiceExt};
11
12use zebra_network::MAX_TX_INV_IN_SENT_MESSAGE;
13
14use zebra_network as zn;
15use zebra_node_services::mempool::MempoolChange;
16
17use crate::{
18 components::sync::{PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
19 BoxError,
20};
21
/// The maximum number of channel messages we will combine into a single peer broadcast.
///
/// The gossip task in this module uses this value to bound how many extra queued mempool
/// changes it drains from the channel before sending a broadcast.
pub const MAX_CHANGES_BEFORE_SEND: usize = 10;
24
25/// Runs continuously, gossiping new [`UnminedTxId`](zebra_chain::transaction::UnminedTxId) to peers.
26///
27/// Broadcasts any new [`UnminedTxId`](zebra_chain::transaction::UnminedTxId)s that
28/// are stored in the mempool to multiple ready peers.
29pub async fn gossip_mempool_transaction_id<ZN>(
30 mut receiver: broadcast::Receiver<MempoolChange>,
31 broadcast_network: ZN,
32) -> Result<(), BoxError>
33where
34 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
35 ZN::Future: Send,
36{
37 let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
38 .try_into()
39 .expect("constant fits in usize");
40
41 info!("initializing transaction gossip task");
42
43 // use the same timeout as tips requests,
44 // so broadcasts don't delay the syncer too long
45 let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
46
47 loop {
48 let mut combined_changes = 1;
49
50 // once we get new data in the channel, broadcast to peers
51 //
52 // the mempool automatically combines some transaction IDs that arrive close together,
53 // and this task also combines the changes that are in the channel before sending
54 let mut txs = loop {
55 match receiver.recv().await {
56 Ok(mempool_change) if mempool_change.is_added() => {
57 break mempool_change.into_tx_ids()
58 }
59 Ok(_) => {
60 // ignore other changes, we only want to gossip added transactions
61 continue;
62 }
63 Err(RecvError::Lagged(skip_count)) => info!(
64 ?skip_count,
65 "dropped transactions before gossiping due to heavy mempool or network load"
66 ),
67 Err(closed @ RecvError::Closed) => Err(closed)?,
68 }
69 };
70
71 // also combine transaction IDs that arrived shortly after this one,
72 // but limit the number of changes and the number of transaction IDs
73 // (the network layer handles the actual limits, this just makes sure the loop terminates)
74 //
75 // TODO: If some amount of time passes (300ms?) before reaching MAX_CHANGES_BEFORE_SEND or
76 // max_tx_inv_in_message, flush messages anyway.
77 while combined_changes <= MAX_CHANGES_BEFORE_SEND && txs.len() < max_tx_inv_in_message {
78 match receiver.try_recv() {
79 Ok(mempool_change) if mempool_change.is_added() => {
80 txs.extend(mempool_change.into_tx_ids().into_iter())
81 }
82 Ok(_) => {
83 // ignore other changes, we only want to gossip added transactions
84 continue;
85 }
86 Err(TryRecvError::Empty) => break,
87 Err(TryRecvError::Lagged(skip_count)) => info!(
88 ?skip_count,
89 "dropped transactions before gossiping due to heavy mempool or network load"
90 ),
91 Err(closed @ TryRecvError::Closed) => Err(closed)?,
92 }
93
94 combined_changes += 1;
95 }
96
97 let txs_len = txs.len();
98 let request = zn::Request::AdvertiseTransactionIds(txs);
99
100 info!(%request, changes = %combined_changes, "sending mempool transaction broadcast");
101 debug!(
102 ?request,
103 changes = ?combined_changes,
104 "full list of mempool transactions in broadcast"
105 );
106
107 // broadcast requests don't return errors, and we'd just want to ignore them anyway
108 let _ = broadcast_network.ready().await?.call(request).await;
109
110 metrics::counter!("mempool.gossiped.transactions.total").increment(txs_len as u64);
111
112 // wait for at least the network timeout between gossips
113 //
114 // in practice, transactions arrive every 1-20 seconds,
115 // so waiting 6 seconds can delay transaction propagation, in order to reduce peer load
116 tokio::time::sleep(PEER_GOSSIP_DELAY).await;
117 }
118}