1use std::{
4 cmp::min,
5 fmt,
6 io::{Cursor, Read, Write},
7};
8
9use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
10use bytes::{BufMut, BytesMut};
11use chrono::{TimeZone, Utc};
12use tokio_util::codec::{Decoder, Encoder};
13
14use zebra_chain::{
15 block::{self, Block},
16 parameters::{Magic, Network},
17 serialization::{
18 sha256d, zcash_deserialize_bytes_external_count, zcash_deserialize_string_external_count,
19 CompactSizeMessage, FakeWriter, ReadZcashExt, SerializationError as Error,
20 ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize, MAX_PROTOCOL_MESSAGE_LEN,
21 },
22 transaction::Transaction,
23};
24
25use crate::constants;
26
27use super::{
28 addr::{AddrInVersion, AddrV1, AddrV2},
29 message::{
30 Message, RejectReason, VersionMessage, MAX_REJECT_MESSAGE_LENGTH, MAX_REJECT_REASON_LENGTH,
31 MAX_USER_AGENT_LENGTH,
32 },
33 types::*,
34};
35
36#[cfg(test)]
37mod tests;
38
39const HEADER_LEN: usize = 24usize;
41
42pub struct Codec {
44 builder: Builder,
45 state: DecodeState,
46}
47
48pub struct Builder {
50 network: Network,
52 version: Version,
54 max_len: usize,
56 metrics_addr_label: Option<String>,
58}
59
60impl Codec {
61 pub fn builder() -> Builder {
63 Builder {
64 network: Network::Mainnet,
65 version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
66 max_len: MAX_PROTOCOL_MESSAGE_LEN,
67 metrics_addr_label: None,
68 }
69 }
70
71 pub fn reconfigure_version(&mut self, version: Version) {
73 self.builder.version = version;
74 }
75}
76
77impl Builder {
78 pub fn finish(self) -> Codec {
80 Codec {
81 builder: self,
82 state: DecodeState::Head,
83 }
84 }
85
86 pub fn for_network(mut self, network: &Network) -> Self {
88 self.network = network.clone();
89 self
90 }
91
92 #[allow(dead_code)]
94 pub fn for_version(mut self, version: Version) -> Self {
95 self.version = version;
96 self
97 }
98
99 #[allow(dead_code)]
101 pub fn with_max_body_len(mut self, len: usize) -> Self {
102 self.max_len = len;
103 self
104 }
105
106 pub fn with_metrics_addr_label(mut self, metrics_addr_label: String) -> Self {
108 self.metrics_addr_label = Some(metrics_addr_label);
109 self
110 }
111}
112
113impl Encoder<Message> for Codec {
116 type Error = Error;
117
118 fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
119 use Error::Parse;
120
121 let body_length = self.body_length(&item);
122
123 if body_length > self.builder.max_len {
124 return Err(Parse("body length exceeded maximum size"));
125 }
126
127 if let Some(addr_label) = self.builder.metrics_addr_label.clone() {
128 metrics::counter!("zcash.net.out.bytes.total",
129 "addr" => addr_label)
130 .increment((body_length + HEADER_LEN) as u64);
131 }
132
133 use Message::*;
134 let command = match item {
140 Version { .. } => b"version\0\0\0\0\0",
141 Verack => b"verack\0\0\0\0\0\0",
142 Ping { .. } => b"ping\0\0\0\0\0\0\0\0",
143 Pong { .. } => b"pong\0\0\0\0\0\0\0\0",
144 Reject { .. } => b"reject\0\0\0\0\0\0",
145 Addr { .. } => b"addr\0\0\0\0\0\0\0\0",
146 GetAddr => b"getaddr\0\0\0\0\0",
147 Block { .. } => b"block\0\0\0\0\0\0\0",
148 GetBlocks { .. } => b"getblocks\0\0\0",
149 Headers { .. } => b"headers\0\0\0\0\0",
150 GetHeaders { .. } => b"getheaders\0\0",
151 Inv { .. } => b"inv\0\0\0\0\0\0\0\0\0",
152 GetData { .. } => b"getdata\0\0\0\0\0",
153 NotFound { .. } => b"notfound\0\0\0\0",
154 Tx { .. } => b"tx\0\0\0\0\0\0\0\0\0\0",
155 Mempool => b"mempool\0\0\0\0\0",
156 FilterLoad { .. } => b"filterload\0\0",
157 FilterAdd { .. } => b"filteradd\0\0\0",
158 FilterClear => b"filterclear\0",
159 };
160 trace!(?item, len = body_length);
161
162 dst.reserve(HEADER_LEN + body_length);
163 let start_len = dst.len();
164 {
165 let dst = &mut dst.writer();
166 dst.write_all(&self.builder.network.magic().0[..])?;
167 dst.write_all(command)?;
168 dst.write_u32::<LittleEndian>(body_length as u32)?;
169
170 dst.write_u32::<LittleEndian>(0)?;
173
174 self.write_body(&item, dst)?;
175 }
176 let checksum = sha256d::Checksum::from(&dst[start_len + HEADER_LEN..]);
177 dst[start_len + 20..][..4].copy_from_slice(&checksum.0);
178
179 Ok(())
180 }
181}
182
183impl Codec {
184 fn body_length(&self, msg: &Message) -> usize {
192 let mut writer = FakeWriter(0);
193
194 self.write_body(msg, &mut writer)
195 .expect("writer should never fail");
196 writer.0
197 }
198
199 fn write_body<W: Write>(&self, msg: &Message, mut writer: W) -> Result<(), Error> {
203 match msg {
204 Message::Version(VersionMessage {
205 version,
206 services,
207 timestamp,
208 address_recv,
209 address_from,
210 nonce,
211 user_agent,
212 start_height,
213 relay,
214 }) => {
215 writer.write_u32::<LittleEndian>(version.0)?;
216 writer.write_u64::<LittleEndian>(services.bits())?;
217 writer.write_i64::<LittleEndian>(timestamp.timestamp())?;
221
222 address_recv.zcash_serialize(&mut writer)?;
223 address_from.zcash_serialize(&mut writer)?;
224
225 writer.write_u64::<LittleEndian>(nonce.0)?;
226
227 if user_agent.len() > MAX_USER_AGENT_LENGTH {
228 return Err(Error::Parse(
230 "user agent too long: must be 256 bytes or less",
231 ));
232 }
233
234 user_agent.zcash_serialize(&mut writer)?;
235 writer.write_u32::<LittleEndian>(start_height.0)?;
236 writer.write_u8(*relay as u8)?;
237 }
238 Message::Verack => { }
239 Message::Ping(nonce) => {
240 writer.write_u64::<LittleEndian>(nonce.0)?;
241 }
242 Message::Pong(nonce) => {
243 writer.write_u64::<LittleEndian>(nonce.0)?;
244 }
245 Message::Reject {
246 message,
247 ccode,
248 reason,
249 data,
250 } => {
251 if message.len() > MAX_REJECT_MESSAGE_LENGTH {
252 return Err(Error::Parse(
254 "reject message too long: must be 12 bytes or less",
255 ));
256 }
257
258 message.zcash_serialize(&mut writer)?;
259
260 writer.write_u8(*ccode as u8)?;
261
262 if reason.len() > MAX_REJECT_REASON_LENGTH {
263 return Err(Error::Parse(
264 "reject reason too long: must be 111 bytes or less",
265 ));
266 }
267
268 reason.zcash_serialize(&mut writer)?;
269 if let Some(data) = data {
270 writer.write_all(data)?;
271 }
272 }
273 Message::Addr(addrs) => {
274 assert!(
275 addrs.len() <= constants::MAX_ADDRS_IN_MESSAGE,
276 "unexpectedly large Addr message: greater than MAX_ADDRS_IN_MESSAGE addresses"
277 );
278
279 let v1_addrs: Vec<AddrV1> = addrs.iter().map(|addr| AddrV1::from(*addr)).collect();
282 v1_addrs.zcash_serialize(&mut writer)?
283 }
284 Message::GetAddr => { }
285 Message::Block(block) => block.zcash_serialize(&mut writer)?,
286 Message::GetBlocks { known_blocks, stop } => {
287 writer.write_u32::<LittleEndian>(self.builder.version.0)?;
288 known_blocks.zcash_serialize(&mut writer)?;
289 stop.unwrap_or(block::Hash([0; 32]))
290 .zcash_serialize(&mut writer)?;
291 }
292 Message::GetHeaders { known_blocks, stop } => {
293 writer.write_u32::<LittleEndian>(self.builder.version.0)?;
294 known_blocks.zcash_serialize(&mut writer)?;
295 stop.unwrap_or(block::Hash([0; 32]))
296 .zcash_serialize(&mut writer)?;
297 }
298 Message::Headers(headers) => headers.zcash_serialize(&mut writer)?,
299 Message::Inv(hashes) => hashes.zcash_serialize(&mut writer)?,
300 Message::GetData(hashes) => hashes.zcash_serialize(&mut writer)?,
301 Message::NotFound(hashes) => hashes.zcash_serialize(&mut writer)?,
302 Message::Tx(transaction) => transaction.transaction.zcash_serialize(&mut writer)?,
303 Message::Mempool => { }
304 Message::FilterLoad {
305 filter,
306 hash_functions_count,
307 tweak,
308 flags,
309 } => {
310 writer.write_all(&filter.0)?;
311 writer.write_u32::<LittleEndian>(*hash_functions_count)?;
312 writer.write_u32::<LittleEndian>(tweak.0)?;
313 writer.write_u8(*flags)?;
314 }
315 Message::FilterAdd { data } => {
316 writer.write_all(data)?;
317 }
318 Message::FilterClear => { }
319 }
320 Ok(())
321 }
322}
323
324enum DecodeState {
327 Head,
328 Body {
329 body_len: usize,
330 command: [u8; 12],
331 checksum: sha256d::Checksum,
332 },
333}
334
335impl fmt::Debug for DecodeState {
336 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
337 match self {
338 DecodeState::Head => write!(f, "DecodeState::Head"),
339 DecodeState::Body {
340 body_len,
341 command,
342 checksum,
343 } => f
344 .debug_struct("DecodeState::Body")
345 .field("body_len", &body_len)
346 .field("command", &String::from_utf8_lossy(command))
347 .field("checksum", &checksum)
348 .finish(),
349 }
350 }
351}
352
353impl Decoder for Codec {
354 type Item = Message;
355 type Error = Error;
356
357 #[allow(clippy::unwrap_in_result)]
358 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
359 use Error::Parse;
360 match self.state {
361 DecodeState::Head => {
362 if src.len() < HEADER_LEN {
364 trace!(?self.state, "src buffer does not have an entire header, waiting");
365 return Ok(None);
367 }
368
369 let header = src.split_to(HEADER_LEN);
371
372 let mut header_reader = Cursor::new(&header);
374 let magic = Magic(header_reader.read_4_bytes()?);
375 let command = header_reader.read_12_bytes()?;
376 let body_len = header_reader.read_u32::<LittleEndian>()? as usize;
377 let checksum = sha256d::Checksum(header_reader.read_4_bytes()?);
378 trace!(
379 ?self.state,
380 ?magic,
381 command = %String::from_utf8(
382 command.iter()
383 .cloned()
384 .flat_map(std::ascii::escape_default)
385 .collect()
386 ).unwrap(),
387 body_len,
388 ?checksum,
389 "read header from src buffer"
390 );
391
392 if magic != self.builder.network.magic() {
393 return Err(Parse("supplied magic did not meet expectations"));
394 }
395 if body_len > self.builder.max_len {
396 return Err(Parse("body length exceeded maximum size"));
397 }
398
399 if let Some(label) = self.builder.metrics_addr_label.clone() {
400 metrics::counter!("zcash.net.in.bytes.total", "addr" => label)
401 .increment((body_len + HEADER_LEN) as u64);
402 }
403
404 src.reserve(body_len + HEADER_LEN);
406
407 self.state = DecodeState::Body {
408 body_len,
409 command,
410 checksum,
411 };
412
413 self.decode(src)
415 }
416 DecodeState::Body {
417 body_len,
418 command,
419 checksum,
420 } => {
421 if src.len() < body_len {
422 trace!(?self.state, len = src.len(), "src buffer does not have an entire body, waiting");
424 return Ok(None);
425 }
426
427 let body = src.split_to(body_len);
431 self.state = DecodeState::Head;
432
433 if checksum != sha256d::Checksum::from(&body[..]) {
434 return Err(Parse(
435 "supplied message checksum does not match computed checksum",
436 ));
437 }
438
439 let mut body_reader = Cursor::new(&body);
440 match &command {
441 b"version\0\0\0\0\0" => self.read_version(&mut body_reader),
442 b"verack\0\0\0\0\0\0" => self.read_verack(&mut body_reader),
443 b"ping\0\0\0\0\0\0\0\0" => self.read_ping(&mut body_reader),
444 b"pong\0\0\0\0\0\0\0\0" => self.read_pong(&mut body_reader),
445 b"reject\0\0\0\0\0\0" => self.read_reject(&mut body_reader),
446 b"addr\0\0\0\0\0\0\0\0" => self.read_addr(&mut body_reader),
447 b"addrv2\0\0\0\0\0\0" => self.read_addrv2(&mut body_reader),
448 b"getaddr\0\0\0\0\0" => self.read_getaddr(&mut body_reader),
449 b"block\0\0\0\0\0\0\0" => self.read_block(&mut body_reader),
450 b"getblocks\0\0\0" => self.read_getblocks(&mut body_reader),
451 b"headers\0\0\0\0\0" => self.read_headers(&mut body_reader),
452 b"getheaders\0\0" => self.read_getheaders(&mut body_reader),
453 b"inv\0\0\0\0\0\0\0\0\0" => self.read_inv(&mut body_reader),
454 b"getdata\0\0\0\0\0" => self.read_getdata(&mut body_reader),
455 b"notfound\0\0\0\0" => self.read_notfound(&mut body_reader),
456 b"tx\0\0\0\0\0\0\0\0\0\0" => self.read_tx(&mut body_reader),
457 b"mempool\0\0\0\0\0" => self.read_mempool(&mut body_reader),
458 b"filterload\0\0" => self.read_filterload(&mut body_reader, body_len),
459 b"filteradd\0\0\0" => self.read_filteradd(&mut body_reader, body_len),
460 b"filterclear\0" => self.read_filterclear(&mut body_reader),
461 _ => {
462 let command_string = String::from_utf8_lossy(&command);
463
464 debug!(?command, %command_string, "unknown message command from peer");
472 return Ok(None);
473 }
474 }
475 .map(|msg| {
478 let extra_bytes = body.len() as u64 - body_reader.position();
482 if extra_bytes == 0 {
483 trace!(?extra_bytes, %msg, "finished message decoding");
484 } else {
485 debug!(?extra_bytes, %msg, "extra data after decoding message");
488 }
489 Some(msg)
490 })
491 }
492 }
493 }
494}
495
496impl Codec {
497 fn read_version<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
504 Ok(VersionMessage {
505 version: Version(reader.read_u32::<LittleEndian>()?),
506 services: PeerServices::from_bits_truncate(reader.read_u64::<LittleEndian>()?),
508 timestamp: Utc
509 .timestamp_opt(reader.read_i64::<LittleEndian>()?, 0)
510 .single()
511 .ok_or(Error::Parse(
512 "version timestamp is out of range for DateTime",
513 ))?,
514 address_recv: AddrInVersion::zcash_deserialize(&mut reader)?,
515 address_from: AddrInVersion::zcash_deserialize(&mut reader)?,
516 nonce: Nonce(reader.read_u64::<LittleEndian>()?),
517 user_agent: {
518 let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
519 let byte_count: usize = byte_count.into();
520
521 if byte_count > MAX_USER_AGENT_LENGTH {
528 return Err(Error::Parse(
529 "user agent too long: must be 256 bytes or less",
530 ));
531 }
532
533 zcash_deserialize_string_external_count(byte_count, &mut reader)?
534 },
535 start_height: block::Height(reader.read_u32::<LittleEndian>()?),
536 relay: match reader.read_u8() {
537 Ok(val @ 0..=1) => val == 1,
538 Ok(_) => return Err(Error::Parse("non-bool value supplied in relay field")),
539 Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => true,
540 Err(err) => Err(err)?,
541 },
542 }
543 .into())
544 }
545
546 fn read_verack<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
547 Ok(Message::Verack)
548 }
549
550 fn read_ping<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
551 Ok(Message::Ping(Nonce(reader.read_u64::<LittleEndian>()?)))
552 }
553
554 fn read_pong<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
555 Ok(Message::Pong(Nonce(reader.read_u64::<LittleEndian>()?)))
556 }
557
558 fn read_reject<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
559 Ok(Message::Reject {
560 message: {
561 let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
562 let byte_count: usize = byte_count.into();
563
564 if byte_count > MAX_REJECT_MESSAGE_LENGTH {
568 return Err(Error::Parse(
569 "reject message too long: must be 12 bytes or less",
570 ));
571 }
572
573 zcash_deserialize_string_external_count(byte_count, &mut reader)?
574 },
575 ccode: match reader.read_u8()? {
576 0x01 => RejectReason::Malformed,
577 0x10 => RejectReason::Invalid,
578 0x11 => RejectReason::Obsolete,
579 0x12 => RejectReason::Duplicate,
580 0x40 => RejectReason::Nonstandard,
581 0x41 => RejectReason::Dust,
582 0x42 => RejectReason::InsufficientFee,
583 0x43 => RejectReason::Checkpoint,
584 0x50 => RejectReason::Other,
585 _ => return Err(Error::Parse("invalid RejectReason value in ccode field")),
586 },
587 reason: {
588 let byte_count: CompactSizeMessage = (&mut reader).zcash_deserialize_into()?;
589 let byte_count: usize = byte_count.into();
590
591 if byte_count > MAX_REJECT_REASON_LENGTH {
595 return Err(Error::Parse(
596 "reject reason too long: must be 111 bytes or less",
597 ));
598 }
599
600 zcash_deserialize_string_external_count(byte_count, &mut reader)?
601 },
602 data: reader.read_32_bytes().ok(),
611 })
612 }
613
614 pub(super) fn read_addr<R: Read>(&self, reader: R) -> Result<Message, Error> {
616 let addrs: Vec<AddrV1> = reader.zcash_deserialize_into()?;
617
618 if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
619 return Err(Error::Parse(
620 "more than MAX_ADDRS_IN_MESSAGE in addr message",
621 ));
622 }
623
624 let addrs = addrs.into_iter().map(Into::into).collect();
626 Ok(Message::Addr(addrs))
627 }
628
629 pub(super) fn read_addrv2<R: Read>(&self, reader: R) -> Result<Message, Error> {
634 let addrs: Vec<AddrV2> = reader.zcash_deserialize_into()?;
635
636 if addrs.len() > constants::MAX_ADDRS_IN_MESSAGE {
637 return Err(Error::Parse(
638 "more than MAX_ADDRS_IN_MESSAGE in addrv2 message",
639 ));
640 }
641
642 let addrs = addrs
645 .into_iter()
646 .filter_map(|addr| addr.try_into().ok())
647 .collect();
648 Ok(Message::Addr(addrs))
649 }
650
651 fn read_getaddr<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
652 Ok(Message::GetAddr)
653 }
654
655 fn read_block<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
656 let result = Self::deserialize_block_spawning(reader);
657 Ok(Message::Block(result?.into()))
658 }
659
660 fn read_getblocks<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
661 if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
662 let known_blocks = Vec::zcash_deserialize(&mut reader)?;
663 let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
664 let stop = if stop_hash != block::Hash([0; 32]) {
665 Some(stop_hash)
666 } else {
667 None
668 };
669 Ok(Message::GetBlocks { known_blocks, stop })
670 } else {
671 Err(Error::Parse("getblocks version did not match negotiation"))
672 }
673 }
674
675 fn read_headers<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
681 Ok(Message::Headers(Vec::zcash_deserialize(&mut reader)?))
682 }
683
684 fn read_getheaders<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
685 if self.builder.version == Version(reader.read_u32::<LittleEndian>()?) {
686 let known_blocks = Vec::zcash_deserialize(&mut reader)?;
687 let stop_hash = block::Hash::zcash_deserialize(&mut reader)?;
688 let stop = if stop_hash != block::Hash([0; 32]) {
689 Some(stop_hash)
690 } else {
691 None
692 };
693 Ok(Message::GetHeaders { known_blocks, stop })
694 } else {
695 Err(Error::Parse("getblocks version did not match negotiation"))
696 }
697 }
698
699 fn read_inv<R: Read>(&self, reader: R) -> Result<Message, Error> {
700 Ok(Message::Inv(Vec::zcash_deserialize(reader)?))
701 }
702
703 fn read_getdata<R: Read>(&self, reader: R) -> Result<Message, Error> {
704 Ok(Message::GetData(Vec::zcash_deserialize(reader)?))
705 }
706
707 fn read_notfound<R: Read>(&self, reader: R) -> Result<Message, Error> {
708 Ok(Message::NotFound(Vec::zcash_deserialize(reader)?))
709 }
710
711 fn read_tx<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
712 let result = Self::deserialize_transaction_spawning(reader);
713 Ok(Message::Tx(result?.into()))
714 }
715
716 fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
717 Ok(Message::Mempool)
718 }
719
720 fn read_filterload<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
721 const MAX_FILTERLOAD_FILTER_LENGTH: usize = 36000;
723
724 const FILTERLOAD_FIELDS_LENGTH: usize = 4 + 4 + 1;
727
728 const MAX_FILTERLOAD_MESSAGE_LENGTH: usize =
730 MAX_FILTERLOAD_FILTER_LENGTH + FILTERLOAD_FIELDS_LENGTH;
731
732 if !(FILTERLOAD_FIELDS_LENGTH..=MAX_FILTERLOAD_MESSAGE_LENGTH).contains(&body_len) {
733 return Err(Error::Parse("Invalid filterload message body length."));
734 }
735
736 let filter_length: usize = body_len - FILTERLOAD_FIELDS_LENGTH;
738 let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
739
740 Ok(Message::FilterLoad {
741 filter: Filter(filter_bytes),
742 hash_functions_count: reader.read_u32::<LittleEndian>()?,
743 tweak: Tweak(reader.read_u32::<LittleEndian>()?),
744 flags: reader.read_u8()?,
745 })
746 }
747
748 fn read_filteradd<R: Read>(&self, mut reader: R, body_len: usize) -> Result<Message, Error> {
749 const MAX_FILTERADD_LENGTH: usize = 520;
750
751 let filter_length: usize = min(body_len, MAX_FILTERADD_LENGTH);
753 let filter_bytes = zcash_deserialize_bytes_external_count(filter_length, &mut reader)?;
754
755 Ok(Message::FilterAdd { data: filter_bytes })
756 }
757
758 fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
759 Ok(Message::FilterClear)
760 }
761
762 #[allow(clippy::unwrap_in_result)]
764 fn deserialize_transaction_spawning<R: Read + std::marker::Send>(
765 reader: R,
766 ) -> Result<Transaction, Error> {
767 let mut result = None;
768
769 tokio::task::block_in_place(|| {
778 rayon::in_place_scope_fifo(|s| {
779 s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader)))
780 })
781 });
782
783 result.expect("scope has already finished")
784 }
785
786 #[allow(clippy::unwrap_in_result)]
788 fn deserialize_block_spawning<R: Read + std::marker::Send>(reader: R) -> Result<Block, Error> {
789 let mut result = None;
790
791 tokio::task::block_in_place(|| {
800 rayon::in_place_scope_fifo(|s| {
801 s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader)))
802 })
803 });
804
805 result.expect("scope has already finished")
806 }
807}