zebra_network/peer_set/
unready_service.rs

1//! Services that are busy or newly created.
2//!
3//! The [`UnreadyService`] implementation is adapted from the one in [tower::Balance][tower-balance].
4//!
5//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
6
7use std::{
8    future::Future,
9    marker::PhantomData,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use futures::{channel::oneshot, ready};
15use tower::Service;
16
17use crate::peer_set::set::CancelClientWork;
18
19#[cfg(test)]
20mod tests;
21
22/// A Future that becomes satisfied when an `S`-typed service is ready.
23///
24/// May fail due to cancellation, i.e. if the service is removed from discovery.
25#[pin_project]
26#[derive(Debug)]
27pub(super) struct UnreadyService<K, S, Req> {
28    /// The key used to lookup `service`.
29    pub(super) key: Option<K>,
30
31    /// A oneshot used to cancel the request the `service` is currently working on, if any.
32    #[pin]
33    pub(super) cancel: oneshot::Receiver<CancelClientWork>,
34
35    /// The `service` that is busy (or newly created).
36    pub(super) service: Option<S>,
37
38    /// Dropping `service` might drop a request.
39    /// This [`PhantomData`] tells the Rust compiler to do a drop check for `Req`.
40    pub(super) _req: PhantomData<Req>,
41}
42
43#[derive(Debug, Eq, PartialEq)]
44pub(super) enum Error<E> {
45    Inner(E),
46    Canceled,
47    CancelHandleDropped(oneshot::Canceled),
48}
49
50impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
51    type Output = Result<(K, S), (K, Error<S::Error>)>;
52
53    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54        let this = self.project();
55
56        if let Poll::Ready(oneshot_result) = this.cancel.poll(cx) {
57            let key = this.key.take().expect("polled after ready");
58
59            // # Correctness
60            //
61            // Return an error if the service is explicitly canceled,
62            // or its cancel handle is dropped, implicitly cancelling it.
63            match oneshot_result {
64                Ok(CancelClientWork) => return Poll::Ready(Err((key, Error::Canceled))),
65                Err(canceled_error) => {
66                    return Poll::Ready(Err((key, Error::CancelHandleDropped(canceled_error))))
67                }
68            }
69        }
70
71        // # Correctness
72        //
73        // The current task must be scheduled for wakeup every time we return
74        // `Poll::Pending`.
75        //
76        //`ready!` returns `Poll::Pending` when the service is unready, and
77        // the inner `poll_ready` schedules this task for wakeup.
78        //
79        // `cancel.poll` also schedules this task for wakeup if it is canceled.
80        let res = ready!(this
81            .service
82            .as_mut()
83            .expect("polled after ready")
84            .poll_ready(cx));
85
86        let key = this.key.take().expect("polled after ready");
87        let svc = this.service.take().expect("polled after ready");
88
89        match res {
90            Ok(()) => Poll::Ready(Ok((key, svc))),
91            Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
92        }
93    }
94}