zebra_network/peer/connection/
peer_tx.rs

1//! The peer message sender channel.
2
3use futures::{FutureExt, Sink, SinkExt};
4
5use zebra_chain::serialization::SerializationError;
6
7use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError};
8
9/// A wrapper type for a peer connection message sender.
10///
11/// Used to apply a timeout to send messages.
12#[derive(Clone, Debug)]
13pub struct PeerTx<Tx>
14where
15    Tx: Sink<Message, Error = SerializationError> + Unpin,
16{
17    /// A channel for sending Zcash messages to the connected peer.
18    ///
19    /// This channel accepts [`Message`]s.
20    inner: Tx,
21}
22
23impl<Tx> PeerTx<Tx>
24where
25    Tx: Sink<Message, Error = SerializationError> + Unpin,
26{
27    /// Sends `msg` on `self.inner`, returning a timeout error if it takes too long.
28    pub async fn send(&mut self, msg: Message) -> Result<(), PeerError> {
29        tokio::time::timeout(REQUEST_TIMEOUT, self.inner.send(msg))
30            .await
31            .map_err(|_| PeerError::ConnectionSendTimeout)?
32            .map_err(Into::into)
33    }
34
35    /// Flush any remaining output and close this [`PeerTx`], if necessary.
36    pub async fn close(&mut self) -> Result<(), SerializationError> {
37        self.inner.close().await
38    }
39}
40
41impl<Tx> From<Tx> for PeerTx<Tx>
42where
43    Tx: Sink<Message, Error = SerializationError> + Unpin,
44{
45    fn from(tx: Tx) -> Self {
46        PeerTx { inner: tx }
47    }
48}
49
50impl<Tx> Drop for PeerTx<Tx>
51where
52    Tx: Sink<Message, Error = SerializationError> + Unpin,
53{
54    fn drop(&mut self) {
55        // Do a last-ditch close attempt on the sink
56        self.close().now_or_never();
57    }
58}