zebra_network/protocol/external/
codec.rs

1//! A Tokio codec mapping byte streams to Bitcoin message streams.
2
3use std::{
4    cmp::min,
5    fmt,
6    io::{Cursor, Read, Write},
7};
8
9use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
10use bytes::{BufMut, BytesMut};
11use chrono::{TimeZone, Utc};
12use tokio_util::codec::{Decoder, Encoder};
13
14use zebra_chain::{
15    block::{self, Block},
16    parameters::{Magic, Network},
17    serialization::{
18        sha256d, zcash_deserialize_bytes_external_count, zcash_deserialize_string_external_count,
19        CompactSizeMessage, FakeWriter, ReadZcashExt, SerializationError as Error,
20        ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize, MAX_PROTOCOL_MESSAGE_LEN,
21    },
22    transaction::Transaction,
23};
24
25use crate::constants;
26
27use super::{
28    addr::{AddrInVersion, AddrV1, AddrV2},
29    message::{
30        Message, RejectReason, VersionMessage, MAX_REJECT_MESSAGE_LENGTH, MAX_REJECT_REASON_LENGTH,
31        MAX_USER_AGENT_LENGTH,
32    },
33    types::*,
34};
35
36#[cfg(test)]
37mod tests;
38
/// The length of a Bitcoin message header, in bytes.
///
/// The header consists of a 4-byte network magic, a 12-byte command,
/// a 4-byte little-endian body length, and a 4-byte checksum.
const HEADER_LEN: usize = 24usize;
41
/// A codec which produces Bitcoin messages from byte streams and vice versa.
///
/// Create one using [`Codec::builder`], then [`Builder::finish`].
pub struct Codec {
    /// The options this codec was built with: network, protocol version,
    /// maximum body length, and optional metrics label.
    builder: Builder,
    /// Whether the decoder is currently waiting for a message header or a message body.
    state: DecodeState,
}
47
/// A builder for specifying [`Codec`] options.
///
/// Obtained from [`Codec::builder`], and turned into a [`Codec`] by [`Builder::finish`].
pub struct Builder {
    /// The network whose magic is written when encoding, and checked when decoding.
    network: Network,
    /// The protocol version to speak when encoding/decoding.
    version: Version,
    /// The maximum allowable message body length, in bytes.
    max_len: usize,
    /// An optional address label, to use for reporting metrics.
    metrics_addr_label: Option<String>,
}
59
60impl Codec {
61    /// Return a builder for constructing a [`Codec`].
62    pub fn builder() -> Builder {
63        Builder {
64            network: Network::Mainnet,
65            version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
66            max_len: MAX_PROTOCOL_MESSAGE_LEN,
67            metrics_addr_label: None,
68        }
69    }
70
71    /// Reconfigure the version used by the codec, e.g., after completing a handshake.
72    pub fn reconfigure_version(&mut self, version: Version) {
73        self.builder.version = version;
74    }
75}
76
77impl Builder {
78    /// Finalize the builder and return a [`Codec`].
79    pub fn finish(self) -> Codec {
80        Codec {
81            builder: self,
82            state: DecodeState::Head,
83        }
84    }
85
86    /// Configure the codec for the given [`Network`].
87    pub fn for_network(mut self, network: &Network) -> Self {
88        self.network = network.clone();
89        self
90    }
91
92    /// Configure the codec for the given [`Version`].
93    #[allow(dead_code)]
94    pub fn for_version(mut self, version: Version) -> Self {
95        self.version = version;
96        self
97    }
98
99    /// Configure the codec's maximum accepted payload size, in bytes.
100    #[allow(dead_code)]
101    pub fn with_max_body_len(mut self, len: usize) -> Self {
102        self.max_len = len;
103        self
104    }
105
106    /// Configure the codec with a label corresponding to the peer address.
107    pub fn with_metrics_addr_label(mut self, metrics_addr_label: String) -> Self {
108        self.metrics_addr_label = Some(metrics_addr_label);
109        self
110    }
111}
112
113// ======== Encoding =========
114
115impl Encoder<Message> for Codec {
116    type Error = Error;
117
118    fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
119        use Error::Parse;
120
121        let body_length = self.body_length(&item);
122
123        if body_length > self.builder.max_len {
124            return Err(Parse("body length exceeded maximum size"));
125        }
126
127        if let Some(addr_label) = self.builder.metrics_addr_label.clone() {
128            metrics::counter!("zcash.net.out.bytes.total",
129                              "addr" => addr_label)
130            .increment((body_length + HEADER_LEN) as u64);
131        }
132
133        use Message::*;
134        // Note: because all match arms must have
135        // the same type, and the array length is
136        // part of the type, having at least one
137        // of length 12 checks that they are all
138        // of length 12, as they must be &[u8; 12].
139        let command = match item {
140            Version { .. } => b"version\0\0\0\0\0",
141            Verack => b"verack\0\0\0\0\0\0",
142            Ping { .. } => b"ping\0\0\0\0\0\0\0\0",
143            Pong { .. } => b"pong\0\0\0\0\0\0\0\0",
144            Reject { .. } => b"reject\0\0\0\0\0\0",
145            Addr { .. } => b"addr\0\0\0\0\0\0\0\0",
146            GetAddr => b"getaddr\0\0\0\0\0",
147            Block { .. } => b"block\0\0\0\0\0\0\0",
148            GetBlocks { .. } => b"getblocks\0\0\0",
149            Headers { .. } => b"headers\0\0\0\0\0",
150            GetHeaders { .. } => b"getheaders\0\0",
151            Inv { .. } => b"inv\0\0\0\0\0\0\0\0\0",
152            GetData { .. } => b"getdata\0\0\0\0\0",
153            NotFound { .. } => b"notfound\0\0\0\0",
154            Tx { .. } => b"tx\0\0\0\0\0\0\0\0\0\0",
155            Mempool => b"mempool\0\0\0\0\0",
156            FilterLoad { .. } => b"filterload\0\0",
157            FilterAdd { .. } => b"filteradd\0\0\0",
158            FilterClear => b"filterclear\0",
159        };
160        trace!(?item, len = body_length);
161
162        dst.reserve(HEADER_LEN + body_length);
163        let start_len = dst.len();
164        {
165            let dst = &mut dst.writer();
166            dst.write_all(&self.builder.network.magic().0[..])?;
167            dst.write_all(command)?;
168            dst.write_u32::<LittleEndian>(body_length as u32)?;
169
170            // We zero the checksum at first, and compute it later
171            // after the body has been written.
172            dst.write_u32::<LittleEndian>(0)?;
173
174            self.write_body(&item, dst)?;
175        }
176        let checksum = sha256d::Checksum::from(&dst[start_len + HEADER_LEN..]);
177        dst[start_len + 20..][..4].copy_from_slice(&checksum.0);
178
179        Ok(())
180    }
181}
182
183impl Codec {
184    /// Obtain the size of the body of a given message. This will match the
185    /// number of bytes written to the writer provided to `write_body` for the
186    /// same message.
187    // # Performance TODO
188    //
189    // If this code shows up in profiles, replace with a size estimate or cached size,
190    // to avoid multiple serializations for large data structures like lists, blocks, and transactions.
191    fn body_length(&self, msg: &Message) -> usize {
192        let mut writer = FakeWriter(0);
193
194        self.write_body(msg, &mut writer)
195            .expect("writer should never fail");
196        writer.0
197    }
198
199    /// Write the body of the message into the given writer. This allows writing
200    /// the message body prior to writing the header, so that the header can
201    /// contain a checksum of the message body.
202    fn write_body<W: Write>(&self, msg: &Message, mut writer: W) -> Result<(), Error> {
203        match msg {
204            Message::Version(VersionMessage {
205                version,
206                services,
207                timestamp,
208                address_recv,
209                address_from,
210                nonce,
211                user_agent,
212                start_height,
213                relay,
214            }) => {
215                writer.write_u32::<LittleEndian>(version.0)?;
216                writer.write_u64::<LittleEndian>(services.bits())?;
217                // # Security
218                // DateTime<Utc>::timestamp has a smaller range than i64, so
219                // serialization can not error.
220                writer.write_i64::<LittleEndian>(timestamp.timestamp())?;
221
222                address_recv.zcash_serialize(&mut writer)?;
223                address_from.zcash_serialize(&mut writer)?;
224
225                writer.write_u64::<LittleEndian>(nonce.0)?;
226
227                if user_agent.len() > MAX_USER_AGENT_LENGTH {
228                    // zcashd won't accept this version message
229                    return Err(Error::Parse(
230                        "user agent too long: must be 256 bytes or less",
231                    ));
232                }
233
234                user_agent.zcash_serialize(&mut writer)?;
235                writer.write_u32::<LittleEndian>(start_height.0)?;
236                writer.write_u8(*relay as u8)?;
237            }
238            Message::Verack => { /* Empty payload -- no-op */ }
239            Message::Ping(nonce) => {
240                writer.write_u64::<LittleEndian>(nonce.0)?;
241            }
242            Message::Pong(nonce) => {
243                writer.write_u64::<LittleEndian>(nonce.0)?;
244            }
245            Message::Reject {
246                message,
247                ccode,
248                reason,
249                data,
250            } => {
251                if message.len() > MAX_REJECT_MESSAGE_LENGTH {
252                    // zcashd won't accept this reject message
253                    return Err(Error::Parse(
254                        "reject message too long: must be 12 bytes or less",
255                    ));
256                }
257
258                message.zcash_serialize(&mut writer)?;
259
260                writer.write_u8(*ccode as u8)?;
261
262                if reason.len() > MAX_REJECT_REASON_LENGTH {
263                    return Err(Error::Parse(
264                        "reject reason too long: must be 111 bytes or less",
265                    ));
266                }
267
268                reason.zcash_serialize(&mut writer)?;
269                if let Some(data) = data {
270                    writer.write_all(data)?;
271                }
272            }
273            Message::Addr(addrs) => {
274                assert!(
275                    addrs.len() <= constants::MAX_ADDRS_IN_MESSAGE,
276                    "unexpectedly large Addr message: greater than MAX_ADDRS_IN_MESSAGE addresses"
277                );
278
279                // Regardless of the way we received the address,
280                // Zebra always sends `addr` messages
281                let v1_addrs: Vec<AddrV1> = addrs.iter().map(|addr| AddrV1::from(*addr)).collect();
282                v1_addrs.zcash_serialize(&mut writer)?
283            }
284            Message::GetAddr => { /* Empty payload -- no-op */ }
285            Message::Block(block) => block.zcash_serialize(&mut writer)?,
286            Message::GetBlocks { known_blocks, stop } => {
287                writer.write_u32::<LittleEndian>(self.builder.version.0)?;
288                known_blocks.zcash_serialize(&mut writer)?;
289                stop.unwrap_or(block::Hash([0; 32]))
290                    .zcash_serialize(&mut writer)?;
291            }
292            Message::GetHeaders { known_blocks, stop } => {
293                writer.write_u32::<LittleEndian>(self.builder.version.0)?;
294                known_blocks.zcash_serialize(&mut writer)?;
295                stop.unwrap_or(block::Hash([0; 32]))
296                    .zcash_serialize(&mut writer)?;
297            }
298            Message::Headers(headers) => headers.zcash_serialize(&mut writer)?,
299            Message::Inv(hashes) => hashes.zcash_serialize(&mut writer)?,
300            Message::GetData(hashes) => hashes.zcash_serialize(&mut writer)?,
301            Message::NotFound(hashes) => hashes.zcash_serialize(&mut writer)?,
302            Message::Tx(transaction) => transaction.transaction.zcash_serialize(&mut writer)?,
303            Message::Mempool => { /* Empty payload -- no-op */ }
304            Message::FilterLoad {
305                filter,
306                hash_functions_count,
307                tweak,
308                flags,
309            } => {
310                writer.write_all(&filter.0)?;
311                writer.write_u32::<LittleEndian>(*hash_functions_count)?;
312                writer.write_u32::<LittleEndian>(tweak.0)?;
313                writer.write_u8(*flags)?;
314            }
315            Message::FilterAdd { data } => {
316                writer.write_all(data)?;
317            }
318            Message::FilterClear => { /* Empty payload -- no-op */ }
319        }
320        Ok(())
321    }
322}
323
324// ======== Decoding =========
325
/// The decoder's position within the incoming message stream.
enum DecodeState {
    /// Waiting for a complete message header.
    Head,
    /// A header has been parsed, and the decoder is waiting for the complete message body.
    Body {
        /// The body length given in the header, in bytes.
        body_len: usize,
        /// The 12-byte command field from the header.
        command: [u8; 12],
        /// The checksum given in the header, which is compared with the
        /// checksum computed from the received body.
        checksum: sha256d::Checksum,
    },
}
334
335impl fmt::Debug for DecodeState {
336    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
337        match self {
338            DecodeState::Head => write!(f, "DecodeState::Head"),
339            DecodeState::Body {
340                body_len,
341                command,
342                checksum,
343            } => f
344                .debug_struct("DecodeState::Body")
345                .field("body_len", &body_len)
346                .field("command", &String::from_utf8_lossy(command))
347                .field("checksum", &checksum)
348                .finish(),
349        }
350    }
351}
352
353impl Decoder for Codec {
354    type Item = Message;
355    type Error = Error;
356
357    #[allow(clippy::unwrap_in_result)]
358    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
359        use Error::Parse;
360        match self.state {
361            DecodeState::Head => {
362                // First check that the src buffer contains an entire header.
363                if src.len() < HEADER_LEN {
364                    trace!(?self.state, "src buffer does not have an entire header, waiting");
365                    // Signal that decoding requires more data.
366                    return Ok(None);
367                }
368
369                // Now that we know that src contains a header, split off the header section.
370                let header = src.split_to(HEADER_LEN);
371
372                // Create a cursor over the header and parse its fields.
373                let mut header_reader = Cursor::new(&header);
374                let magic = Magic(header_reader.read_4_bytes()?);
375                let command = header_reader.read_12_bytes()?;
376                let body_len = header_reader.read_u32::<LittleEndian>()? as usize;
377                let checksum = sha256d::Checksum(header_reader.read_4_bytes()?);
378                trace!(
379                    ?self.state,
380                    ?magic,
381                    command = %String::from_utf8(
382                        command.iter()
383                            .cloned()
384                            .flat_map(std::ascii::escape_default)
385                            .collect()
386                    ).unwrap(),
387                    body_len,
388                    ?checksum,
389                    "read header from src buffer"
390                );
391
392                if magic != self.builder.network.magic() {
393                    return Err(Parse("supplied magic did not meet expectations"));
394                }
395                if body_len > self.builder.max_len {
396                    return Err(Parse("body length exceeded maximum size"));
397                }
398
399                if let Some(label) = self.builder.metrics_addr_label.clone() {
400                    metrics::counter!("zcash.net.in.bytes.total", "addr" =>  label)
401                        .increment((body_len + HEADER_LEN) as u64);
402                }
403
404                // Reserve buffer space for the expected body and the following header.
405                src.reserve(body_len + HEADER_LEN);
406
407                self.state = DecodeState::Body {
408                    body_len,
409                    command,
410                    checksum,
411                };
412
413                // Now that the state is updated, recurse to attempt body decoding.
414                self.decode(src)
415            }
416            DecodeState::Body {
417                body_len,
418                command,
419                checksum,
420            } => {
421                if src.len() < body_len {
422                    // Need to wait for the full body
423                    trace!(?self.state, len = src.len(), "src buffer does not have an entire body, waiting");
424                    return Ok(None);
425                }
426
427                // Now that we know we have the full body, split off the body,
428                // and reset the decoder state for the next message. Otherwise
429                // we will attempt to read the next header as the current body.
430                let body = src.split_to(body_len);
431                self.state = DecodeState::Head;
432
433                if checksum != sha256d::Checksum::from(&body[..]) {
434                    return Err(Parse(
435                        "supplied message checksum does not match computed checksum",
436                    ));
437                }
438
439                let mut body_reader = Cursor::new(&body);
440                match &command {
441                    b"version\0\0\0\0\0" => self.read_version(&mut body_reader),
442                    b"verack\0\0\0\0\0\0" => self.read_verack(&mut body_reader),
443                    b"ping\0\0\0\0\0\0\0\0" => self.read_ping(&mut body_reader),
444                    b"pong\0\0\0\0\0\0\0\0" => self.read_pong(&mut body_reader),
445                    b"reject\0\0\0\0\0\0" => self.read_reject(&mut body_reader),
446                    b"addr\0\0\0\0\0\0\0\0" => self.read_addr(&mut body_reader),
447                    b"addrv2\0\0\0\0\0\0" => self.read_addrv2(&mut body_reader),
448                    b"getaddr\0\0\0\0\0" => self.read_getaddr(&mut body_reader),
449                    b"block\0\0\0\0\0\0\0" => self.read_block(&mut body_reader),
450                    b"getblocks\0\0\0" => self.read_getblocks(&mut body_reader),
451                    b"headers\0\0\0\0\0" => self.read_headers(&mut body_reader),
452                    b"getheaders\0\0" => self.read_getheaders(&mut body_reader),
453                    b"inv\0\0\0\0\0\0\0\0\0" => self.read_inv(&mut body_reader),
454                    b"getdata\0\0\0\0\0" => self.read_getdata(&mut body_reader),
455                    b"notfound\0\0\0\0" => self.read_notfound(&mut body_reader),
456                    b"tx\0\0\0\0\0\0\0\0\0\0" => self.read_tx(&mut body_reader),
457                    b"mempool\0\0\0\0\0" => self.read_mempool(&mut body_reader),
458                    b"filterload\0\0" => self.read_filterload(&mut body_reader, body_len),
459                    b"filteradd\0\0\0" => self.read_filteradd(&mut body_reader, body_len),
460                    b"filterclear\0" => self.read_filterclear(&mut body_reader),
461                    _ => {
462                        let command_string = String::from_utf8_lossy(&command);
463
464                        // # Security
465                        //
466                        // Zcash connections are not authenticated, so malicious nodes can
467                        // send fake messages, with connected peers' IP addresses in the IP header.
468                        //
469                        // Since we can't verify their source, Zebra needs to ignore unexpected messages,
470                        // because closing the connection could cause a denial of service or eclipse attack.
471                        debug!(?command, %command_string, "unknown message command from peer");
472                        return Ok(None);
473                    }
474                }
475                // We need Ok(Some(msg)) to signal that we're done decoding.
476                // This is also convenient for tracing the parse result.
477                .map(|msg| {
478                    // bitcoin allows extra data at the end of most messages,
479                    // so that old nodes can still read newer message formats,
480                    // and ignore any extra fields
481                    let extra_bytes = body.len() as u64 - body_reader.position();
482                    if extra_bytes == 0 {
483                        trace!(?extra_bytes, %msg, "finished message decoding");
484                    } else {
485                        // log when there are extra bytes, so we know when we need to
486                        // upgrade message formats
487                        debug!(?extra_bytes, %msg, "extra data after decoding message");
488                    }
489                    Some(msg)
490                })
491            }
492        }
493    }
494}
495
496impl Codec {
497    /// Deserializes a version message.
498    ///
499    /// The `relay` field is optional, as defined in <https://developer.bitcoin.org/reference/p2p_networking.html#version>
500    ///
501    /// Note: zcashd only requires fields up to `address_recv`, but everything up to `relay` is required in Zebra.
502    ///       see <https://github.com/zcash/zcash/blob/11d563904933e889a11d9685c3b249f1536cfbe7/src/main.cpp#L6490-L6507>
503    fn read_version<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
504        Ok(VersionMessage {
505            version: Version(reader.read_u32::<LittleEndian>()?),
506            // Use from_bits_truncate to discard unknown service bits.
507            services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
508            timestamp: Utc
509                .timestamp_opt(reader.read_i64::<LittleEndian>()?, 0)
510                .single()
511                .ok_or(Error::Parse(
512                    "version timestamp is out of range for DateTime",
513                ))?,
514            address_recv: AddrInVersion::zcash_deserialize(&mut reader)?,
515            address_from: AddrInVersion::zcash_deserialize(&mut reader)?,
516            nonce: Nonce(reader.read_u64::<LittleEndian>()?),
517            user_agent: {
518                let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
519                let byte_count: usize = byte_count.into();
520
521                // # Security
522                //
523                // Limit peer set memory usage, Zebra stores an `Arc<VersionMessage>` per
524                // connected peer.
525                //
526                // Without this check, we can use `200 peers * 2 MB message size limit = 400 MB`.
527                if byte_count > MAX_USER_AGENT_LENGTH {
528                    return Err(Error::Parse(
529                        "user agent too long: must be 256 bytes or less",
530                    ));
531                }
532
533                zcash_deserialize_string_external_count(byte_count, &mut reader)?
534            },
535            start_height: block::Height(reader.read_u32::<LittleEndian>()?),
536            relay: match reader.read_u8() {
537                Ok(val @ 0..=1) => val == 1,
538                Ok(_) => return Err(Error::Parse("non-bool value supplied in relay field")),
539                Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => true,
540                Err(err) => Err(err)?,
541            },
542        }
543        .into())
544    }
545
546    fn read_verack<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
547        Ok(Message::Verack)
548    }
549
550    fn read_ping<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
551        Ok(Message::Ping(Nonce(reader.read_u64::<LittleEndian>()?)))
552    }
553
554    fn read_pong<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
555        Ok(Message::Pong(Nonce(reader.read_u64::<LittleEndian>()?)))
556    }
557
558    fn read_reject<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
559        Ok(Message::Reject {
560            message: {
561                let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
562                let byte_count: usize = byte_count.into();
563
564                // # Security
565                //
566                // Limit log size on disk, Zebra might print large reject messages to disk.
567                if byte_count > MAX_REJECT_MESSAGE_LENGTH {
568                    return Err(Error::Parse(
569                        "reject message too long: must be 12 bytes or less",
570                    ));
571                }
572
573                zcash_deserialize_string_external_count(byte_count, &mut reader)?
574            },
575            ccode: match reader.read_u8()? {
576                0x01 => RejectReason::Malformed,
577                0x10 => RejectReason::Invalid,
578                0x11 => RejectReason::Obsolete,
579                0x12 => RejectReason::Duplicate,
580                0x40 => RejectReason::Nonstandard,
581                0x41 => RejectReason::Dust,
582                0x42 => RejectReason::InsufficientFee,
583                0x43 => RejectReason::Checkpoint,
584                0x50 => RejectReason::Other,
585                _ => return Err(Error::Parse("invalid RejectReason value in ccode field")),
586            },
587            reason: {
588                let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
589                let byte_count: usize = byte_count.into();
590
591                // # Security
592                //
593                // Limit log size on disk, Zebra might print large reject messages to disk.
594                if byte_count > MAX_REJECT_REASON_LENGTH {
595                    return Err(Error::Parse(
596                        "reject reason too long: must be 111 bytes or less",
597                    ));
598                }
599
600                zcash_deserialize_string_external_count(byte_count, &mut reader)?
601            },
602            // Sometimes there's data, sometimes there isn't. There's no length
603            // field, this is just implicitly encoded by the body_len.
604            // Apparently all existing implementations only supply 32 bytes of
605            // data (hash identifying the rejected object) or none (and we model
606            // the Reject message that way), so instead of passing in the
607            // body_len separately and calculating remaining bytes, just try to
608            // read 32 bytes and ignore any failures. (The caller will log and
609            // ignore any trailing bytes.)
610            data: reader.read_32_bytes().ok(),
611        })
612    }
613
614    /// Deserialize an `addr` (v1) message into a list of `MetaAddr`s.
615    pub(super) fn read_addr<R: Read>(&self, reader: R) -> Result<Message, Error> {
616        let addrs: Vec<AddrV1> = reader.zcash_deserialize_into()?;
617
618        if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
619            return Err(Error::Parse(
620                "more than MAX_ADDRS_IN_MESSAGE in addr message",
621            ));
622        }
623
624        // Convert the received address format to Zebra's internal `MetaAddr`.
625        let addrs = addrs.into_iter().map(Into::into).collect();
626        Ok(Message::Addr(addrs))
627    }
628
629    /// Deserialize an `addrv2` message into a list of `MetaAddr`s.
630    ///
631    /// Currently, Zebra parses received `addrv2`s, ignoring some address types.
632    /// Zebra never sends `addrv2` messages.
633    pub(super) fn read_addrv2<R: Read>(&self, reader: R) -> Result<Message, Error> {
634        let addrs: Vec<AddrV2> = reader.zcash_deserialize_into()?;
635
636        if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
637            return Err(Error::Parse(
638                "more than MAX_ADDRS_IN_MESSAGE in addrv2 message",
639            ));
640        }
641
642        // Convert the received address format to Zebra's internal `MetaAddr`,
643        // ignoring unsupported network IDs.
644        let addrs = addrs
645            .into_iter()
646            .filter_map(|addr| addr.try_into().ok())
647            .collect();
648        Ok(Message::Addr(addrs))
649    }
650
651    fn read_getaddr<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
652        Ok(Message::GetAddr)
653    }
654
655    fn read_block<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
656        let result = Self::deserialize_block_spawning(reader);
657        Ok(Message::Block(result?.into()))
658    }
659
660    fn read_getblocks<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
661        if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
662            let known_blocks = Vec::zcash_deserialize(&mut reader)?;
663            let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
664            let stop = if stop_hash != block::Hash([0; 32]) {
665                Some(stop_hash)
666            } else {
667                None
668            };
669            Ok(Message::GetBlocks { known_blocks, stop })
670        } else {
671            Err(Error::Parse("getblocks version did not match negotiation"))
672        }
673    }
674
675    /// Deserialize a `headers` message.
676    ///
677    /// See [Zcash block header] for the enumeration of these fields.
678    ///
679    /// [Zcash block header](https://zips.z.cash/protocol/protocol.pdf#page=84)
680    fn read_headers<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
681        Ok(Message::Headers(Vec::zcash_deserialize(&mut reader)?))
682    }
683
684    fn read_getheaders<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
685        if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
686            let known_blocks = Vec::zcash_deserialize(&mut reader)?;
687            let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
688            let stop = if stop_hash != block::Hash([0; 32]) {
689                Some(stop_hash)
690            } else {
691                None
692            };
693            Ok(Message::GetHeaders { known_blocks, stop })
694        } else {
695            Err(Error::Parse("getblocks version did not match negotiation"))
696        }
697    }
698
699    fn read_inv<R: Read>(&self, reader: R) -> Result<Message, Error> {
700        Ok(Message::Inv(Vec::zcash_deserialize(reader)?))
701    }
702
703    fn read_getdata<R: Read>(&self, reader: R) -> Result<Message, Error> {
704        Ok(Message::GetData(Vec::zcash_deserialize(reader)?))
705    }
706
707    fn read_notfound<R: Read>(&self, reader: R) -> Result<Message, Error> {
708        Ok(Message::NotFound(Vec::zcash_deserialize(reader)?))
709    }
710
711    fn read_tx<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
712        let result = Self::deserialize_transaction_spawning(reader);
713        Ok(Message::Tx(result?.into()))
714    }
715
716    fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
717        Ok(Message::Mempool)
718    }
719
720    fn read_filterload<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
721        // The maximum length of a filter.
722        const MAX_FILTERLOAD_FILTER_LENGTH: usize = 36000;
723
724        // The data length of the fields:
725        // hash_functions_count + tweak + flags.
726        const FILTERLOAD_FIELDS_LENGTH: usize = 4 + 4 + 1;
727
728        // The maximum length of a filter message's data.
729        const MAX_FILTERLOAD_MESSAGE_LENGTH: usize =
730            MAX_FILTERLOAD_FILTER_LENGTH + FILTERLOAD_FIELDS_LENGTH;
731
732        if !(FILTERLOAD_FIELDS_LENGTH..=MAX_FILTERLOAD_MESSAGE_LENGTH).contains(&body_len) {
733            return Err(Error::Parse("Invalid filterload message body length."));
734        }
735
736        // Memory Denial of Service: we just checked the untrusted parsed length
737        let filter_length: usize = body_len - FILTERLOAD_FIELDS_LENGTH;
738        let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
739
740        Ok(Message::FilterLoad {
741            filter: Filter(filter_bytes),
742            hash_functions_count: reader.read_u32::<LittleEndian>()?,
743            tweak: Tweak(reader.read_u32::<LittleEndian>()?),
744            flags: reader.read_u8()?,
745        })
746    }
747
748    fn read_filteradd<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
749        const MAX_FILTERADD_LENGTH: usize = 520;
750
751        // Memory Denial of Service: limit the untrusted parsed length
752        let filter_length: usize = min(body_len, MAX_FILTERADD_LENGTH);
753        let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
754
755        Ok(Message::FilterAdd { data: filter_bytes })
756    }
757
758    fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
759        Ok(Message::FilterClear)
760    }
761
762    /// Given the reader, deserialize the transaction in the rayon thread pool.
763    #[allow(clippy::unwrap_in_result)]
764    fn deserialize_transaction_spawning<R: Read + std::marker::Send>(
765        reader: R,
766    ) -> Result<Transaction, Error> {
767        let mut result = None;
768
769        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
770        //
771        // Since we use `block_in_place()`, other futures running on the connection task will be blocked:
772        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
773        //
774        // We can't use `spawn_blocking()` because:
775        // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
776        // - There is no way to check the blocking task's future for panics
777        tokio::task::block_in_place(|| {
778            rayon::in_place_scope_fifo(|s| {
779                s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader)))
780            })
781        });
782
783        result.expect("scope has already finished")
784    }
785
786    /// Given the reader, deserialize the block in the rayon thread pool.
787    #[allow(clippy::unwrap_in_result)]
788    fn deserialize_block_spawning<R: Read + std::marker::Send>(reader: R) -> Result<Block, Error> {
789        let mut result = None;
790
791        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
792        //
793        // Since we use `block_in_place()`, other futures running on the connection task will be blocked:
794        // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
795        //
796        // We can't use `spawn_blocking()` because:
797        // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
798        // - There is no way to check the blocking task's future for panics
799        tokio::task::block_in_place(|| {
800            rayon::in_place_scope_fifo(|s| {
801                s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader)))
802            })
803        });
804
805        result.expect("scope has already finished")
806    }
807}