1use std::{
4 collections::HashMap,
5 convert,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use futures::{
12 future::{FutureExt, TryFutureExt},
13 ready,
14 stream::{FuturesUnordered, Stream},
15};
16use pin_project::pin_project;
17use thiserror::Error;
18use tokio::{
19 sync::{oneshot, watch},
20 task::JoinHandle,
21 time::timeout,
22};
23use tower::{hedge, Service, ServiceExt};
24use tracing_futures::Instrument;
25
26use zebra_chain::{
27 block::{self, Height, HeightDiff},
28 chain_tip::ChainTip,
29};
30use zebra_network::{self as zn, PeerSocketAddr};
31use zebra_state as zs;
32
33use crate::components::sync::{
34 FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT,
35};
36
37type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
38
39pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
56
57pub const VERIFICATION_PIPELINE_DROP_LIMIT: HeightDiff = 50_000;
60
61#[derive(Copy, Clone, Debug)]
62pub(super) struct AlwaysHedge;
63
64impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
65 fn can_retry(&self, _req: &Request) -> bool {
66 true
67 }
68 fn clone_request(&self, req: &Request) -> Option<Request> {
69 Some(req.clone())
70 }
71}
72
73#[derive(Error, Debug)]
75#[allow(dead_code)]
76pub enum BlockDownloadVerifyError {
77 #[error("permanent readiness error from the network service: {error:?}")]
78 NetworkServiceError {
79 #[source]
80 error: BoxError,
81 },
82
83 #[error("permanent readiness error from the verifier service: {error:?}")]
84 VerifierServiceError {
85 #[source]
86 error: BoxError,
87 },
88
89 #[error("duplicate block hash queued for download: {hash:?}")]
90 DuplicateBlockQueuedForDownload { hash: block::Hash },
91
92 #[error("error downloading block: {error:?} {hash:?}")]
93 DownloadFailed {
94 #[source]
95 error: BoxError,
96 hash: block::Hash,
97 },
98
99 #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
108 AboveLookaheadHeightLimit {
109 height: block::Height,
110 hash: block::Hash,
111 },
112
113 #[error("downloaded block was too far behind the chain tip: {height:?} {hash:?}")]
114 BehindTipHeightLimit {
115 height: block::Height,
116 hash: block::Hash,
117 },
118
119 #[error("downloaded block had an invalid height: {hash:?}")]
120 InvalidHeight { hash: block::Hash },
121
122 #[error("block failed consensus validation: {error:?} {height:?} {hash:?}")]
123 Invalid {
124 #[source]
125 error: zebra_consensus::router::RouterError,
126 height: block::Height,
127 hash: block::Hash,
128 advertiser_addr: Option<PeerSocketAddr>,
129 },
130
131 #[error("block validation request failed: {error:?} {height:?} {hash:?}")]
132 ValidationRequestError {
133 #[source]
134 error: BoxError,
135 height: block::Height,
136 hash: block::Hash,
137 },
138
139 #[error("block download & verification was cancelled during download: {hash:?}")]
140 CancelledDuringDownload { hash: block::Hash },
141
142 #[error(
143 "block download & verification was cancelled while waiting for the verifier service: \
144 to become ready: {height:?} {hash:?}"
145 )]
146 CancelledAwaitingVerifierReadiness {
147 height: block::Height,
148 hash: block::Hash,
149 },
150
151 #[error(
152 "block download & verification was cancelled during verification: {height:?} {hash:?}"
153 )]
154 CancelledDuringVerification {
155 height: block::Height,
156 hash: block::Hash,
157 },
158
159 #[error(
160 "timeout during service readiness, download, verification, or internal downloader operation"
161 )]
162 Timeout,
163}
164
165impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
166 fn from(_value: tokio::time::error::Elapsed) -> Self {
167 BlockDownloadVerifyError::Timeout
168 }
169}
170
/// A [`Stream`] of block download and verification tasks, used during chain sync.
///
/// Blocks are queued with `download_and_verify`; the stream yields each finished task's
/// block height and hash, or its error.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    /// The network service, used to download blocks by hash.
    network: ZN,

    /// The verifier service; each downloaded block is sent to it as a commit request.
    verifier: ZV,

    /// The latest chain tip. Its best tip height anchors the lookahead and reorg-depth
    /// checks on downloaded blocks.
    latest_chain_tip: ZSTip,

    /// The lookahead limit, used to scale the lookahead pause and reset heights.
    lookahead_limit: usize,

    /// The start of the height range whose blocks get a shorter verification timeout.
    max_checkpoint_height: Height,

    /// Sends `true` when a downloaded block is too far above the tip, and `false` when a
    /// block is back under the reset height or all tasks are cancelled.
    ///
    /// The `Arc` lets spawned tasks share the sender. Each use in this module locks the
    /// `std` mutex for a single `send`, so the guard is never held across an `.await`.
    past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,

    /// A receiver subscribed to `past_lookahead_limit_sender`, used to read the flag's
    /// current value.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    /// The spawned download and verify tasks. Each one resolves to the block's height and
    /// hash, or to an error paired with the block's hash.
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
    >,

    /// A cancellation sender for each queued block hash.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
