zebrad/components/mempool/gossip.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
//! A task that gossips any [`zebra_chain::transaction::UnminedTxId`] that enters the mempool to peers.
//!
//! This module is just a function [`gossip_mempool_transaction_id`] that waits for mempool
//! insertion events received in a channel and broadcasts the transactions to peers.
use std::collections::HashSet;
use tokio::sync::broadcast::{
self,
error::{RecvError, TryRecvError},
};
use tower::{timeout::Timeout, Service, ServiceExt};
use zebra_chain::transaction::UnminedTxId;
use zebra_network::MAX_TX_INV_IN_SENT_MESSAGE;
use zebra_network as zn;
use crate::{
components::sync::{PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
BoxError,
};
/// The maximum number of channel messages we will combine into a single peer broadcast.
pub const MAX_CHANGES_BEFORE_SEND: usize = 10;
/// Runs continuously, gossiping new [`UnminedTxId`] to peers.
///
/// Broadcasts any new [`UnminedTxId`]s that get stored in the mempool to multiple ready peers.
pub async fn gossip_mempool_transaction_id<ZN>(
mut receiver: broadcast::Receiver<HashSet<UnminedTxId>>,
broadcast_network: ZN,
) -> Result<(), BoxError>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
{
let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
.try_into()
.expect("constant fits in usize");
info!("initializing transaction gossip task");
// use the same timeout as tips requests,
// so broadcasts don't delay the syncer too long
let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
loop {
let mut combined_changes = 1;
// once we get new data in the channel, broadcast to peers
//
// the mempool automatically combines some transaction IDs that arrive close together,
// and this task also combines the changes that are in the channel before sending
let mut txs = loop {
match receiver.recv().await {
Ok(txs) => break txs,
Err(RecvError::Lagged(skip_count)) => info!(
?skip_count,
"dropped transactions before gossiping due to heavy mempool or network load"
),
Err(closed @ RecvError::Closed) => Err(closed)?,
}
};
// also combine transaction IDs that arrived shortly after this one,
// but limit the number of changes and the number of transaction IDs
// (the network layer handles the actual limits, this just makes sure the loop terminates)
while combined_changes <= MAX_CHANGES_BEFORE_SEND && txs.len() < max_tx_inv_in_message {
match receiver.try_recv() {
Ok(extra_txs) => txs.extend(extra_txs.iter()),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Lagged(skip_count)) => info!(
?skip_count,
"dropped transactions before gossiping due to heavy mempool or network load"
),
Err(closed @ TryRecvError::Closed) => Err(closed)?,
}
combined_changes += 1;
}
let txs_len = txs.len();
let request = zn::Request::AdvertiseTransactionIds(txs);
info!(%request, changes = %combined_changes, "sending mempool transaction broadcast");
debug!(
?request,
changes = ?combined_changes,
"full list of mempool transactions in broadcast"
);
// broadcast requests don't return errors, and we'd just want to ignore them anyway
let _ = broadcast_network.ready().await?.call(request).await;
metrics::counter!("mempool.gossiped.transactions.total").increment(txs_len as u64);
// wait for at least the network timeout between gossips
//
// in practice, transactions arrive every 1-20 seconds,
// so waiting 6 seconds can delay transaction propagation, in order to reduce peer load
tokio::time::sleep(PEER_GOSSIP_DELAY).await;
}
}