zebra_network/peer_set/inventory_registry.rs
1//! Inventory Registry Implementation
2//!
3//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
4
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{FutureExt, Stream, StreamExt};
11use indexmap::IndexMap;
12use tokio::{
13 sync::broadcast,
14 time::{self, Instant},
15};
16use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream};
17
18use zebra_chain::serialization::AtLeastOne;
19
20use crate::{
21 constants::INVENTORY_ROTATION_INTERVAL,
22 protocol::{external::InventoryHash, internal::InventoryResponse},
23 BoxError, PeerSocketAddr,
24};
25
26use self::update::Update;
27
28/// Underlying type for the alias InventoryStatus::*
29use InventoryResponse::*;
30
31pub mod update;
32
33#[cfg(test)]
34mod tests;
35
/// The largest number of inventory hashes kept in each registry map, and accepted
/// from a single multi-hash inventory change.
///
/// # Security
///
/// This cap limits known memory denial of service attacks like <https://invdos.net/>
/// to a total of:
/// ```text
/// 1000 inventory * 2 maps * 32-64 bytes per inventory = less than 1 MB
/// 1000 inventory * 70 peers * 2 maps * 6-18 bytes per address = up to 3 MB
/// ```
///
/// The inventory registry is an efficiency optimisation that falls back to a random
/// peer, so only a small number of hashes is needed for available inventory.
///
/// Missing inventory needs a significantly larger allowance, so that queries for
/// globally missing inventory stay limited.
//
// TODO: split this into available (25) and missing (1000 or more?)
pub const MAX_INV_PER_MAP: usize = 1_000;
54
/// The largest number of peers kept for each tracked inventory hash.
///
/// # Security
///
/// This cap limits known memory denial of service attacks.
/// See [`MAX_INV_PER_MAP`] for details.
///
/// The inventory registry is an efficiency optimisation that falls back to a random
/// peer, so only a small number of peers per inv is needed for available inventory.
///
/// Missing inventory should be tracked for almost all our peers, so that only a few
/// peers are queried for inventory that is genuinely missing from the network.
//
// TODO: split this into available (25) and missing (70)
pub const MAX_PEERS_PER_INV: usize = 70;
69
70/// A peer inventory status, which tracks a hash for both available and missing inventory.
71pub type InventoryStatus<T> = InventoryResponse<T, T>;
72
73/// A peer inventory status change, used in the inventory status channel.
74///
75/// For performance reasons, advertisements should only be tracked
76/// for hashes that are rare on the network.
77/// So Zebra only tracks single-block inventory messages.
78///
79/// For security reasons, all `notfound` rejections should be tracked.
80/// This also helps with performance, if the hash is rare on the network.
81pub type InventoryChange = InventoryStatus<(AtLeastOne<InventoryHash>, PeerSocketAddr)>;
82
83/// An internal marker used in inventory status hash maps.
84type InventoryMarker = InventoryStatus<()>;
85
86/// An Inventory Registry for tracking recent inventory advertisements and missing inventory.
87///
88/// For more details please refer to the [RFC].
89///
90/// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
91pub struct InventoryRegistry {
92 /// Map tracking the latest inventory status from the current interval
93 /// period.
94 //
95 // TODO: split maps into available and missing, so we can limit them separately.
96 current: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,
97
98 /// Map tracking inventory statuses from the previous interval period.
99 prev: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,
100
101 /// Stream of incoming inventory statuses to register.
102 inv_stream: Pin<
103 Box<dyn Stream<Item = Result<InventoryChange, BroadcastStreamRecvError>> + Send + 'static>,
104 >,
105
106 /// Interval tracking when we should next rotate our maps.
107 interval: IntervalStream,
108}
109
110impl std::fmt::Debug for InventoryRegistry {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("InventoryRegistry")
113 .field("current", &self.current)
114 .field("prev", &self.prev)
115 .finish()
116 }
117}
118
119impl InventoryChange {
120 /// Returns a new available inventory change from a single hash.
121 pub fn new_available(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
122 InventoryStatus::Available((AtLeastOne::from_one(hash), peer))
123 }
124
125 /// Returns a new missing inventory change from a single hash.
126 #[allow(dead_code)]
127 pub fn new_missing(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
128 InventoryStatus::Missing((AtLeastOne::from_one(hash), peer))
129 }
130
131 /// Returns a new available multiple inventory change, if `hashes` contains at least one change.
132 pub fn new_available_multi<'a>(
133 hashes: impl IntoIterator<Item = &'a InventoryHash>,
134 peer: PeerSocketAddr,
135 ) -> Option<Self> {
136 let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
137
138 // # Security
139 //
140 // Don't send more hashes than we're going to store.
141 // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
142 //
143 // This limits known memory denial of service attacks to:
144 // `1000 hashes * 200 peers/channel capacity * 32-64 bytes = up to 12 MB`
145 hashes.truncate(MAX_INV_PER_MAP);
146
147 let hashes = hashes.try_into().ok();
148
149 hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
150 }
151
152 /// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
153 pub fn new_missing_multi<'a>(
154 hashes: impl IntoIterator<Item = &'a InventoryHash>,
155 peer: PeerSocketAddr,
156 ) -> Option<Self> {
157 let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
158
159 // # Security
160 //
161 // Don't send more hashes than we're going to store.
162 // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
163 hashes.truncate(MAX_INV_PER_MAP);
164
165 let hashes = hashes.try_into().ok();
166
167 hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
168 }
169}
170
171impl<T> InventoryStatus<T> {
172 /// Get a marker for the status, without any associated data.
173 pub fn marker(&self) -> InventoryMarker {
174 self.as_ref().map(|_inner| ())
175 }
176
177 /// Maps an `InventoryStatus<T>` to `InventoryStatus<U>` by applying a function to a contained value.
178 pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
179 // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#844
180 match self {
181 Available(item) => Available(f(item)),
182 Missing(item) => Missing(f(item)),
183 }
184 }
185}
186
187impl<T: Clone> InventoryStatus<T> {
188 /// Returns a clone of the inner item, regardless of status.
189 pub fn to_inner(&self) -> T {
190 match self {
191 Available(item) | Missing(item) => item.clone(),
192 }
193 }
194}
195
196impl InventoryRegistry {
197 /// Returns a new Inventory Registry for `inv_stream`.
198 pub fn new(inv_stream: broadcast::Receiver<InventoryChange>) -> Self {
199 let interval = INVENTORY_ROTATION_INTERVAL;
200
201 // Don't do an immediate rotation, current and prev are already empty.
202 let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
203 // # Security
204 //
205 // If the rotation time is late, execute as many ticks as needed to catch up.
206 // This is a tradeoff between memory usage and quickly accessing remote data
207 // under heavy load. Bursting prioritises lower memory usage.
208 //
209 // Skipping or delaying could keep peer inventory in memory for a longer time,
210 // further increasing memory load or delays due to virtual memory swapping.
211 interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
212
213 Self {
214 current: Default::default(),
215 prev: Default::default(),
216 inv_stream: BroadcastStream::new(inv_stream).boxed(),
217 interval: IntervalStream::new(interval),
218 }
219 }
220
221 /// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory.
222 pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
223 self.status_peers(hash)
224 .filter_map(|addr_status| addr_status.available())
225 }
226
227 /// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory.
228 #[allow(dead_code)]
229 pub fn missing_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
230 self.status_peers(hash)
231 .filter_map(|addr_status| addr_status.missing())
232 }
233
234 /// Returns an iterator over peer inventory statuses for `hash`.
235 ///
236 /// Prefers current statuses to previously rotated statuses for the same peer.
237 pub fn status_peers(
238 &self,
239 hash: InventoryHash,
240 ) -> impl Iterator<Item = InventoryStatus<&PeerSocketAddr>> {
241 let prev = self.prev.get(&hash);
242 let current = self.current.get(&hash);
243
244 // # Security
245 //
246 // Prefer `current` statuses for the same peer over previously rotated statuses.
247 // This makes sure Zebra is using the most up-to-date network information.
248 let prev = prev
249 .into_iter()
250 .flatten()
251 .filter(move |(addr, _status)| !self.has_current_status(hash, **addr));
252 let current = current.into_iter().flatten();
253
254 current
255 .chain(prev)
256 .map(|(addr, status)| status.map(|()| addr))
257 }
258
259 /// Returns true if there is a current status entry for `hash` and `addr`.
260 pub fn has_current_status(&self, hash: InventoryHash, addr: PeerSocketAddr) -> bool {
261 self.current
262 .get(&hash)
263 .and_then(|current| current.get(&addr))
264 .is_some()
265 }
266
267 /// Returns an iterator over peer inventory status hashes.
268 ///
269 /// Yields current statuses first, then previously rotated statuses.
270 /// This can include multiple statuses for the same hash.
271 #[allow(dead_code)]
272 pub fn status_hashes(
273 &self,
274 ) -> impl Iterator<Item = (&InventoryHash, &IndexMap<PeerSocketAddr, InventoryMarker>)> {
275 self.current.iter().chain(self.prev.iter())
276 }
277
278 /// Returns a future that waits for new registry updates.
279 #[allow(dead_code)]
280 pub fn update(&mut self) -> Update {
281 Update::new(self)
282 }
283
284 /// Drive periodic inventory tasks.
285 ///
286 /// Rotates the inventory HashMaps on every timer tick.
287 /// Drains the inv_stream channel and registers all advertised inventory.
288 ///
289 /// Returns an error if the inventory channel is closed.
290 ///
291 /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending`
292 /// if there was no inventory change. Always registers a wakeup for the next inventory update
293 /// or rotation, even when it returns `Ok`.
294 pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
295 let mut result = Poll::Pending;
296
297 // # Correctness
298 //
299 // Registers the current task for wakeup when the timer next becomes ready.
300 // (But doesn't return, because we also want to register the task for wakeup when more
301 // inventory arrives.)
302 //
303 // # Security
304 //
305 // Only rotate one inventory per peer request, to give the next inventory
306 // time to gather some peer advertisements. This is a tradeoff between
307 // memory usage and quickly accessing remote data under heavy load.
308 //
309 // This prevents a burst edge case where all inventory is emptied after
310 // two interval ticks are delayed.
311 if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
312 self.rotate();
313 result = Poll::Ready(Ok(()));
314 }
315
316 // This module uses a broadcast channel instead of an mpsc channel, even
317 // though there's a single consumer of inventory advertisements, because
318 // the broadcast channel has ring-buffer behavior: when the channel is
319 // full, sending a new message displaces the oldest message in the
320 // channel.
321 //
322 // This is the behavior we want for inventory advertisements, because we
323 // want to have a bounded buffer of unprocessed advertisements, and we
324 // want to prioritize new inventory (which is likely only at a specific
325 // peer) over old inventory (which is likely more widely distributed).
326 //
327 // The broadcast channel reports dropped messages by returning
328 // `RecvError::Lagged`. It's crucial that we handle that error here
329 // rather than propagating it through the peer set's Service::poll_ready
330 // implementation, where reporting a failure means reporting a permanent
331 // failure of the peer set.
332
333 // Returns Pending if all messages are processed, but the channel could get more.
334 loop {
335 let channel_result = self.inv_stream.next().poll_unpin(cx);
336
337 match channel_result {
338 Poll::Ready(Some(Ok(change))) => {
339 self.register(change);
340 result = Poll::Ready(Ok(()));
341 }
342 Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => {
343 // This isn't a fatal inventory error, it's expected behaviour when Zebra is
344 // under load from peers.
345 metrics::counter!("pool.inventory.dropped").increment(1);
346 metrics::counter!("pool.inventory.dropped.messages").increment(count);
347
348 // If this message happens a lot, we should improve inventory registry
349 // performance, or poll the registry or peer set in a separate task.
350 info!(count, "dropped lagged inventory advertisements");
351 }
352 Poll::Ready(None) => {
353 // If the channel is empty and returns None, all senders, including the one in
354 // the handshaker, have been dropped, which really is a permanent failure.
355 result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into()));
356 }
357 Poll::Pending => {
358 break;
359 }
360 }
361 }
362
363 result
364 }
365
366 /// Record the given inventory `change` for the peer `addr`.
367 ///
368 /// `Missing` markers are not updated until the registry rotates, for security reasons.
369 fn register(&mut self, change: InventoryChange) {
370 let new_status = change.marker();
371 let (invs, addr) = change.to_inner();
372
373 for inv in invs {
374 use InventoryHash::*;
375 assert!(
376 matches!(inv, Block(_) | Tx(_) | Wtx(_)),
377 "unexpected inventory type: {inv:?} from peer: {addr:?}",
378 );
379
380 let hash_peers = self.current.entry(inv).or_default();
381
382 // # Security
383 //
384 // Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
385 // and funnel multiple failing requests to themselves.
386 if let Some(old_status) = hash_peers.get(&addr) {
387 if old_status.is_missing() && new_status.is_available() {
388 debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
389 continue;
390 }
391
392 debug!(
393 ?new_status,
394 ?old_status,
395 ?addr,
396 ?inv,
397 "keeping both new and old status"
398 );
399 }
400
401 let replaced_status = hash_peers.insert(addr, new_status);
402
403 debug!(
404 ?new_status,
405 ?replaced_status,
406 ?addr,
407 ?inv,
408 "inserted new status"
409 );
410
411 // # Security
412 //
413 // Limit the number of stored peers per hash, removing the oldest entries,
414 // because newer entries are likely to be more relevant.
415 //
416 // TODO: do random or weighted random eviction instead?
417 if hash_peers.len() > MAX_PEERS_PER_INV {
418 // Performance: `MAX_PEERS_PER_INV` is small, so O(n) performance is acceptable.
419 hash_peers.shift_remove_index(0);
420 }
421
422 // # Security
423 //
424 // Limit the number of stored inventory hashes, removing the oldest entries,
425 // because newer entries are likely to be more relevant.
426 //
427 // TODO: do random or weighted random eviction instead?
428 if self.current.len() > MAX_INV_PER_MAP {
429 // Performance: `MAX_INV_PER_MAP` is small, so O(n) performance is acceptable.
430 self.current.shift_remove_index(0);
431 }
432 }
433 }
434
435 /// Replace the prev HashMap with current's and replace current with an empty
436 /// HashMap
437 fn rotate(&mut self) {
438 self.prev = std::mem::take(&mut self.current);
439 }
440}