1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
//! Counting active connections used by Zebra.
//!
//! These types can be used to count any kind of active resource.
//! But they are currently used to track the number of open connections.
use std::{fmt, sync::Arc};
use tokio::sync::mpsc;
/// A signal sent by a [`Connection`][1] when it closes.
///
/// Used to count the number of open connections.
///
/// [1]: crate::peer::Connection
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionClosed;
/// A counter for active connections.
///
/// Creates a [`ConnectionTracker`] to track each active connection.
/// When these trackers are dropped, the counter gets notified.
pub struct ActiveConnectionCounter {
/// The number of active peers tracked using this counter.
count: usize,
/// The limit for this type of connection, for diagnostics only.
/// The caller must enforce the limit by ignoring, delaying, or dropping connections.
limit: usize,
/// The label for this connection counter, typically its type.
label: Arc<str>,
/// The channel used to send closed connection notifications.
close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
/// The channel used to receive closed connection notifications.
close_notification_rx: mpsc::UnboundedReceiver<ConnectionClosed>,
/// Active connection count progress transmitter.
#[cfg(feature = "progress-bar")]
connection_bar: howudoin::Tx,
}
impl fmt::Debug for ActiveConnectionCounter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ActiveConnectionCounter")
.field("label", &self.label)
.field("count", &self.count)
.field("limit", &self.limit)
.finish()
}
}
impl ActiveConnectionCounter {
/// Create and return a new active connection counter.
pub fn new_counter() -> Self {
Self::new_counter_with(usize::MAX, "Active Connections")
}
/// Create and return a new active connection counter with `limit` and `label`.
/// The caller must check and enforce limits using [`update_count()`](Self::update_count).
pub fn new_counter_with<S: ToString>(limit: usize, label: S) -> Self {
// The number of items in this channel is bounded by the connection limit.
let (close_notification_tx, close_notification_rx) = mpsc::unbounded_channel();
let label = label.to_string();
#[cfg(feature = "progress-bar")]
let connection_bar = howudoin::new_root().label(label.clone());
Self {
count: 0,
limit,
label: label.into(),
close_notification_rx,
close_notification_tx,
#[cfg(feature = "progress-bar")]
connection_bar,
}
}
/// Create and return a new [`ConnectionTracker`], and add 1 to this counter.
///
/// When the returned tracker is dropped, this counter will be notified, and decreased by 1.
pub fn track_connection(&mut self) -> ConnectionTracker {
ConnectionTracker::new(self)
}
/// Check for closed connection notifications, and return the current connection count.
pub fn update_count(&mut self) -> usize {
let previous_connections = self.count;
// We ignore errors here:
// - TryRecvError::Empty means that there are no pending close notifications
// - TryRecvError::Closed is unreachable, because we hold a sender
while let Ok(ConnectionClosed) = self.close_notification_rx.try_recv() {
self.count -= 1;
debug!(
open_connections = ?self.count,
?previous_connections,
limit = ?self.limit,
label = ?self.label,
"a peer connection was closed",
);
}
trace!(
open_connections = ?self.count,
?previous_connections,
limit = ?self.limit,
label = ?self.label,
"updated active connection count",
);
#[cfg(feature = "progress-bar")]
self.connection_bar
.set_pos(u64::try_from(self.count).expect("fits in u64"));
// .set_len(u64::try_from(self.limit).expect("fits in u64"));
self.count
}
}
impl Drop for ActiveConnectionCounter {
fn drop(&mut self) {
#[cfg(feature = "progress-bar")]
self.connection_bar.close();
}
}
/// A per-connection tracker.
///
/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
/// When these trackers are dropped, the counter gets notified.
pub struct ConnectionTracker {
/// The channel used to send closed connection notifications on drop.
close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
/// The label for this connection counter, typically its type.
label: Arc<str>,
}
impl fmt::Debug for ConnectionTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ConnectionTracker")
.field(&self.label)
.finish()
}
}
impl ConnectionTracker {
/// Create and return a new active connection tracker, and add 1 to `counter`.
/// All connection trackers share a label with their connection counter.
///
/// When the returned tracker is dropped, `counter` will be notified, and decreased by 1.
fn new(counter: &mut ActiveConnectionCounter) -> Self {
counter.count += 1;
debug!(
open_connections = ?counter.count,
limit = ?counter.limit,
label = ?counter.label,
"opening a new peer connection",
);
Self {
close_notification_tx: counter.close_notification_tx.clone(),
label: counter.label.clone(),
}
}
}
impl Drop for ConnectionTracker {
/// Notifies the corresponding connection counter that the connection has closed.
fn drop(&mut self) {
debug!(label = ?self.label, "closing a peer connection");
// We ignore disconnected errors, because the receiver can be dropped
// before some connections are dropped.
// # Security
//
// This channel is actually bounded by the inbound and outbound connection limit.
let _ = self.close_notification_tx.send(ConnectionClosed);
}
}