227
228impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
229where
230 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
231 ZN::Future: Send,
232 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
233 + Send
234 + Sync
235 + Clone
236 + 'static,
237 ZV::Future: Send,
238 ZSTip: ChainTip + Clone + Send + 'static,
239{
240 type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
241
242 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
243 let this = self.project();
244 if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
254 match join_result.expect("block download and verify tasks must not panic") {
255 Ok((height, hash)) => {
256 this.cancel_handles.remove(&hash);
257
258 Poll::Ready(Some(Ok((height, hash))))
259 }
260 Err((e, hash)) => {
261 this.cancel_handles.remove(&hash);
262 Poll::Ready(Some(Err(e)))
263 }
264 }
265 } else {
266 Poll::Ready(None)
267 }
268 }
269
270 fn size_hint(&self) -> (usize, Option<usize>) {
271 self.pending.size_hint()
272 }
273}
274
275impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
276where
277 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
278 ZN::Future: Send,
279 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
280 + Send
281 + Sync
282 + Clone
283 + 'static,
284 ZV::Future: Send,
285 ZSTip: ChainTip + Clone + Send + 'static,
286{
287 pub fn new(
298 network: ZN,
299 verifier: ZV,
300 latest_chain_tip: ZSTip,
301 past_lookahead_limit_sender: watch::Sender<bool>,
302 lookahead_limit: usize,
303 max_checkpoint_height: Height,
304 ) -> Self {
305 let past_lookahead_limit_receiver =
306 zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
307
308 Self {
309 network,
310 verifier,
311 latest_chain_tip,
312 lookahead_limit,
313 max_checkpoint_height,
314 past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
315 past_lookahead_limit_sender,
316 )),
317 past_lookahead_limit_receiver,
318 pending: FuturesUnordered::new(),
319 cancel_handles: HashMap::new(),
320 }
321 }
322
323 #[instrument(level = "debug", skip(self), fields(%hash))]
329 pub async fn download_and_verify(
330 &mut self,
331 hash: block::Hash,
332 ) -> Result<(), BlockDownloadVerifyError> {
333 if self.cancel_handles.contains_key(&hash) {
334 metrics::counter!("sync.already.queued.dropped.block.hash.count").increment(1);
335 return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
336 }
337
338 let block_req = self
346 .network
347 .ready()
348 .await
349 .map_err(|error| BlockDownloadVerifyError::NetworkServiceError { error })?
350 .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));
351
352 let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
354
355 let mut verifier = self.verifier.clone();
356 let latest_chain_tip = self.latest_chain_tip.clone();
357
358 let lookahead_limit = self.lookahead_limit;
359 let max_checkpoint_height = self.max_checkpoint_height;
360
361 let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
362 let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();
363
364 let task = tokio::spawn(
365 async move {
366 let rsp = tokio::select! {
369 biased;
370 _ = &mut cancel_rx => {
371 trace!("task cancelled prior to download completion");
372 metrics::counter!("sync.cancelled.download.count").increment(1);
373 return Err(BlockDownloadVerifyError::CancelledDuringDownload { hash })
374 }
375 rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash})?,
376 };
377
378 let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
379 assert_eq!(
380 blocks.len(),
381 1,
382 "wrong number of blocks in response to a single hash"
383 );
384
385 blocks
386 .first()
387 .expect("just checked length")
388 .available()
389 .expect("unexpected missing block status: single block failures should be errors")
390 } else {
391 unreachable!("wrong response to block request");
392 };
393 metrics::counter!("sync.downloaded.block.count").increment(1);
394
395 let tip_height = latest_chain_tip.best_tip_height();
399
400 let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
401 let lookahead_pause = HeightDiff::try_from(
404 lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
405 )
406 .expect("fits in HeightDiff");
407
408
409 ((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
410 (tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
411 (tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
412 } else {
413 let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
414 let genesis_lookahead =
415 u32::try_from(lookahead_limit - 1).expect("fits in u32");
416
417 (block::Height(genesis_drop),
418 block::Height(genesis_lookahead),
419 block::Height(genesis_lookahead/2))
420 };
421
422 let min_accepted_height = tip_height
432 .map(|tip_height| {
433 block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
434 })
435 .unwrap_or(block::Height(0));
436
437 let block_height = if let Some(block_height) = block.coinbase_height() {
438 block_height
439 } else {
440 debug!(
441 ?hash,
442 "synced block with no height: dropped downloaded block"
443 );
444 metrics::counter!("sync.no.height.dropped.block.count").increment(1);
445
446 return Err(BlockDownloadVerifyError::InvalidHeight { hash });
447 };
448
449 if block_height > lookahead_drop_height {
450 Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
451 } else if block_height > lookahead_pause_height {
452 if !past_lookahead_limit_receiver.cloned_watch_data() {
455 info!(
456 ?hash,
457 ?block_height,
458 ?tip_height,
459 ?lookahead_pause_height,
460 ?lookahead_reset_height,
461 lookahead_limit = ?lookahead_limit,
462 "synced block height too far ahead of the tip: \
463 waiting for downloaded blocks to commit to the state",
464 );
465
466 let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
471 } else {
472 debug!(
473 ?hash,
474 ?block_height,
475 ?tip_height,
476 ?lookahead_pause_height,
477 ?lookahead_reset_height,
478 lookahead_limit = ?lookahead_limit,
479 "synced block height too far ahead of the tip: \
480 waiting for downloaded blocks to commit to the state",
481 );
482 }
483
484 metrics::counter!("sync.max.height.limit.paused.count").increment(1);
485 } else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
486 let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(false);
491 metrics::counter!("sync.max.height.limit.reset.count").increment(1);
492
493 metrics::counter!("sync.max.height.limit.reset.attempt.count").increment(1);
494 }
495
496 if block_height < min_accepted_height {
497 debug!(
498 ?hash,
499 ?block_height,
500 ?tip_height,
501 ?min_accepted_height,
502 behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
503 "synced block height behind the finalized tip: dropped downloaded block"
504 );
505 metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
506
507 Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
508 }
509
510 let readiness = verifier.ready();
512 let verifier = tokio::select! {
514 biased;
515 _ = &mut cancel_rx => {
516 trace!("task cancelled waiting for verifier service readiness");
517 metrics::counter!("sync.cancelled.verify.ready.count").increment(1);
518 return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
519 }
520 verifier = readiness => verifier,
521 };
522
523 let mut rsp = verifier
525 .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
526 .call(zebra_consensus::Request::Commit(block)).boxed();
527
528 let short_timeout_max = (max_checkpoint_height + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT).expect("checkpoint block height is in valid range");
530 if block_height >= max_checkpoint_height && block_height <= short_timeout_max {
531 rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp)
532 .map_err(|timeout| format!("initial fully verified block timed out: retrying: {timeout:?}").into())
533 .map(|nested_result| nested_result.and_then(convert::identity)).boxed();
534 }
535
536 let verification = tokio::select! {
537 biased;
538 _ = &mut cancel_rx => {
539 trace!("task cancelled prior to verification");
540 metrics::counter!("sync.cancelled.verify.count").increment(1);
541 return Err(BlockDownloadVerifyError::CancelledDuringVerification { height: block_height, hash })
542 }
543 verification = rsp => verification,
544 };
545
546 if verification.is_ok() {
547 metrics::counter!("sync.verified.block.count").increment(1);
548 }
549
550 verification
551 .map(|hash| (block_height, hash))
552 .map_err(|err| {
553 match err.downcast::<zebra_consensus::router::RouterError>() {
554 Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash, advertiser_addr },
555 Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
556 }
557 })
558 }
559 .in_current_span()
560 .map_err(move |e| (e, hash)),
563 );
564
565 tokio::task::yield_now().await;
567
568 self.pending.push(task);
569 assert!(
570 self.cancel_handles.insert(hash, cancel_tx).is_none(),
571 "blocks are only queued once"
572 );
573
574 Ok(())
575 }
576
577 pub fn cancel_all(&mut self) {
579 let _ = std::mem::take(&mut self.pending);
581
582 for (_hash, cancel) in self.cancel_handles.drain() {
586 let _ = cancel.send(());
587 }
588
589 assert!(self.pending.is_empty());
590 assert!(self.cancel_handles.is_empty());
591
592 let _ = self
597 .past_lookahead_limit_sender
598 .lock()
599 .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
600 .send(false);
601 }
602
603 pub fn in_flight(&mut self) -> usize {
605 self.pending.len()
606 }
607
608 #[allow(dead_code)]
610 pub fn is_empty(&mut self) -> bool {
611 self.pending.is_empty()
612 }
613}