zebra_network/peer/client.rs
1//! Handles outbound requests from our node to the network.
2
3use std::{
4 collections::HashSet,
5 future::Future,
6 iter,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10};
11
12use futures::{
13 channel::{mpsc, oneshot},
14 future, ready,
15 stream::{Stream, StreamExt},
16 FutureExt,
17};
18use tokio::{sync::broadcast, task::JoinHandle};
19use tower::Service;
20
21use zebra_chain::diagnostic::task::CheckForPanics;
22
23use crate::{
24 peer::{
25 error::{AlreadyErrored, ErrorSlot, PeerError, SharedPeerError},
26 ConnectionInfo,
27 },
28 peer_set::InventoryChange,
29 protocol::{
30 external::InventoryHash,
31 internal::{Request, Response},
32 },
33 BoxError, PeerSocketAddr,
34};
35
36#[cfg(any(test, feature = "proptest-impl"))]
37pub mod tests;
38
39/// The "client" duplex half of a peer connection.
40pub struct Client {
41 /// The metadata for the connected peer `service`.
42 pub connection_info: Arc<ConnectionInfo>,
43
44 /// Used to shut down the corresponding heartbeat.
45 /// This is always Some except when we take it on drop.
46 pub(crate) shutdown_tx: Option<oneshot::Sender<CancelHeartbeatTask>>,
47
48 /// Used to send [`Request`]s to the remote peer.
49 pub(crate) server_tx: mpsc::Sender<ClientRequest>,
50
51 /// Used to register missing inventory in client [`Response`]s,
52 /// so that the peer set can route retries to other clients.
53 pub(crate) inv_collector: broadcast::Sender<InventoryChange>,
54
55 /// A slot for an error shared between the Connection and the Client that uses it.
56 ///
57 /// `None` unless the connection or client have errored.
58 pub(crate) error_slot: ErrorSlot,
59
60 /// A handle to the task responsible for connecting to the peer.
61 pub(crate) connection_task: JoinHandle<()>,
62
63 /// A handle to the task responsible for sending periodic heartbeats.
64 pub(crate) heartbeat_task: JoinHandle<Result<(), BoxError>>,
65}
66
67/// A signal sent by the [`Client`] half of a peer connection,
68/// to cancel a [`Client`]'s heartbeat task.
69///
70/// When it receives this signal, the heartbeat task exits.
71#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
72pub struct CancelHeartbeatTask;
73
74/// A message from the `peer::Client` to the `peer::Server`.
75#[derive(Debug)]
76pub(crate) struct ClientRequest {
77 /// The actual network request for the peer.
78 pub request: Request,
79
80 /// The response `Message` channel, included because `peer::Client::call`
81 /// returns a future that may be moved around before it resolves.
82 pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
83
84 /// Used to register missing inventory in responses on `tx`,
85 /// so that the peer set can route retries to other clients.
86 pub inv_collector: Option<broadcast::Sender<InventoryChange>>,
87
88 /// The peer address for registering missing inventory.
89 ///
90 /// TODO: replace this with `ConnectedAddr`?
91 pub transient_addr: Option<PeerSocketAddr>,
92
93 /// The tracing context for the request, so that work the connection task does
94 /// processing messages in the context of this request will have correct context.
95 pub span: tracing::Span,
96}
97
98/// A receiver for the `peer::Server`, which wraps a `mpsc::Receiver`,
99/// converting `ClientRequest`s into `InProgressClientRequest`s.
100#[derive(Debug)]
101pub(super) struct ClientRequestReceiver {
102 /// The inner receiver
103 inner: mpsc::Receiver<ClientRequest>,
104}
105
106/// A message from the `peer::Client` to the `peer::Server`,
107/// after it has been received by the `peer::Server`.
108#[derive(Debug)]
109#[must_use = "tx.send() must be called before drop"]
110pub(super) struct InProgressClientRequest {
111 /// The actual request.
112 pub request: Request,
113
114 /// The return message channel, included because `peer::Client::call` returns a
115 /// future that may be moved around before it resolves.
116 ///
117 /// INVARIANT: `tx.send()` must be called before dropping `tx`.
118 ///
119 /// JUSTIFICATION: the `peer::Client` translates `Request`s into
120 /// `ClientRequest`s, which it sends to a background task. If the send is
121 /// `Ok(())`, it will assume that it is safe to unconditionally poll the
122 /// `Receiver` tied to the `Sender` used to create the `ClientRequest`.
123 ///
124 /// We also take advantage of this invariant to route inventory requests
125 /// away from peers that did not respond with that inventory.
126 ///
127 /// We enforce this invariant via the type system, by converting
128 /// `ClientRequest`s to `InProgressClientRequest`s when they are received by
129 /// the background task. These conversions are implemented by
130 /// `ClientRequestReceiver`.
131 pub tx: MustUseClientResponseSender,
132
133 /// The tracing context for the request, so that work the connection task does
134 /// processing messages in the context of this request will have correct context.
135 pub span: tracing::Span,
136}
137
138/// A `oneshot::Sender` for client responses, that must be used by calling `send()`.
139/// Also handles forwarding missing inventory to the inventory registry.
140///
141/// Panics on drop if `tx` has not been used or canceled.
142/// Panics if `tx.send()` is used more than once.
143#[derive(Debug)]
144#[must_use = "tx.send() must be called before drop"]
145pub(super) struct MustUseClientResponseSender {
146 /// The sender for the oneshot client response channel.
147 ///
148 /// `None` if `tx.send()` has been used.
149 pub tx: Option<oneshot::Sender<Result<Response, SharedPeerError>>>,
150
151 /// Forwards missing inventory in the response to the inventory collector.
152 ///
153 /// Boxed to reduce the size of containing structures.
154 pub missing_inv: Option<Box<MissingInventoryCollector>>,
155}
156
157/// Forwards missing inventory in the response to the inventory registry.
158#[derive(Debug)]
159pub(super) struct MissingInventoryCollector {
160 /// A clone of the original request, if it is an inventory request.
161 ///
162 /// This struct is only ever created with inventory requests.
163 request: Request,
164
165 /// Used to register missing inventory from responses,
166 /// so that the peer set can route retries to other clients.
167 collector: broadcast::Sender<InventoryChange>,
168
169 /// The peer address for registering missing inventory.
170 transient_addr: PeerSocketAddr,
171}
172
173impl std::fmt::Debug for Client {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 // skip the channels, they don't tell us anything useful
176 f.debug_struct("Client")
177 .field("connection_info", &self.connection_info)
178 .field("error_slot", &self.error_slot)
179 .field("connection_task", &self.connection_task)
180 .field("heartbeat_task", &self.heartbeat_task)
181 .finish()
182 }
183}
184
185impl From<ClientRequest> for InProgressClientRequest {
186 fn from(client_request: ClientRequest) -> Self {
187 let ClientRequest {
188 request,
189 tx,
190 inv_collector,
191 transient_addr,
192 span,
193 } = client_request;
194
195 let tx = MustUseClientResponseSender::new(tx, &request, inv_collector, transient_addr);
196
197 InProgressClientRequest { request, tx, span }
198 }
199}
200
201impl ClientRequestReceiver {
202 /// Forwards to `inner.close()`.
203 pub fn close(&mut self) {
204 self.inner.close()
205 }
206
207 /// Closes `inner`, then gets the next pending [`Request`].
208 ///
209 /// Closing the channel ensures that:
210 /// - the request stream terminates, and
211 /// - task notifications are not required.
212 pub fn close_and_flush_next(&mut self) -> Option<InProgressClientRequest> {
213 self.inner.close();
214
215 // # Correctness
216 //
217 // The request stream terminates, because the sender is closed,
218 // and the channel has a limited capacity.
219 // Task notifications are not required, because the sender is closed.
220 //
221 // Despite what its documentation says, we've seen futures::channel::mpsc::Receiver::try_next()
222 // return an error after the channel is closed.
223 self.inner.try_next().ok()?.map(Into::into)
224 }
225}
226
227impl Stream for ClientRequestReceiver {
228 type Item = InProgressClientRequest;
229
230 /// Converts the successful result of `inner.poll_next()` to an
231 /// `InProgressClientRequest`.
232 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
233 match self.inner.poll_next_unpin(cx) {
234 Poll::Ready(client_request) => Poll::Ready(client_request.map(Into::into)),
235 // CORRECTNESS
236 //
237 // The current task must be scheduled for wakeup every time we
238 // return `Poll::Pending`.
239 //
240 // inner.poll_next_unpin` schedules this task for wakeup when
241 // there are new items available in the inner stream.
242 Poll::Pending => Poll::Pending,
243 }
244 }
245
246 /// Returns `inner.size_hint()`
247 fn size_hint(&self) -> (usize, Option<usize>) {
248 self.inner.size_hint()
249 }
250}
251
252impl From<mpsc::Receiver<ClientRequest>> for ClientRequestReceiver {
253 fn from(rx: mpsc::Receiver<ClientRequest>) -> Self {
254 ClientRequestReceiver { inner: rx }
255 }
256}
257
258impl MustUseClientResponseSender {
259 /// Returns a newly created client response sender for `tx`.
260 ///
261 /// If `request` or the response contains missing inventory,
262 /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
263 pub fn new(
264 tx: oneshot::Sender<Result<Response, SharedPeerError>>,
265 request: &Request,
266 inv_collector: Option<broadcast::Sender<InventoryChange>>,
267 transient_addr: Option<PeerSocketAddr>,
268 ) -> Self {
269 Self {
270 tx: Some(tx),
271 missing_inv: MissingInventoryCollector::new(request, inv_collector, transient_addr),
272 }
273 }
274
275 /// Forwards `response` to `tx.send()`, and missing inventory to `inv_collector`,
276 /// and marks this sender as used.
277 ///
278 /// Panics if `tx.send()` is used more than once.
279 pub fn send(
280 mut self,
281 response: Result<Response, SharedPeerError>,
282 ) -> Result<(), Result<Response, SharedPeerError>> {
283 // Forward any missing inventory to the registry.
284 if let Some(missing_inv) = self.missing_inv.take() {
285 missing_inv.send(&response);
286 }
287
288 // Forward the response to the internal requester.
289 self.tx
290 .take()
291 .unwrap_or_else(|| {
292 panic!(
293 "multiple uses of response sender: response must be sent exactly once: {self:?}"
294 )
295 })
296 .send(response)
297 }
298
299 /// Returns `tx.cancellation()`.
300 ///
301 /// Panics if `tx.send()` has previously been used.
302 pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, Result<Response, SharedPeerError>> {
303 self.tx
304 .as_mut()
305 .map(|tx| tx.cancellation())
306 .unwrap_or_else( || {
307 panic!("called cancellation() after using oneshot sender: oneshot must be used exactly once")
308 })
309 }
310
311 /// Returns `tx.is_canceled()`.
312 ///
313 /// Panics if `tx.send()` has previously been used.
314 pub fn is_canceled(&self) -> bool {
315 self.tx
316 .as_ref()
317 .map(|tx| tx.is_canceled())
318 .unwrap_or_else(
319 || panic!("called is_canceled() after using oneshot sender: oneshot must be used exactly once: {self:?}"))
320 }
321}
322
323impl Drop for MustUseClientResponseSender {
324 #[instrument(skip(self))]
325 fn drop(&mut self) {
326 // we don't panic if we are shutting down anyway
327 if !zebra_chain::shutdown::is_shutting_down() {
328 // is_canceled() will not panic, because we check is_none() first
329 assert!(
330 self.tx.is_none() || self.is_canceled(),
331 "unused client response sender: oneshot must be used or canceled: {self:?}"
332 );
333 }
334 }
335}
336
337impl MissingInventoryCollector {
338 /// Returns a newly created missing inventory collector, if needed.
339 ///
340 /// If `request` or the response contains missing inventory,
341 /// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
342 pub fn new(
343 request: &Request,
344 inv_collector: Option<broadcast::Sender<InventoryChange>>,
345 transient_addr: Option<PeerSocketAddr>,
346 ) -> Option<Box<MissingInventoryCollector>> {
347 if !request.is_inventory_download() {
348 return None;
349 }
350
351 if let (Some(inv_collector), Some(transient_addr)) = (inv_collector, transient_addr) {
352 Some(Box::new(MissingInventoryCollector {
353 request: request.clone(),
354 collector: inv_collector,
355 transient_addr,
356 }))
357 } else {
358 None
359 }
360 }
361
362 /// Forwards any missing inventory to the registry.
363 ///
364 /// `zcashd` doesn't send `notfound` messages for blocks,
365 /// so we need to track missing blocks ourselves.
366 ///
367 /// This can sometimes send duplicate missing inventory,
368 /// but the registry ignores duplicates anyway.
369 pub fn send(self, response: &Result<Response, SharedPeerError>) {
370 let missing_inv: HashSet<InventoryHash> = match (self.request, response) {
371 // Missing block hashes from partial responses.
372 (_, Ok(Response::Blocks(block_statuses))) => block_statuses
373 .iter()
374 .filter_map(|b| b.missing())
375 .map(InventoryHash::Block)
376 .collect(),
377
378 // Missing transaction IDs from partial responses.
379 (_, Ok(Response::Transactions(tx_statuses))) => tx_statuses
380 .iter()
381 .filter_map(|tx| tx.missing())
382 .map(|tx| tx.into())
383 .collect(),
384
385 // Other response types never contain missing inventory.
386 (_, Ok(_)) => iter::empty().collect(),
387
388 // We don't forward NotFoundRegistry errors,
389 // because the errors are generated locally from the registry,
390 // so those statuses are already in the registry.
391 //
392 // Unfortunately, we can't access the inner error variant here,
393 // due to TracedError.
394 (_, Err(e)) if e.inner_debug().contains("NotFoundRegistry") => iter::empty().collect(),
395
396 // Missing inventory from other errors, including NotFoundResponse, timeouts,
397 // and dropped connections.
398 (request, Err(_)) => {
399 // The request either contains blocks or transactions,
400 // but this is a convenient way to collect them both.
401 let missing_blocks = request
402 .block_hash_inventory()
403 .into_iter()
404 .map(InventoryHash::Block);
405
406 let missing_txs = request
407 .transaction_id_inventory()
408 .into_iter()
409 .map(InventoryHash::from);
410
411 missing_blocks.chain(missing_txs).collect()
412 }
413 };
414
415 if let Some(missing_inv) =
416 InventoryChange::new_missing_multi(missing_inv.iter(), self.transient_addr)
417 {
418 // if all the receivers are closed, assume we're in tests or an isolated connection
419 let _ = self.collector.send(missing_inv);
420 }
421 }
422}
423
424impl Client {
425 /// Check if this connection's heartbeat task has exited.
426 ///
427 /// Returns an error if the heartbeat task exited. Otherwise, schedules the client task for
428 /// wakeup when the heartbeat task finishes, or the channel closes, and returns `Pending`.
429 ///
430 /// # Panics
431 ///
432 /// If the heartbeat task panicked.
433 #[allow(clippy::unwrap_in_result)]
434 fn poll_heartbeat(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
435 let is_canceled = self
436 .shutdown_tx
437 .as_mut()
438 .expect("only taken on drop")
439 .poll_canceled(cx)
440 .is_ready();
441
442 let result = match self.heartbeat_task.poll_unpin(cx) {
443 Poll::Pending => {
444 // The heartbeat task returns `Pending` while it continues to run.
445 // But if it has dropped its receiver, it is shutting down, and we should also shut down.
446 if is_canceled {
447 self.set_task_exited_error(
448 "heartbeat",
449 PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
450 )
451 } else {
452 // Heartbeat task is still running.
453 return Poll::Pending;
454 }
455 }
456 Poll::Ready(Ok(Ok(_))) => {
457 // Heartbeat task stopped unexpectedly, without panic or error.
458 self.set_task_exited_error(
459 "heartbeat",
460 PeerError::HeartbeatTaskExited(
461 "Heartbeat task stopped unexpectedly".to_string(),
462 ),
463 )
464 }
465 Poll::Ready(Ok(Err(error))) => {
466 // Heartbeat task stopped unexpectedly, with error.
467 self.set_task_exited_error(
468 "heartbeat",
469 PeerError::HeartbeatTaskExited(error.to_string()),
470 )
471 }
472 Poll::Ready(Err(error)) => {
473 // Heartbeat task panicked.
474 let error = error.panic_if_task_has_panicked();
475
476 // Heartbeat task was cancelled.
477 if error.is_cancelled() {
478 self.set_task_exited_error(
479 "heartbeat",
480 PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
481 )
482 }
483 // Heartbeat task stopped with another kind of task error.
484 else {
485 self.set_task_exited_error(
486 "heartbeat",
487 PeerError::HeartbeatTaskExited(error.to_string()),
488 )
489 }
490 }
491 };
492
493 Poll::Ready(result)
494 }
495
496 /// Check if the connection's request/response task has exited.
497 ///
498 /// Returns an error if the connection task exited. Otherwise, schedules the client task for
499 /// wakeup when the connection task finishes, and returns `Pending`.
500 ///
501 /// # Panics
502 ///
503 /// If the connection task panicked.
504 fn poll_connection(&mut self, context: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
505 // Return `Pending` if the connection task is still running.
506 let result = match ready!(self.connection_task.poll_unpin(context)) {
507 Ok(()) => {
508 // Connection task stopped unexpectedly, without panicking.
509 self.set_task_exited_error("connection", PeerError::ConnectionTaskExited)
510 }
511 Err(error) => {
512 // Connection task panicked.
513 let error = error.panic_if_task_has_panicked();
514
515 // Connection task was cancelled.
516 if error.is_cancelled() {
517 self.set_task_exited_error(
518 "connection",
519 PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
520 )
521 }
522 // Connection task stopped with another kind of task error.
523 else {
524 self.set_task_exited_error(
525 "connection",
526 PeerError::HeartbeatTaskExited(error.to_string()),
527 )
528 }
529 }
530 };
531
532 Poll::Ready(result)
533 }
534
535 /// Properly update the error slot after a background task has unexpectedly stopped.
536 fn set_task_exited_error(
537 &mut self,
538 task_name: &str,
539 error: PeerError,
540 ) -> Result<(), SharedPeerError> {
541 // Make sure there is an error in the slot
542 let task_error = SharedPeerError::from(error);
543 let original_error = self.error_slot.try_update_error(task_error.clone());
544 debug!(
545 ?original_error,
546 latest_error = ?task_error,
547 "client {} task exited", task_name
548 );
549
550 if let Err(AlreadyErrored { original_error }) = original_error {
551 Err(original_error)
552 } else {
553 Err(task_error)
554 }
555 }
556
557 /// Poll for space in the shared request sender channel.
558 ///
559 /// Returns an error if the sender channel is closed. If there is no space in the channel,
560 /// returns `Pending`, and schedules the task for wakeup when there is space available.
561 fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
562 let server_result = ready!(self.server_tx.poll_ready(cx));
563 if server_result.is_err() {
564 Poll::Ready(Err(self
565 .error_slot
566 .try_get_error()
567 .unwrap_or_else(|| PeerError::ConnectionTaskExited.into())))
568 } else if let Some(error) = self.error_slot.try_get_error() {
569 Poll::Ready(Err(error))
570 } else {
571 Poll::Ready(Ok(()))
572 }
573 }
574
575 /// Poll for space in the shared request sender channel, and for errors in the connection tasks.
576 ///
577 /// Returns an error if the sender channel is closed, or the heartbeat or connection tasks have
578 /// terminated. If there is no space in the channel, returns `Pending`, and schedules the task
579 /// for wakeup when there is space available, or one of the tasks terminates.
580 fn poll_client(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SharedPeerError>> {
581 // # Correctness
582 //
583 // The current task must be scheduled for wakeup every time we return
584 // `Poll::Pending`.
585 //
586 // `poll_heartbeat()` and `poll_connection()` schedule the client task for wakeup
587 // if either task exits, or if the heartbeat task drops the cancel handle.
588 //
589 //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
590 // schedules this task for wakeup.
591 //
592 // It's ok to exit early and skip wakeups when there is an error, because the connection
593 // and its tasks are shut down immediately on error.
594
595 let _heartbeat_pending: Poll<()> = self.poll_heartbeat(cx)?;
596 let _connection_pending: Poll<()> = self.poll_connection(cx)?;
597
598 // We're only pending if the sender channel is full.
599 self.poll_request(cx)
600 }
601
602 /// Shut down the resources held by the client half of this peer connection.
603 ///
604 /// Stops further requests to the remote peer, and stops the heartbeat task.
605 fn shutdown(&mut self) {
606 // Prevent any senders from sending more messages to this peer.
607 self.server_tx.close_channel();
608
609 // Ask the heartbeat task to stop.
610 if let Some(shutdown_tx) = self.shutdown_tx.take() {
611 let _ = shutdown_tx.send(CancelHeartbeatTask);
612 }
613
614 // Force the connection and heartbeat tasks to stop.
615 self.connection_task.abort();
616 self.heartbeat_task.abort();
617 }
618}
619
620impl Service<Request> for Client {
621 type Response = Response;
622 type Error = SharedPeerError;
623 type Future =
624 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
625
626 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
627 // # Correctness
628 //
629 // The current task must be scheduled for wakeup every time we return
630 // `Poll::Pending`.
631 //
632 // `poll_client()` schedules the client task for wakeup if the sender channel has space,
633 // either connection task exits, or if the heartbeat task drops the cancel handle.
634
635 // Check all the tasks and channels.
636 //
637 //`ready!` returns `Poll::Pending` when `server_tx` is unready, and
638 // schedules this task for wakeup.
639 let result = ready!(self.poll_client(cx));
640
641 // Shut down the client and connection if there is an error.
642 if let Err(error) = result {
643 self.shutdown();
644
645 Poll::Ready(Err(error))
646 } else {
647 Poll::Ready(Ok(()))
648 }
649 }
650
651 fn call(&mut self, request: Request) -> Self::Future {
652 let (tx, rx) = oneshot::channel();
653 // get the current Span to propagate it to the peer connection task.
654 // this allows the peer connection to enter the correct tracing context
655 // when it's handling messages in the context of processing this
656 // request.
657 let span = tracing::Span::current();
658
659 match self.server_tx.try_send(ClientRequest {
660 request,
661 tx,
662 inv_collector: Some(self.inv_collector.clone()),
663 transient_addr: self.connection_info.connected_addr.get_transient_addr(),
664 span,
665 }) {
666 Err(e) => {
667 if e.is_disconnected() {
668 let peer_error = self
669 .error_slot
670 .try_get_error()
671 .unwrap_or_else(|| PeerError::ConnectionTaskExited.into());
672
673 let ClientRequest { tx, .. } = e.into_inner();
674 let _ = tx.send(Err(peer_error.clone()));
675
676 future::ready(Err(peer_error)).boxed()
677 } else {
678 // sending fails when there's not enough
679 // channel space, but we called poll_ready
680 panic!("called call without poll_ready");
681 }
682 }
683 Ok(()) => {
684 // The receiver end of the oneshot is itself a future.
685 rx.map(|oneshot_recv_result| {
686 // The ClientRequest oneshot sender should not be dropped before sending a
687 // response. But sometimes that happens during process or connection shutdown.
688 // So we just return a generic error here.
689 match oneshot_recv_result {
690 Ok(result) => result,
691 Err(oneshot::Canceled) => Err(PeerError::ConnectionDropped.into()),
692 }
693 })
694 .boxed()
695 }
696 }
697 }
698}
699
700impl Drop for Client {
701 fn drop(&mut self) {
702 // Make sure there is an error in the slot
703 let drop_error: SharedPeerError = PeerError::ClientDropped.into();
704 let original_error = self.error_slot.try_update_error(drop_error.clone());
705 debug!(
706 ?original_error,
707 latest_error = ?drop_error,
708 "client struct dropped"
709 );
710
711 self.shutdown();
712 }
713}