zebra_network/peer/connection/
peer_tx.rs
1use futures::{FutureExt, Sink, SinkExt};
4
5use zebra_chain::serialization::SerializationError;
6
7use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError};
8
9#[derive(Clone, Debug)]
13pub struct PeerTx<Tx>
14where
15 Tx: Sink<Message, Error = SerializationError> + Unpin,
16{
17 inner: Tx,
21}
22
23impl<Tx> PeerTx<Tx>
24where
25 Tx: Sink<Message, Error = SerializationError> + Unpin,
26{
27 pub async fn send(&mut self, msg: Message) -> Result<(), PeerError> {
29 tokio::time::timeout(REQUEST_TIMEOUT, self.inner.send(msg))
30 .await
31 .map_err(|_| PeerError::ConnectionSendTimeout)?
32 .map_err(Into::into)
33 }
34
35 pub async fn close(&mut self) -> Result<(), SerializationError> {
37 self.inner.close().await
38 }
39}
40
41impl<Tx> From<Tx> for PeerTx<Tx>
42where
43 Tx: Sink<Message, Error = SerializationError> + Unpin,
44{
45 fn from(tx: Tx) -> Self {
46 PeerTx { inner: tx }
47 }
48}
49
50impl<Tx> Drop for PeerTx<Tx>
51where
52 Tx: Sink<Message, Error = SerializationError> + Unpin,
53{
54 fn drop(&mut self) {
55 self.close().now_or_never();
57 }
58}