zebra_network/peer/
connector.rs

1//! Wrapper around handshake logic that also opens a TCP connection.
2
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures::prelude::*;
9use tokio::net::TcpStream;
10use tower::{Service, ServiceExt};
11use tracing_futures::Instrument;
12
13use zebra_chain::chain_tip::{ChainTip, NoChainTip};
14
15use crate::{
16    peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
17    peer_set::ConnectionTracker,
18    BoxError, PeerSocketAddr, Request, Response,
19};
20
21/// A wrapper around [`Handshake`] that opens a TCP connection before
22/// forwarding to the inner handshake service. Writing this as its own
23/// [`tower::Service`] lets us apply unified timeout policies, etc.
24pub struct Connector<S, C = NoChainTip>
25where
26    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
27    S::Future: Send,
28    C: ChainTip + Clone + Send + 'static,
29{
30    handshaker: Handshake<S, C>,
31}
32
33impl<S, C> Clone for Connector<S, C>
34where
35    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
36    S::Future: Send,
37    C: ChainTip + Clone + Send + 'static,
38{
39    fn clone(&self) -> Self {
40        Connector {
41            handshaker: self.handshaker.clone(),
42        }
43    }
44}
45
46impl<S, C> Connector<S, C>
47where
48    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
49    S::Future: Send,
50    C: ChainTip + Clone + Send + 'static,
51{
52    pub fn new(handshaker: Handshake<S, C>) -> Self {
53        Connector { handshaker }
54    }
55}
56
57/// A connector request.
58/// Contains the information needed to make an outbound connection to the peer.
59pub struct OutboundConnectorRequest {
60    /// The Zcash listener address of the peer.
61    pub addr: PeerSocketAddr,
62
63    /// A connection tracker that reduces the open connection count when dropped.
64    ///
65    /// Used to limit the number of open connections in Zebra.
66    pub connection_tracker: ConnectionTracker,
67}
68
69impl<S, C> Service<OutboundConnectorRequest> for Connector<S, C>
70where
71    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
72    S::Future: Send,
73    C: ChainTip + Clone + Send + 'static,
74{
75    type Response = (PeerSocketAddr, Client);
76    type Error = BoxError;
77    type Future =
78        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
79
80    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        Poll::Ready(Ok(()))
82    }
83
84    fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future {
85        let OutboundConnectorRequest {
86            addr,
87            connection_tracker,
88        }: OutboundConnectorRequest = req;
89
90        let hs = self.handshaker.clone();
91        let connected_addr = ConnectedAddr::new_outbound_direct(addr);
92        let connector_span = info_span!("connector", peer = ?connected_addr);
93
94        // # Security
95        //
96        // `zebra_network::init()` implements a connection timeout on this future.
97        // Any code outside this future does not have a timeout.
98        async move {
99            let tcp_stream = TcpStream::connect(*addr).await?;
100            let client = hs
101                .oneshot(HandshakeRequest::<TcpStream> {
102                    data_stream: tcp_stream,
103                    connected_addr,
104                    connection_tracker,
105                })
106                .await?;
107            Ok((addr, client))
108        }
109        .instrument(connector_span)
110        .boxed()
111    }
112}