1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
//! Peer-related errors.
use std::{borrow::Cow, sync::Arc};
use thiserror::Error;
use tracing_error::TracedError;
use zebra_chain::serialization::SerializationError;
use crate::protocol::external::InventoryHash;
/// A wrapper around `Arc<TracedError<PeerError>>` that implements `Error`.
///
/// The `Arc` lets the same error be cloned and shared (for example, stored in
/// an [`ErrorSlot`] and also handed back to a caller) without copying the
/// underlying [`PeerError`].
///
/// Any type that converts into a [`PeerError`] also converts into a
/// `SharedPeerError`.
#[derive(Error, Debug, Clone)]
#[error(transparent)]
pub struct SharedPeerError(Arc<TracedError<PeerError>>);
impl<E> From<E> for SharedPeerError
where
PeerError: From<E>,
{
fn from(source: E) -> Self {
Self(Arc::new(TracedError::from(PeerError::from(source))))
}
}
impl SharedPeerError {
    /// Formats the wrapped error with its `Debug` implementation and returns the text.
    ///
    /// Unfortunately, [`TracedError`] makes it impossible to get a reference to the
    /// original [`PeerError`], so callers that need to inspect it have to work from
    /// this string instead.
    pub fn inner_debug(&self) -> String {
        // Look through the `Arc` so the traced error itself is what gets formatted.
        let traced = &*self.0;

        format!("{traced:?}")
    }
}
/// An error related to peer connection handling.
///
/// Use [`PeerError::kind`] to get a short label for a variant.
#[derive(Error, Debug)]
// NOTE(review): presumably some variants are not constructed in every build
// configuration — confirm whether this `allow` is still required.
#[allow(dead_code)]
pub enum PeerError {
/// The remote peer closed the connection.
#[error("Peer closed connection")]
ConnectionClosed,
/// Zebra dropped the [`Connection`](crate::peer::Connection).
#[error("Internal connection dropped")]
ConnectionDropped,
/// Zebra dropped the [`Client`](crate::peer::Client).
#[error("Internal client dropped")]
ClientDropped,
/// A [`Client`](crate::peer::Client)'s internal connection task exited.
#[error("Internal peer connection task exited")]
ConnectionTaskExited,
/// Zebra's [`Client`](crate::peer::Client) cancelled its heartbeat task.
#[error("Internal client cancelled its heartbeat task")]
ClientCancelledHeartbeatTask,
/// Zebra's internal heartbeat task exited.
///
/// The payload is the message reported when the task exited.
#[error("Internal heartbeat task exited with message: {0:?}")]
HeartbeatTaskExited(String),
/// Sending a message to a remote peer took too long.
#[error("Sending Client request timed out")]
ConnectionSendTimeout,
/// Receiving a response to a [`Client`](crate::peer::Client) request took too long.
#[error("Receiving client response timed out")]
ConnectionReceiveTimeout,
/// A serialization error occurred while reading or writing a message.
#[error("Serialization error: {0}")]
Serialization(#[from] SerializationError),
/// A badly-behaved remote peer sent a handshake message after the handshake was
/// already complete.
#[error("Remote peer sent handshake messages after handshake")]
DuplicateHandshake,
/// This node's internal services were overloaded, so the connection was dropped
/// to shed load.
#[error("Internal services over capacity")]
Overloaded,
/// There are no ready remote peers.
#[error("No ready peers available")]
NoReadyPeers,
/// This peer's request caused an internal service timeout, so the connection was dropped
/// to shed load or prevent attacks.
#[error("Internal services timed out")]
InboundTimeout,
/// This node's internal services are no longer able to service requests.
#[error("Internal services have failed or shutdown")]
ServiceShutdown,
/// We requested data, but the peer replied with a `notfound` message.
/// (Or it didn't respond before the request finished.)
///
/// This error happens when the peer doesn't have any of the requested data,
/// so that the original request can be retried.
///
/// This is a temporary error.
///
/// Zebra can try different peers if the request is retried,
/// or peers can download and verify the missing data.
///
/// If the peer has some of the data, the request returns an [`Ok`] response,
/// with any `notfound` data marked as [`Missing`][1].
///
/// [1]: crate::protocol::internal::InventoryResponse::Missing
#[error("Remote peer could not find any of the items: {0:?}")]
NotFoundResponse(Vec<InventoryHash>),
/// We requested data, but all our ready peers are marked as recently
/// [`Missing`][1] that data in our local inventory registry.
///
/// This is a temporary error.
///
/// Peers with the inventory can finish their requests and become ready, or
/// other peers can download and verify the missing data.
///
/// # Correctness
///
/// This error is produced using Zebra's local inventory registry, without
/// contacting any peers.
///
/// Client responses containing this error must not be used to update the
/// inventory registry. This makes sure that we eventually expire our local
/// cache of missing inventory, and send requests to peers again.
///
/// [1]: crate::protocol::internal::InventoryResponse::Missing
#[error("All ready peers are registered as recently missing these items: {0:?}")]
NotFoundRegistry(Vec<InventoryHash>),
}
impl PeerError {
/// Returns the Zebra internal handler type as a string.
pub fn kind(&self) -> Cow<'static, str> {
match self {
PeerError::ConnectionClosed => "ConnectionClosed".into(),
PeerError::ConnectionDropped => "ConnectionDropped".into(),
PeerError::ClientDropped => "ClientDropped".into(),
PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(),
PeerError::HeartbeatTaskExited(_) => "HeartbeatTaskExited".into(),
PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(),
PeerError::ConnectionSendTimeout => "ConnectionSendTimeout".into(),
PeerError::ConnectionReceiveTimeout => "ConnectionReceiveTimeout".into(),
// TODO: add error kinds or summaries to `SerializationError`
PeerError::Serialization(inner) => format!("Serialization({inner})").into(),
PeerError::DuplicateHandshake => "DuplicateHandshake".into(),
PeerError::Overloaded => "Overloaded".into(),
PeerError::NoReadyPeers => "NoReadyPeers".into(),
PeerError::InboundTimeout => "InboundTimeout".into(),
PeerError::ServiceShutdown => "ServiceShutdown".into(),
PeerError::NotFoundResponse(_) => "NotFoundResponse".into(),
PeerError::NotFoundRegistry(_) => "NotFoundRegistry".into(),
}
}
}
/// A shared error slot for peer errors.
///
/// The slot holds `None` until an error is stored in it, and at most one
/// [`SharedPeerError`] afterwards. Cloning an `ErrorSlot` clones the inner
/// `Arc`, so every clone reads and updates the same slot.
///
/// # Correctness
///
/// Error slots are shared between sync and async code. In async code, the error
/// mutex should be held for as short a time as possible. This avoids blocking
/// the async task thread on acquiring the mutex.
///
/// > If the value behind the mutex is just data, it’s usually appropriate to use a blocking mutex
/// > ...
/// > wrap the `Arc<Mutex<...>>` in a struct
/// > that provides non-async methods for performing operations on the data within,
/// > and only lock the mutex inside these methods
///
/// <https://docs.rs/tokio/1.15.0/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use>
#[derive(Default, Clone)]
pub struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
/// Debug output that never blocks on the slot's mutex.
impl std::fmt::Debug for ErrorSlot {
    /// Writes `ErrorSlot { error: .. }`, where the field is the result of a
    /// non-blocking lock attempt on the slot.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // don't hang if the mutex is locked
        // show the panic if the mutex was poisoned
        let lock_attempt = self.0.try_lock();

