zebra_network/peer/
connector.rs
1use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::prelude::*;
9use tokio::net::TcpStream;
10use tower::{Service, ServiceExt};
11use tracing_futures::Instrument;
12
13use zebra_chain::chain_tip::{ChainTip, NoChainTip};
14
15use crate::{
16 peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
17 peer_set::ConnectionTracker,
18 BoxError, PeerSocketAddr, Request, Response,
19};
20
21pub struct Connector<S, C = NoChainTip>
25where
26 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
27 S::Future: Send,
28 C: ChainTip + Clone + Send + 'static,
29{
30 handshaker: Handshake<S, C>,
31}
32
33impl<S, C> Clone for Connector<S, C>
34where
35 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
36 S::Future: Send,
37 C: ChainTip + Clone + Send + 'static,
38{
39 fn clone(&self) -> Self {
40 Connector {
41 handshaker: self.handshaker.clone(),
42 }
43 }
44}
45
46impl<S, C> Connector<S, C>
47where
48 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
49 S::Future: Send,
50 C: ChainTip + Clone + Send + 'static,
51{
52 pub fn new(handshaker: Handshake<S, C>) -> Self {
53 Connector { handshaker }
54 }
55}
56
/// A request asking a [`Connector`] to open an outbound connection to a peer.
pub struct OutboundConnectorRequest {
    /// The address the connector opens a TCP connection to.
    pub addr: PeerSocketAddr,

    /// The connection tracker passed on to the handshake request.
    // NOTE(review): presumably this accounts for the open connection until it
    // is dropped; confirm against `peer_set::ConnectionTracker`.
    pub connection_tracker: ConnectionTracker,
}
68
69impl<S, C> Service<OutboundConnectorRequest> for Connector<S, C>
70where
71 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
72 S::Future: Send,
73 C: ChainTip + Clone + Send + 'static,
74{
75 type Response = (PeerSocketAddr, Client);
76 type Error = BoxError;
77 type Future =
78 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
79
80 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81 Poll::Ready(Ok(()))
82 }
83
84 fn call(&mut self, req: OutboundConnectorRequest) -> Self::Future {
85 let OutboundConnectorRequest {
86 addr,
87 connection_tracker,
88 }: OutboundConnectorRequest = req;
89
90 let hs = self.handshaker.clone();
91 let connected_addr = ConnectedAddr::new_outbound_direct(addr);
92 let connector_span = info_span!("connector", peer = ?connected_addr);
93
94 async move {
99 let tcp_stream = TcpStream::connect(*addr).await?;
100 let client = hs
101 .oneshot(HandshakeRequest::<TcpStream> {
102 data_stream: tcp_stream,
103 connected_addr,
104 connection_tracker,
105 })
106 .await?;
107 Ok((addr, client))
108 }
109 .instrument(connector_span)
110 .boxed()
111 }
112}