zebra_network/peer_set/inventory_registry.rs

//! Inventory Registry Implementation
//!
//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html

use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{FutureExt, Stream, StreamExt};
use indexmap::IndexMap;
use tokio::{
    sync::broadcast,
    time::{self, Instant},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream};

use zebra_chain::serialization::AtLeastOne;

use crate::{
    constants::INVENTORY_ROTATION_INTERVAL,
    protocol::{external::InventoryHash, internal::InventoryResponse},
    BoxError, PeerSocketAddr,
};

use self::update::Update;

/// Underlying type for the alias InventoryStatus::*
use InventoryResponse::*;

pub mod update;

#[cfg(test)]
mod tests;

/// The maximum number of inventory hashes we will track from a single peer.
///
/// # Security
///
/// This limits known memory denial of service attacks like <https://invdos.net/> to a total of:
/// ```text
/// 1000 inventory * 2 maps * 32-64 bytes per inventory = less than 1 MB
/// 1000 inventory * 70 peers * 2 maps * 6-18 bytes per address = up to 3 MB
/// ```
///
/// Since the inventory registry is an efficiency optimisation, which falls back to a
/// random peer, we only need to track a small number of hashes for available inventory.
///
/// But we want to be able to track a significant amount of missing inventory,
/// to limit queries for globally missing inventory.
//
// TODO: split this into available (25) and missing (1000 or more?)
pub const MAX_INV_PER_MAP: usize = 1000;

/// The maximum number of peers we will track inventory for.
///
/// # Security
///
/// This limits known memory denial of service attacks. See [`MAX_INV_PER_MAP`] for details.
///
/// Since the inventory registry is an efficiency optimisation, which falls back to a
/// random peer, we only need to track a small number of peers per inv for available inventory.
///
/// But we want to be able to track missing inventory for almost all our peers,
/// so we only query a few peers for inventory that is genuinely missing from the network.
//
// TODO: split this into available (25) and missing (70)
pub const MAX_PEERS_PER_INV: usize = 70;

/// A peer inventory status, which tracks a hash for both available and missing inventory.
pub type InventoryStatus<T> = InventoryResponse<T, T>;

/// A peer inventory status change, used in the inventory status channel.
///
/// For performance reasons, advertisements should only be tracked
/// for hashes that are rare on the network.
/// So Zebra only tracks single-block inventory messages.
///
/// For security reasons, all `notfound` rejections should be tracked.
/// This also helps with performance, if the hash is rare on the network.
pub type InventoryChange = InventoryStatus<(AtLeastOne<InventoryHash>, PeerSocketAddr)>;

/// An internal marker used in inventory status hash maps.
type InventoryMarker = InventoryStatus<()>;

/// An Inventory Registry for tracking recent inventory advertisements and missing inventory.
///
/// For more details please refer to the [RFC].
///
/// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
pub struct InventoryRegistry {
    /// Map tracking the latest inventory status from the current interval
    /// period.
    //
    // TODO: split maps into available and missing, so we can limit them separately.
    current: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,

    /// Map tracking inventory statuses from the previous interval period.
    prev: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,

    /// Stream of incoming inventory statuses to register.
    inv_stream: Pin<
        Box<dyn Stream<Item = Result<InventoryChange, BroadcastStreamRecvError>> + Send + 'static>,
    >,

    /// Interval tracking when we should next rotate our maps.
    interval: IntervalStream,
}

impl std::fmt::Debug for InventoryRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InventoryRegistry")
            .field("current", &self.current)
            .field("prev", &self.prev)
            .finish()
    }
}

impl InventoryChange {
    /// Returns a new available inventory change from a single hash.
    pub fn new_available(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
        InventoryStatus::Available((AtLeastOne::from_one(hash), peer))
    }

    /// Returns a new missing inventory change from a single hash.
    #[allow(dead_code)]
    pub fn new_missing(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
        InventoryStatus::Missing((AtLeastOne::from_one(hash), peer))
    }

    /// Returns a new available multiple inventory change, if `hashes` contains at least one change.
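    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc test): `hashes` and `peer` below are
    /// hypothetical placeholder values, not real constructors from this crate.
    ///
    /// ```ignore
    /// // An empty hash list produces no change at all:
    /// assert!(InventoryChange::new_available_multi(std::iter::empty(), peer).is_none());
    ///
    /// // A non-empty list produces an `Available` change for that peer,
    /// // truncated to at most `MAX_INV_PER_MAP` hashes:
    /// let change = InventoryChange::new_available_multi(hashes.iter(), peer);
    /// assert!(change.is_some());
    /// ```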
    pub fn new_available_multi<'a>(
        hashes: impl IntoIterator<Item = &'a InventoryHash>,
        peer: PeerSocketAddr,
    ) -> Option<Self> {
        let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();

        // # Security
        //
        // Don't send more hashes than we're going to store.
        // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
        //
        // This limits known memory denial of service attacks to:
        // `1000 hashes * 200 peers/channel capacity * 32-64 bytes = up to 12 MB`
        hashes.truncate(MAX_INV_PER_MAP);

        let hashes = hashes.try_into().ok();

        hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
    }

    /// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
    pub fn new_missing_multi<'a>(
        hashes: impl IntoIterator<Item = &'a InventoryHash>,
        peer: PeerSocketAddr,
    ) -> Option<Self> {
        let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();

        // # Security
        //
        // Don't send more hashes than we're going to store.
        // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
        hashes.truncate(MAX_INV_PER_MAP);

        let hashes = hashes.try_into().ok();

        hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
    }
}

impl<T> InventoryStatus<T> {
    /// Get a marker for the status, without any associated data.
    pub fn marker(&self) -> InventoryMarker {
        self.as_ref().map(|_inner| ())
    }

    /// Maps an `InventoryStatus<T>` to `InventoryStatus<U>` by applying a function to a contained value.
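    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc test, because the exact public path of
    /// `InventoryStatus` is not assumed here); the `u32` values are arbitrary.
    ///
    /// ```ignore
    /// // `map` applies the function to the inner value and keeps the variant:
    /// let status = InventoryStatus::<u32>::Available(1).map(|n| n + 1);
    /// assert!(matches!(status, InventoryStatus::Available(2)));
    ///
    /// let status = InventoryStatus::<u32>::Missing(1).map(|n| n + 1);
    /// assert!(matches!(status, InventoryStatus::Missing(2)));
    /// ```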
    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
        // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#844
        match self {
            Available(item) => Available(f(item)),
            Missing(item) => Missing(f(item)),
        }
    }
}

impl<T: Clone> InventoryStatus<T> {
    /// Returns a clone of the inner item, regardless of status.
    pub fn to_inner(&self) -> T {
        match self {
            Available(item) | Missing(item) => item.clone(),
        }
    }
}

impl InventoryRegistry {
    /// Returns a new Inventory Registry for `inv_stream`.
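    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doc test): the channel capacity and the
    /// way the sender half is shared with connection handshakes are assumptions here,
    /// not taken from this module.
    ///
    /// ```ignore
    /// use tokio::sync::broadcast;
    ///
    /// // The sender half is handed to the code that receives peer `inv` and `notfound`
    /// // messages; the registry consumes the receiver half.
    /// let (inv_sender, inv_receiver) = broadcast::channel(100);
    /// let registry = InventoryRegistry::new(inv_receiver);
    /// ```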
    pub fn new(inv_stream: broadcast::Receiver<InventoryChange>) -> Self {
        let interval = INVENTORY_ROTATION_INTERVAL;

        // Don't do an immediate rotation, because `current` and `prev` are already empty.
        let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
        // # Security
        //
        // If the rotation time is late, execute as many ticks as needed to catch up.
        // This is a tradeoff between memory usage and quickly accessing remote data
        // under heavy load. Bursting prioritises lower memory usage.
        //
        // Skipping or delaying could keep peer inventory in memory for a longer time,
        // further increasing memory load or delays due to virtual memory swapping.
        interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);

        Self {
            current: Default::default(),
            prev: Default::default(),
            inv_stream: BroadcastStream::new(inv_stream).boxed(),
            interval: IntervalStream::new(interval),
        }
    }

    /// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory.
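    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doc test) of how a caller might use this
    /// iterator; `registry` and `hash` are placeholders, and the request-routing code
    /// around it is assumed, not part of this module.
    ///
    /// ```ignore
    /// // Prefer a peer that recently advertised the inventory we want.
    /// if let Some(addr) = registry.advertising_peers(hash).next() {
    ///     // route the request to `addr`
    /// } else {
    ///     // fall back to a random ready peer: the registry is only an efficiency
    ///     // optimisation, see [`MAX_INV_PER_MAP`]
    /// }
    /// ```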
    pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
        self.status_peers(hash)
            .filter_map(|addr_status| addr_status.available())
    }

    /// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory.
    #[allow(dead_code)]
    pub fn missing_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
        self.status_peers(hash)
            .filter_map(|addr_status| addr_status.missing())
    }

    /// Returns an iterator over peer inventory statuses for `hash`.
    ///
    /// Prefers current statuses to previously rotated statuses for the same peer.
    pub fn status_peers(
        &self,
        hash: InventoryHash,
    ) -> impl Iterator<Item = InventoryStatus<&PeerSocketAddr>> {
        let prev = self.prev.get(&hash);
        let current = self.current.get(&hash);

        // # Security
        //
        // Prefer `current` statuses for the same peer over previously rotated statuses.
        // This makes sure Zebra is using the most up-to-date network information.
        let prev = prev
            .into_iter()
            .flatten()
            .filter(move |(addr, _status)| !self.has_current_status(hash, **addr));
        let current = current.into_iter().flatten();

        current
            .chain(prev)
            .map(|(addr, status)| status.map(|()| addr))
    }

    /// Returns true if there is a current status entry for `hash` and `addr`.
    pub fn has_current_status(&self, hash: InventoryHash, addr: PeerSocketAddr) -> bool {
        self.current
            .get(&hash)
            .and_then(|current| current.get(&addr))
            .is_some()
    }

    /// Returns an iterator over peer inventory status hashes.
    ///
    /// Yields current statuses first, then previously rotated statuses.
    /// This can include multiple statuses for the same hash.
    #[allow(dead_code)]
    pub fn status_hashes(
        &self,
    ) -> impl Iterator<Item = (&InventoryHash, &IndexMap<PeerSocketAddr, InventoryMarker>)> {
        self.current.iter().chain(self.prev.iter())
    }

    /// Returns a future that waits for new registry updates.
    #[allow(dead_code)]
    pub fn update(&mut self) -> Update {
        Update::new(self)
    }

    /// Drive periodic inventory tasks.
    ///
    /// Rotates the inventory HashMaps on every timer tick.
    /// Drains the inv_stream channel and registers all advertised inventory.
    ///
    /// Returns an error if the inventory channel is closed.
    ///
    /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending`
    /// if there was no inventory change. Always registers a wakeup for the next inventory update
    /// or rotation, even when it returns `Ok`.
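    ///
    /// # Broadcast channel behaviour
    ///
    /// This method relies on the ring-buffer behaviour of [`tokio::sync::broadcast`]:
    /// when the channel is full, sending displaces the oldest message, and the receiver
    /// reports the gap as a `Lagged` error. This standalone sketch (not compiled as a
    /// doc test) illustrates that behaviour with plain integers and a capacity of 2:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    /// use tokio::sync::broadcast;
    /// use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (tx, rx) = broadcast::channel(2);
    ///     let mut stream = BroadcastStream::new(rx);
    ///
    ///     tx.send(1).unwrap();
    ///     tx.send(2).unwrap();
    ///     tx.send(3).unwrap(); // the channel is full, so this displaces `1`
    ///
    ///     // The receiver first reports how many messages it missed...
    ///     assert!(matches!(
    ///         stream.next().await,
    ///         Some(Err(BroadcastStreamRecvError::Lagged(1)))
    ///     ));
    ///     // ...then yields the newest messages that are still buffered.
    ///     assert_eq!(stream.next().await.unwrap().unwrap(), 2);
    ///     assert_eq!(stream.next().await.unwrap().unwrap(), 3);
    /// }
    /// ```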
    pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
        let mut result = Poll::Pending;

        // # Correctness
        //
        // Registers the current task for wakeup when the timer next becomes ready.
        // (But doesn't return, because we also want to register the task for wakeup when more
        // inventory arrives.)
        //
        // # Security
        //
        // Only rotate one inventory per peer request, to give the next inventory
        // time to gather some peer advertisements. This is a tradeoff between
        // memory usage and quickly accessing remote data under heavy load.
        //
        // This prevents a burst edge case where all inventory is emptied after
        // two interval ticks are delayed.
        if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
            self.rotate();
            result = Poll::Ready(Ok(()));
        }

        // This module uses a broadcast channel instead of an mpsc channel, even
        // though there's a single consumer of inventory advertisements, because
        // the broadcast channel has ring-buffer behavior: when the channel is
        // full, sending a new message displaces the oldest message in the
        // channel.
        //
        // This is the behavior we want for inventory advertisements, because we
        // want to have a bounded buffer of unprocessed advertisements, and we
        // want to prioritize new inventory (which is likely only at a specific
        // peer) over old inventory (which is likely more widely distributed).
        //
        // The broadcast channel reports dropped messages by returning
        // `RecvError::Lagged`. It's crucial that we handle that error here
        // rather than propagating it through the peer set's Service::poll_ready
        // implementation, where reporting a failure means reporting a permanent
        // failure of the peer set.

        // The stream returns `Pending` once all queued messages are processed, which breaks
        // out of the loop; the channel could still receive more messages later.
        loop {
            let channel_result = self.inv_stream.next().poll_unpin(cx);

            match channel_result {
                Poll::Ready(Some(Ok(change))) => {
                    self.register(change);
                    result = Poll::Ready(Ok(()));
                }
                Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => {
                    // This isn't a fatal inventory error, it's expected behaviour when Zebra is
                    // under load from peers.
                    metrics::counter!("pool.inventory.dropped").increment(1);
                    metrics::counter!("pool.inventory.dropped.messages").increment(count);

                    // If this message happens a lot, we should improve inventory registry
                    // performance, or poll the registry or peer set in a separate task.
                    info!(count, "dropped lagged inventory advertisements");
                }
                Poll::Ready(None) => {
                    // If the channel returns None, all senders, including the one in the
                    // handshaker, have been dropped, which really is a permanent failure.
                    result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into()));

                    // A closed stream keeps returning None, so break rather than looping forever.
                    break;
                }
                Poll::Pending => {
                    break;
                }
            }
        }

        result
    }

    /// Record the given inventory `change` for the peer `addr`.
    ///
    /// `Missing` markers are not updated until the registry rotates, for security reasons.
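    ///
    /// # Bounded map sketch
    ///
    /// A minimal, self-contained sketch of the "insert, then evict the oldest entry"
    /// [`IndexMap`] pattern used below, with plain integers standing in for inventory
    /// hashes and peer addresses:
    ///
    /// ```
    /// use indexmap::IndexMap;
    ///
    /// const MAX_ENTRIES: usize = 3;
    /// let mut map: IndexMap<u32, &str> = IndexMap::new();
    ///
    /// for (key, value) in [(1, "a"), (2, "b"), (3, "c"), (4, "d")] {
    ///     map.insert(key, value);
    ///
    ///     // IndexMap preserves insertion order, so index 0 is the oldest entry.
    ///     if map.len() > MAX_ENTRIES {
    ///         map.shift_remove_index(0);
    ///     }
    /// }
    ///
    /// assert_eq!(map.keys().copied().collect::<Vec<_>>(), vec![2, 3, 4]);
    /// ```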
    fn register(&mut self, change: InventoryChange) {
        let new_status = change.marker();
        let (invs, addr) = change.to_inner();

        for inv in invs {
            use InventoryHash::*;
            assert!(
                matches!(inv, Block(_) | Tx(_) | Wtx(_)),
                "unexpected inventory type: {inv:?} from peer: {addr:?}",
            );

            let hash_peers = self.current.entry(inv).or_default();

            // # Security
            //
            // Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
            // and funnel multiple failing requests to themselves.
            if let Some(old_status) = hash_peers.get(&addr) {
                if old_status.is_missing() && new_status.is_available() {
                    debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
                    continue;
                }

                debug!(
                    ?new_status,
                    ?old_status,
                    ?addr,
                    ?inv,
                    "keeping both new and old status"
                );
            }

            let replaced_status = hash_peers.insert(addr, new_status);

            debug!(
                ?new_status,
                ?replaced_status,
                ?addr,
                ?inv,
                "inserted new status"
            );

            // # Security
            //
            // Limit the number of stored peers per hash, removing the oldest entries,
            // because newer entries are likely to be more relevant.
            //
            // TODO: do random or weighted random eviction instead?
            if hash_peers.len() > MAX_PEERS_PER_INV {
                // Performance: `MAX_PEERS_PER_INV` is small, so O(n) performance is acceptable.
                hash_peers.shift_remove_index(0);
            }

            // # Security
            //
            // Limit the number of stored inventory hashes, removing the oldest entries,
            // because newer entries are likely to be more relevant.
            //
            // TODO: do random or weighted random eviction instead?
            if self.current.len() > MAX_INV_PER_MAP {
                // Performance: `MAX_INV_PER_MAP` is small, so O(n) performance is acceptable.
                self.current.shift_remove_index(0);
            }
        }
    }

    /// Replace the `prev` inventory map with `current`, and reset `current` to an empty map.
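    ///
    /// # Example
    ///
    /// A minimal illustration of the [`std::mem::take`] pattern used here, with a `Vec`
    /// standing in for the inventory map:
    ///
    /// ```
    /// let mut current = vec![1, 2, 3];
    /// let prev = std::mem::take(&mut current);
    ///
    /// assert_eq!(prev, vec![1, 2, 3]);
    /// assert!(current.is_empty());
    /// ```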
    fn rotate(&mut self) {
        self.prev = std::mem::take(&mut self.current);
    }
}