        let mut slot_debug = f.debug_struct("ErrorSlot");
        slot_debug.field("error", &lock_attempt);
        slot_debug.finish()
    }
}
impl ErrorSlot {
    /// Returns a clone of the error currently stored in the slot.
    ///
    /// Returns `None` if no error has been stored yet.
    ///
    /// # Correctness
    ///
    /// Briefly locks the error slot's threaded `std::sync::Mutex`, only for as
    /// long as it takes to clone the slot's contents.
    ///
    /// # Panics
    ///
    /// If the error mutex is poisoned.
    #[allow(clippy::unwrap_in_result)]
    pub fn try_get_error(&self) -> Option<SharedPeerError> {
        let slot = self.0.lock().expect("error mutex should be unpoisoned");

        (*slot).clone()
    }

    /// Stores `e` in the slot, unless the slot already holds an error.
    ///
    /// Returns `Err(AlreadyErrored)` carrying a clone of the existing error if
    /// there was already an error in the slot. In that case the slot is left
    /// unchanged and `e` is dropped.
    ///
    /// # Correctness
    ///
    /// Briefly locks the error slot's threaded `std::sync::Mutex`, to check for
    /// a previous error, then update the error in the slot.
    ///
    /// # Panics
    ///
    /// If the error mutex is poisoned.
    #[allow(clippy::unwrap_in_result)]
    pub fn try_update_error(&self, e: SharedPeerError) -> Result<(), AlreadyErrored> {
        let mut slot = self.0.lock().expect("error mutex should be unpoisoned");

        // The first error wins: report it back instead of overwriting it.
        if let Some(existing) = slot.as_ref() {
            return Err(AlreadyErrored {
                original_error: existing.clone(),
            });
        }

        *slot = Some(e);
        Ok(())
    }
}
/// Error returned when the [`ErrorSlot`] already contains an error.
///
/// Produced by [`ErrorSlot::try_update_error`], which leaves the slot unchanged
/// and discards the new error when this is returned.
#[derive(Clone, Debug)]
pub struct AlreadyErrored {
/// The original error in the error slot.
///
/// This is a clone of the slot's contents; the slot still holds the error.
pub original_error: SharedPeerError,
}
/// An error during a handshake with a remote peer.
#[derive(Error, Debug)]
pub enum HandshakeError {
/// The remote peer sent an unexpected message during the handshake.
///
/// The message is boxed, which keeps this variant's payload pointer-sized.
#[error("The remote peer sent an unexpected message: {0:?}")]
UnexpectedMessage(Box<crate::protocol::external::Message>),
/// The peer connector detected handshake nonce reuse, possibly indicating self-connection.
#[error("Detected nonce reuse, possible self-connection")]
RemoteNonceReuse,
/// The peer connector created a duplicate random nonce. This is very unlikely,
/// because the range of the data type is 2^64.
#[error("Unexpectedly created a duplicate random local nonce")]
LocalDuplicateNonce,
/// The remote peer closed the connection.
#[error("Peer closed connection")]
ConnectionClosed,
/// An error occurred while performing an IO operation.
// NOTE(review): the display string omits the inner error, so it is only
// reachable through `Error::source()` — confirm that log output walks the
// source chain.
#[error("Underlying IO error")]
Io(#[from] std::io::Error),
/// A serialization error occurred while reading or writing a message.
// NOTE(review): unlike `PeerError::Serialization`, this display string does
// not include the inner error (`{0}`) — confirm the difference is intended.
#[error("Serialization error")]
Serialization(#[from] SerializationError),
/// The remote peer offered a version older than our minimum version.
#[error("Peer offered obsolete version: {0:?}")]
ObsoleteVersion(crate::protocol::external::types::Version),
/// Sending or receiving a message timed out.
///
/// Also produced when converting from a `tokio::time::error::Elapsed`.
#[error("Timeout when sending or receiving a message to peer")]
Timeout,
}
impl From<tokio::time::error::Elapsed> for HandshakeError {
fn from(_source: tokio::time::error::Elapsed) -> Self {
HandshakeError::Timeout
}
}