zebrad/components/mempool/
gossip.rs1use std::collections::HashSet;
7
8use tokio::sync::broadcast::{
9 self,
10 error::{RecvError, TryRecvError},
11};
12use tower::{timeout::Timeout, Service, ServiceExt};
13
14use zebra_chain::transaction::UnminedTxId;
15use zebra_network::MAX_TX_INV_IN_SENT_MESSAGE;
16
17use zebra_network as zn;
18
19use crate::{
20 components::sync::{PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
21 BoxError,
22};
23
/// Limit on how many extra channel receive attempts the gossip task makes
/// while merging queued transaction ID sets into one peer broadcast.
///
/// The first set of a round is counted as change number one and the merge
/// loop keeps going while the count is `<=` this value, so up to this many
/// additional receive attempts follow the initial set.
pub const MAX_CHANGES_BEFORE_SEND: usize = 10;
26
27pub async fn gossip_mempool_transaction_id<ZN>(
31 mut receiver: broadcast::Receiver<HashSet<UnminedTxId>>,
32 broadcast_network: ZN,
33) -> Result<(), BoxError>
34where
35 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
36 ZN::Future: Send,
37{
38 let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
39 .try_into()
40 .expect("constant fits in usize");
41
42 info!("initializing transaction gossip task");
43
44 let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
47
48 loop {
49 let mut combined_changes = 1;
50
51 let mut txs = loop {
56 match receiver.recv().await {
57 Ok(txs) => break txs,
58 Err(RecvError::Lagged(skip_count)) => info!(
59 ?skip_count,
60 "dropped transactions before gossiping due to heavy mempool or network load"
61 ),
62 Err(closed @ RecvError::Closed) => Err(closed)?,
63 }
64 };
65
66 while combined_changes <= MAX_CHANGES_BEFORE_SEND && txs.len() < max_tx_inv_in_message {
70 match receiver.try_recv() {
71 Ok(extra_txs) => txs.extend(extra_txs.iter()),
72 Err(TryRecvError::Empty) => break,
73 Err(TryRecvError::Lagged(skip_count)) => info!(
74 ?skip_count,
75 "dropped transactions before gossiping due to heavy mempool or network load"
76 ),
77 Err(closed @ TryRecvError::Closed) => Err(closed)?,
78 }
79
80 combined_changes += 1;
81 }
82
83 let txs_len = txs.len();
84 let request = zn::Request::AdvertiseTransactionIds(txs);
85
86 info!(%request, changes = %combined_changes, "sending mempool transaction broadcast");
87 debug!(
88 ?request,
89 changes = ?combined_changes,
90 "full list of mempool transactions in broadcast"
91 );
92
93 let _ = broadcast_network.ready().await?.call(request).await;
95
96 metrics::counter!("mempool.gossiped.transactions.total").increment(txs_len as u64);
97
98 tokio::time::sleep(PEER_GOSSIP_DELAY).await;
103 }
104}