zebra_network/peer_set/limit.rs
1//! Counting active connections used by Zebra.
2//!
3//! These types can be used to count any kind of active resource.
4//! But they are currently used to track the number of open connections.
5
6use std::{fmt, sync::Arc};
7
8use tokio::sync::mpsc;
9
10/// A signal sent by a [`Connection`][1] when it closes.
11///
12/// Used to count the number of open connections.
13///
14/// [1]: crate::peer::Connection
15#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
16pub struct ConnectionClosed;
17
18/// A counter for active connections.
19///
20/// Creates a [`ConnectionTracker`] to track each active connection.
21/// When these trackers are dropped, the counter gets notified.
22pub struct ActiveConnectionCounter {
23 /// The number of active peers tracked using this counter.
24 count: usize,
25
26 /// The limit for this type of connection, for diagnostics only.
27 /// The caller must enforce the limit by ignoring, delaying, or dropping connections.
28 limit: usize,
29
30 /// The label for this connection counter, typically its type.
31 label: Arc<str>,
32
33 /// The channel used to send closed connection notifications.
34 close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
35
36 /// The channel used to receive closed connection notifications.
37 close_notification_rx: mpsc::UnboundedReceiver<ConnectionClosed>,
38
39 /// Active connection count progress transmitter.
40 #[cfg(feature = "progress-bar")]
41 connection_bar: howudoin::Tx,
42}
43
44impl fmt::Debug for ActiveConnectionCounter {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 f.debug_struct("ActiveConnectionCounter")
47 .field("label", &self.label)
48 .field("count", &self.count)
49 .field("limit", &self.limit)
50 .finish()
51 }
52}
53
54impl ActiveConnectionCounter {
55 /// Create and return a new active connection counter.
56 pub fn new_counter() -> Self {
57 Self::new_counter_with(usize::MAX, "Active Connections")
58 }
59
60 /// Create and return a new active connection counter with `limit` and `label`.
61 /// The caller must check and enforce limits using [`update_count()`](Self::update_count).
62 pub fn new_counter_with<S: ToString>(limit: usize, label: S) -> Self {
63 // The number of items in this channel is bounded by the connection limit.
64 let (close_notification_tx, close_notification_rx) = mpsc::unbounded_channel();
65
66 let label = label.to_string();
67
68 #[cfg(feature = "progress-bar")]
69 let connection_bar = howudoin::new_root().label(label.clone());
70
71 Self {
72 count: 0,
73 limit,
74 label: label.into(),
75 close_notification_rx,
76 close_notification_tx,
77 #[cfg(feature = "progress-bar")]
78 connection_bar,
79 }
80 }
81
82 /// Create and return a new [`ConnectionTracker`], and add 1 to this counter.
83 ///
84 /// When the returned tracker is dropped, this counter will be notified, and decreased by 1.
85 pub fn track_connection(&mut self) -> ConnectionTracker {
86 ConnectionTracker::new(self)
87 }
88
89 /// Check for closed connection notifications, and return the current connection count.
90 pub fn update_count(&mut self) -> usize {
91 let previous_connections = self.count;
92
93 // We ignore errors here:
94 // - TryRecvError::Empty means that there are no pending close notifications
95 // - TryRecvError::Closed is unreachable, because we hold a sender
96 while let Ok(ConnectionClosed) = self.close_notification_rx.try_recv() {
97 self.count -= 1;
98
99 debug!(
100 open_connections = ?self.count,
101 ?previous_connections,
102 limit = ?self.limit,
103 label = ?self.label,
104 "a peer connection was closed",
105 );
106 }
107
108 trace!(
109 open_connections = ?self.count,
110 ?previous_connections,
111 limit = ?self.limit,
112 label = ?self.label,
113 "updated active connection count",
114 );
115
116 #[cfg(feature = "progress-bar")]
117 self.connection_bar
118 .set_pos(u64::try_from(self.count).expect("fits in u64"));
119 // .set_len(u64::try_from(self.limit).expect("fits in u64"));
120
121 self.count
122 }
123}
124
125impl Drop for ActiveConnectionCounter {
126 fn drop(&mut self) {
127 #[cfg(feature = "progress-bar")]
128 self.connection_bar.close();
129 }
130}
131
132/// A per-connection tracker.
133///
134/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
135/// When these trackers are dropped, the counter gets notified.
136pub struct ConnectionTracker {
137 /// The channel used to send closed connection notifications on drop.
138 close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
139
140 /// The label for this connection counter, typically its type.
141 label: Arc<str>,
142}
143
144impl fmt::Debug for ConnectionTracker {
145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 f.debug_tuple("ConnectionTracker")
147 .field(&self.label)
148 .finish()
149 }
150}
151
152impl ConnectionTracker {
153 /// Create and return a new active connection tracker, and add 1 to `counter`.
154 /// All connection trackers share a label with their connection counter.
155 ///
156 /// When the returned tracker is dropped, `counter` will be notified, and decreased by 1.
157 fn new(counter: &mut ActiveConnectionCounter) -> Self {
158 counter.count += 1;
159
160 debug!(
161 open_connections = ?counter.count,
162 limit = ?counter.limit,
163 label = ?counter.label,
164 "opening a new peer connection",
165 );
166
167 Self {
168 close_notification_tx: counter.close_notification_tx.clone(),
169 label: counter.label.clone(),
170 }
171 }
172}
173
174impl Drop for ConnectionTracker {
175 /// Notifies the corresponding connection counter that the connection has closed.
176 fn drop(&mut self) {
177 debug!(label = ?self.label, "closing a peer connection");
178
179 // We ignore disconnected errors, because the receiver can be dropped
180 // before some connections are dropped.
181 // # Security
182 //
183 // This channel is actually bounded by the inbound and outbound connection limit.
184 let _ = self.close_notification_tx.send(ConnectionClosed);
185 }
186}