zebra_network/peer/
client.rs

//! Handles outbound requests from our node to the network.

use std::{
    collections::HashSet,
    future::Future,
    iter,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures::{
    channel::{mpsc, oneshot},
    future, ready,
    stream::{Stream, StreamExt},
    FutureExt,
};
use tokio::{sync::broadcast, task::JoinHandle};
use tower::Service;

use zebra_chain::diagnostic::task::CheckForPanics;

use crate::{
    peer::{
        error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
        ConnectionInfo,
    },
    peer_set::InventoryChange,
    protocol::{
        external::InventoryHash,
        internal::{Request, Response},
    },
    BoxError, PeerSocketAddr,
};

#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;

/// The "client" duplex half of a peer connection.
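///
/// # Usage sketch (illustrative, not part of the original documentation)
///
/// A [`Client`] implements [`tower::Service`] for [`Request`], so callers
/// typically wait for readiness and then send a request. This is a hedged
/// sketch; `client` is assumed to be an already-connected [`Client`]:
///
/// ```ignore
/// use tower::{Service, ServiceExt};
///
/// // `ready()` drives `poll_ready()`, which also checks the background tasks.
/// let response = client.ready().await?.call(Request::Peers).await?;
/// ```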
pub struct Client {
    /// The metadata for the connected peer `service`.
    pub connection_info: Arc<ConnectionInfo>,

    /// Used to shut down the corresponding heartbeat.
    /// This is always Some except when we take it on drop.
    pub(crate) shutdown_tx: Option<oneshot::Sender<CancelHeartbeatTask>>,

    /// Used to send [`Request`]s to the remote peer.
    pub(crate) server_tx: mpsc::Sender<ClientRequest>,

    /// Used to register missing inventory in client [`Response`]s,
    /// so that the peer set can route retries to other clients.
    pub(crate) inv_collector: broadcast::Sender<InventoryChange>,

    /// A slot for an error shared between the Connection and the Client that uses it.
    ///
    /// `None` unless the connection or client have errored.
    pub(crate) error_slot: ErrorSlot,

    /// A handle to the task responsible for connecting to the peer.
    pub(crate) connection_task: JoinHandle<()>,

    /// A handle to the task responsible for sending periodic heartbeats.
    pub(crate) heartbeat_task: JoinHandle<Result<(), BoxError>>,
}

/// A signal sent by the [`Client`] half of a peer connection,
/// to cancel a [`Client`]'s heartbeat task.
///
/// When it receives this signal, the heartbeat task exits.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CancelHeartbeatTask;

/// A message from the `peer::Client` to the `peer::Server`.
#[derive(Debug)]
pub(crate) struct ClientRequest {
    /// The actual network request for the peer.
    pub request: Request,

    /// The response `Message` channel, included because `peer::Client::call`
    /// returns a future that may be moved around before it resolves.
    pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,

    /// Used to register missing inventory in responses on `tx`,
    /// so that the peer set can route retries to other clients.
    pub inv_collector: Option<broadcast::Sender<InventoryChange>>,

    /// The peer address for registering missing inventory.
    ///
    /// TODO: replace this with `ConnectedAddr`?
    pub transient_addr: Option<PeerSocketAddr>,

    /// The tracing context for the request, so that work the connection task does
    /// while processing messages for this request runs in the correct tracing context.
    pub span: tracing::Span,
}

/// A receiver for the `peer::Server`, which wraps a `mpsc::Receiver`,
/// converting `ClientRequest`s into `InProgressClientRequest`s.
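///
/// # Sketch (illustrative, not part of the original documentation)
///
/// The receiving half of the client request channel is wrapped via `From`,
/// so every item it yields is already an `InProgressClientRequest`:
///
/// ```ignore
/// // Hypothetical wiring: the `Client` keeps `request_tx`, and the connection
/// // task wraps `request_rx` so the must-use sender invariant is enforced.
/// let (request_tx, request_rx) = futures::channel::mpsc::channel(0);
/// let receiver: ClientRequestReceiver = request_rx.into();
/// ```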
#[derive(Debug)]
pub(super) struct ClientRequestReceiver {
    /// The inner receiver
    inner: mpsc::Receiver<ClientRequest>,
}

/// A message from the `peer::Client` to the `peer::Server`,
/// after it has been received by the `peer::Server`.
#[derive(Debug)]
#[must_use = "tx.send() must be called before drop"]
pub(super) struct InProgressClientRequest {
    /// The actual request.
    pub request: Request,

    /// The return message channel, included because `peer::Client::call` returns a
    /// future that may be moved around before it resolves.
    ///
    /// INVARIANT: `tx.send()` must be called before dropping `tx`.
    ///
    /// JUSTIFICATION: the `peer::Client` translates `Request`s into
    /// `ClientRequest`s, which it sends to a background task. If the send is
    /// `Ok(())`, it will assume that it is safe to unconditionally poll the
    /// `Receiver` tied to the `Sender` used to create the `ClientRequest`.
    ///
    /// We also take advantage of this invariant to route inventory requests
    /// away from peers that did not respond with that inventory.
    ///
    /// We enforce this invariant via the type system, by converting
    /// `ClientRequest`s to `InProgressClientRequest`s when they are received by
    /// the background task. These conversions are implemented by
    /// `ClientRequestReceiver`.
    pub tx: MustUseClientResponseSender,

    /// The tracing context for the request, so that work the connection task does
    /// while processing messages for this request runs in the correct tracing context.
    pub span: tracing::Span,
}

/// A `oneshot::Sender` for client responses that must be used by calling `send()`.
/// Also handles forwarding missing inventory to the inventory registry.
///
/// Panics on drop if `tx` has not been used or canceled.
/// Panics if `tx.send()` is used more than once.
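///
/// # Sketch of the single-use contract (illustrative, not part of the original documentation)
///
/// ```ignore
/// // Hypothetical wiring: `request` and `response` stand in for real values.
/// let (tx, rx) = futures::channel::oneshot::channel();
/// let sender = MustUseClientResponseSender::new(tx, &request, None, None);
///
/// // `send()` consumes the sender, so it can only be called once;
/// // dropping it unused panics (except during shutdown).
/// sender.send(Ok(response)).expect("receiver is still alive");
/// ```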
#[derive(Debug)]
#[must_use = "tx.send() must be called before drop"]
pub(super) struct MustUseClientResponseSender {
    /// The sender for the oneshot client response channel.
    ///
    /// `None` if `tx.send()` has been used.
    pub tx: Option<oneshot::Sender<Result<Response, SharedPeerError>>>,

    /// Forwards missing inventory in the response to the inventory collector.
    ///
    /// Boxed to reduce the size of containing structures.
    pub missing_inv: Option<Box<MissingInventoryCollector>>,
}

/// Forwards missing inventory in the response to the inventory registry.
#[derive(Debug)]
pub(super) struct MissingInventoryCollector {
    /// A clone of the original request, if it is an inventory request.
    ///
    /// This struct is only ever created with inventory requests.
    request: Request,

    /// Used to register missing inventory from responses,
    /// so that the peer set can route retries to other clients.
    collector: broadcast::Sender<InventoryChange>,

    /// The peer address for registering missing inventory.
    transient_addr: PeerSocketAddr,
}

impl std::fmt::Debug for Client {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // skip the channels, they don't tell us anything useful
        f.debug_struct("Client")
            .field("connection_info", &self.connection_info)
            .field("error_slot", &self.error_slot)
            .field("connection_task", &self.connection_task)
            .field("heartbeat_task", &self.heartbeat_task)
            .finish()
    }
}

impl From<ClientRequest> for InProgressClientRequest {
    fn from(client_request: ClientRequest) -> Self {
        let ClientRequest {
            request,
            tx,
            inv_collector,
            transient_addr,
            span,
        } = client_request;

        let tx = MustUseClientResponseSender::new(tx, &request, inv_collector, transient_addr);

        InProgressClientRequest { request, tx, span }
    }
}

impl ClientRequestReceiver {
    /// Forwards to `inner.close()`.
    pub fn close(&mut self) {
        self.inner.close()
    }

    /// Closes `inner`, then gets the next pending [`Request`].
    ///
    /// Closing the channel ensures that:
    /// - the request stream terminates, and
    /// - task notifications are not required.
    pub fn close_and_flush_next(&mut self) -> Option<InProgressClientRequest> {
        self.inner.close();

        // # Correctness
        //
        // The request stream terminates, because the sender is closed,
        // and the channel has a limited capacity.
        // Task notifications are not required, because the sender is closed.
        //
        // Despite what its documentation says, we've seen futures::channel::mpsc::Receiver::try_next()
        // return an error after the channel is closed.
        self.inner.try_next().ok()?.map(Into::into)
    }
}

impl Stream for ClientRequestReceiver {
    type Item = InProgressClientRequest;

    /// Converts the successful result of `inner.poll_next()` to an
    /// `InProgressClientRequest`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(client_request) => Poll::Ready(client_request.map(Into::into)),
            // CORRECTNESS
            //
            // The current task must be scheduled for wakeup every time we
            // return `Poll::Pending`.
            //
            // `inner.poll_next_unpin` schedules this task for wakeup when
            // there are new items available in the inner stream.
            Poll::Pending => Poll::Pending,
        }
    }

    /// Returns `inner.size_hint()`
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

impl From<mpsc::Receiver<ClientRequest>> for ClientRequestReceiver {
    fn from(rx: mpsc::Receiver<ClientRequest>) -> Self {
        ClientRequestReceiver { inner: rx }
    }
}

impl MustUseClientResponseSender {
    /// Returns a newly created client response sender for `tx`.
    ///
    /// If `request` or the response contains missing inventory,
    /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
    pub fn new(
        tx: oneshot::Sender<Result<Response, SharedPeerError>>,
        request: &Request,
        inv_collector: Option<broadcast::Sender<InventoryChange>>,
        transient_addr: Option<PeerSocketAddr>,
    ) -> Self {
        Self {
            tx: Some(tx),
            missing_inv: MissingInventoryCollector::new(request, inv_collector, transient_addr),
        }
    }

    /// Forwards `response` to `tx.send()`, and missing inventory to `inv_collector`,
    /// and marks this sender as used.
    ///
    /// Panics if `tx.send()` is used more than once.
    pub fn send(
        mut self,
        response: Result<Response, SharedPeerError>,
    ) -> Result<(), Result<Response, SharedPeerError>> {
        // Forward any missing inventory to the registry.
        if let Some(missing_inv) = self.missing_inv.take() {
            missing_inv.send(&response);
        }

        // Forward the response to the internal requester.
        self.tx
            .take()
            .unwrap_or_else(|| {
                panic!(
                    "multiple uses of response sender: response must be sent exactly once: {self:?}"
                )
            })
            .send(response)
    }

    /// Returns `tx.cancellation()`.
    ///
    /// Panics if `tx.send()` has previously been used.
    pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, Result<Response, SharedPeerError>> {
        self.tx
            .as_mut()
            .map(|tx| tx.cancellation())
            .unwrap_or_else(|| {
                panic!("called cancellation() after using oneshot sender: oneshot must be used exactly once")
            })
    }

    /// Returns `tx.is_canceled()`.
    ///
    /// Panics if `tx.send()` has previously been used.
    pub fn is_canceled(&self) -> bool {
        self.tx
            .as_ref()
            .map(|tx| tx.is_canceled())
            .unwrap_or_else(
                || panic!("called is_canceled() after using oneshot sender: oneshot must be used exactly once: {self:?}"))
    }
}

impl Drop for MustUseClientResponseSender {
    #[instrument(skip(self))]
    fn drop(&mut self) {
        // we don't panic if we are shutting down anyway
        if !zebra_chain::shutdown::is_shutting_down() {
            // is_canceled() will not panic, because we check is_none() first
            assert!(
                self.tx.is_none() || self.is_canceled(),
                "unused client response sender: oneshot must be used or canceled: {self:?}"
            );
        }
    }
}

impl MissingInventoryCollector {
    /// Returns a newly created missing inventory collector, if needed.
    ///
    /// If `request` or the response contains missing inventory,
    /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
    pub fn new(
        request: &Request,
        inv_collector: Option<broadcast::Sender<InventoryChange>>,
        transient_addr: Option<PeerSocketAddr>,
    ) -> Option<Box<MissingInventoryCollector>> {
        if !request.is_inventory_download() {
            return None;
        }

        if let (Some(inv_collector), Some(transient_addr)) = (inv_collector, transient_addr) {
            Some(Box::new(MissingInventoryCollector {
                request: request.clone(),
                collector: inv_collector,
                transient_addr,
            }))
        } else {
            None
        }
    }

    /// Forwards any missing inventory to the registry.
    ///
    /// `zcashd` doesn't send `notfound` messages for blocks,
    /// so we need to track missing blocks ourselves.
    ///
    /// This can sometimes send duplicate missing inventory,
    /// but the registry ignores duplicates anyway.
    pub fn send(self, response: &Result<Response, SharedPeerError>) {
        let missing_inv: HashSet<InventoryHash> = match (self.request, response) {
            // Missing block hashes from partial responses.
            (_, Ok(Response::Blocks(block_statuses))) => block_statuses
                .iter()
                .filter_map(|b| b.missing())
                .map(InventoryHash::Block)
                .collect(),

            // Missing transaction IDs from partial responses.
            (_, Ok(Response::Transactions(tx_statuses))) => tx_statuses
                .iter()
                .filter_map(|tx| tx.missing())
                .map(|tx| tx.into())
                .collect(),

            // Other response types never contain missing inventory.
            (_, Ok(_)) => iter::empty().collect(),

            // We don't forward NotFoundRegistry errors,
            // because the errors are generated locally from the registry,
            // so those statuses are already in the registry.
            //
            // Unfortunately, we can't access the inner error variant here,
            // due to TracedError.
            (_, Err(e)) if e.inner_debug().contains("NotFoundRegistry") => iter::empty().collect(),

            // Missing inventory from other errors, including NotFoundResponse, timeouts,
            // and dropped connections.
            (request, Err(_)) => {
                // The request either contains blocks or transactions,
                // but this is a convenient way to collect them both.
                let missing_blocks = request
                    .block_hash_inventory()
                    .into_iter()
                    .map(InventoryHash::Block);

                let missing_txs = request
                    .transaction_id_inventory()
                    .into_iter()
                    .map(InventoryHash::from);

                missing_blocks.chain(missing_txs).collect()
            }
        };

        if let Some(missing_inv) =
            InventoryChange::new_missing_multi(missing_inv.iter(), self.transient_addr)
        {
            // if all the receivers are closed, assume we're in tests or an isolated connection
            let _ = self.collector.send(missing_inv);
        }
    }
}

impl Client {
    /// Check if this connection's heartbeat task has exited.
    ///
    /// Returns an error if the heartbeat task exited. Otherwise, schedules the client task for
    /// wakeup when the heartbeat task finishes, or the channel closes, and returns `Pending`.
    ///
    /// # Panics
    ///
    /// If the heartbeat task panicked.
    #[allow(clippy::unwrap_in_result)]
    fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
        let is_canceled = self
            .shutdown_tx
            .as_mut()
            .expect("only taken on drop")
            .poll_canceled(cx)
            .is_ready();

        let result = match self.heartbeat_task.poll_unpin(cx) {
            Poll::Pending => {
                // The heartbeat task returns `Pending` while it continues to run.
                // But if it has dropped its receiver, it is shutting down, and we should also shut down.
                if is_canceled {
                    self.set_task_exited_error(
                        "heartbeat",
                        PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
                    )
                } else {
                    // Heartbeat task is still running.
                    return Poll::Pending;
                }
            }
            Poll::Ready(Ok(Ok(_))) => {
                // Heartbeat task stopped unexpectedly, without panic or error.
                self.set_task_exited_error(
                    "heartbeat",
                    PeerError::HeartbeatTaskExited(
                        "Heartbeat task stopped unexpectedly".to_string(),
                    ),
                )
            }
            Poll::Ready(Ok(Err(error))) => {
                // Heartbeat task stopped unexpectedly, with error.
                self.set_task_exited_error(
                    "heartbeat",
                    PeerError::HeartbeatTaskExited(error.to_string()),
                )
            }
            Poll::Ready(Err(error)) => {
                // Heartbeat task panicked.
                let error = error.panic_if_task_has_panicked();

                // Heartbeat task was cancelled.
                if error.is_cancelled() {
                    self.set_task_exited_error(
                        "heartbeat",
                        PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
                    )
                }
                // Heartbeat task stopped with another kind of task error.
                else {
                    self.set_task_exited_error(
                        "heartbeat",
                        PeerError::HeartbeatTaskExited(error.to_string()),
                    )
                }
            }
        };

        Poll::Ready(result)
    }

    /// Check if the connection's request/response task has exited.
    ///
    /// Returns an error if the connection task exited. Otherwise, schedules the client task for
    /// wakeup when the connection task finishes, and returns `Pending`.
    ///
    /// # Panics
    ///
    /// If the connection task panicked.
    fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
        // Return `Pending` if the connection task is still running.
        let result = match ready!(self.connection_task.poll_unpin(context)) {
            Ok(()) => {
                // Connection task stopped unexpectedly, without panicking.
                self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
            }
            Err(error) => {
                // Connection task panicked.
                let error = error.panic_if_task_has_panicked();

                // Connection task was cancelled.
                if error.is_cancelled() {
                    self.set_task_exited_error(
                        "connection",
                        PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
                    )
                }
                // Connection task stopped with another kind of task error.
                else {
                    self.set_task_exited_error(
                        "connection",
                        PeerError::HeartbeatTaskExited(error.to_string()),
                    )
                }
            }
        };

        Poll::Ready(result)
    }

    /// Properly update the error slot after a background task has unexpectedly stopped.
    fn set_task_exited_error(
        &mut self,
        task_name: &str,
        error: PeerError,
    ) -> Result<(), SharedPeerError> {
        // Make sure there is an error in the slot
        let task_error = SharedPeerError::from(error);
        let original_error = self.error_slot.try_update_error(task_error.clone());
        debug!(
            ?original_error,
            latest_error = ?task_error,
            "client {} task exited", task_name
        );

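        // If the slot already held an error, report that original error;
        // otherwise report the error we just stored.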
        if let Err(AlreadyErrored { original_error }) = original_error {
            Err(original_error)
        } else {
            Err(task_error)
        }
    }

    /// Poll for space in the shared request sender channel.
    ///
    /// Returns an error if the sender channel is closed. If there is no space in the channel,
    /// returns `Pending`, and schedules the task for wakeup when there is space available.
    fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
        let server_result = ready!(self.server_tx.poll_ready(cx));
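        // `poll_ready` only fails when the channel is closed, which means the connection
        // task has exited: report the stored error, or a generic exit error if the slot is empty.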
        if server_result.is_err() {
            Poll::Ready(Err(self
                .error_slot
                .try_get_error()
                .unwrap_or_else(|| PeerError::ConnectionTaskExited.into())))
        } else if let Some(error) = self.error_slot.try_get_error() {
            Poll::Ready(Err(error))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    /// Poll for space in the shared request sender channel, and for errors in the connection tasks.
    ///
    /// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have
    /// terminated. If there is no space in the channel, returns `Pending`, and schedules the task
    /// for wakeup when there is space available, or one of the tasks terminates.
    fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
        // # Correctness
        //
        // The current task must be scheduled for wakeup every time we return
        // `Poll::Pending`.
        //
        // `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup
        // if either task exits, or if the heartbeat task drops the cancel handle.
        //
        // `ready!` returns `Poll::Pending` when `server_tx` is unready, and
        // schedules this task for wakeup.
        //
        // It's ok to exit early and skip wakeups when there is an error, because the connection
        // and its tasks are shut down immediately on error.

        let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?;
        let _connection_pending: Poll<()> = self.poll_connection(cx)?;

        // We're only pending if the sender channel is full.
        self.poll_request(cx)
    }

    /// Shut down the resources held by the client half of this peer connection.
    ///
    /// Stops further requests to the remote peer, and stops the heartbeat task.
    fn shutdown(&mut self) {
        // Prevent any senders from sending more messages to this peer.
        self.server_tx.close_channel();

        // Ask the heartbeat task to stop.
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(CancelHeartbeatTask);
        }

        // Force the connection and heartbeat tasks to stop.
        self.connection_task.abort();
        self.heartbeat_task.abort();
    }
}

impl Service<Request> for Client {
    type Response = Response;
    type Error = SharedPeerError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // # Correctness
        //
        // The current task must be scheduled for wakeup every time we return
        // `Poll::Pending`.
        //
        // `poll_client()` schedules the client task for wakeup if the sender channel has space,
        // either connection task exits, or if the heartbeat task drops the cancel handle.

        // Check all the tasks and channels.
        //
        // `ready!` returns `Poll::Pending` when `server_tx` is unready, and
        // schedules this task for wakeup.
        let result = ready!(self.poll_client(cx));

        // Shut down the client and connection if there is an error.
        if let Err(error) = result {
            self.shutdown();

            Poll::Ready(Err(error))
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn call(&mut self, request: Request) -> Self::Future {
        let (tx, rx) = oneshot::channel();
        // get the current Span to propagate it to the peer connection task.
        // this allows the peer connection to enter the correct tracing context
        // when it's handling messages in the context of processing this
        // request.
        let span = tracing::Span::current();

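        // `poll_ready` is expected to have been called first, so the channel
        // should have space unless the connection has already shut down.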
        match self.server_tx.try_send(ClientRequest {
            request,
            tx,
            inv_collector: Some(self.inv_collector.clone()),
            transient_addr: self.connection_info.connected_addr.get_transient_addr(),
            span,
        }) {
            Err(e) => {
                if e.is_disconnected() {
                    let peer_error = self
                        .error_slot
                        .try_get_error()
                        .unwrap_or_else(|| PeerError::ConnectionTaskExited.into());

                    let ClientRequest { tx, .. } = e.into_inner();
                    let _ = tx.send(Err(peer_error.clone()));

                    future::ready(Err(peer_error)).boxed()
                } else {
                    // sending fails when there's not enough
                    // channel space, but we called poll_ready
                    panic!("called call without poll_ready");
                }
            }
            Ok(()) => {
                // The receiver end of the oneshot is itself a future.
                rx.map(|oneshot_recv_result| {
                    // The ClientRequest oneshot sender should not be dropped before sending a
                    // response. But sometimes that happens during process or connection shutdown.
                    // So we just return a generic error here.
                    match oneshot_recv_result {
                        Ok(result) => result,
                        Err(oneshot::Canceled) => Err(PeerError::ConnectionDropped.into()),
                    }
                })
                .boxed()
            }
        }
    }
}

impl Drop for Client {
    fn drop(&mut self) {
        // Make sure there is an error in the slot
        let drop_error: SharedPeerError = PeerError::ClientDropped.into();
        let original_error = self.error_slot.try_update_error(drop_error.clone());
        debug!(
            ?original_error,
            latest_error = ?drop_error,
            "client struct dropped"
        );

        self.shutdown();
    }
}