1use std::{
4 collections::HashMap,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures::{
10 future::TryFutureExt,
11 ready,
12 stream::{FuturesUnordered, Stream},
13};
14use pin_project::pin_project;
15use tokio::{sync::oneshot, task::JoinHandle};
16use tower::{Service, ServiceExt};
17use tracing_futures::Instrument;
18
19use zebra_chain::{
20 block::{self, HeightDiff},
21 chain_tip::ChainTip,
22};
23use zebra_network::{self as zn, PeerSocketAddr};
24use zebra_state as zs;
25
26use crate::components::sync::MIN_CONCURRENCY_LIMIT;
27
28type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
29
/// The maximum number of concurrent inbound download and verify tasks.
///
/// [`Downloads::new`] clamps the configured concurrency limit so that it never
/// exceeds this value, and [`Downloads::download_and_verify`] refuses to queue
/// new hashes once that many tasks are pending.
pub const MAX_INBOUND_CONCURRENCY: usize = 30;
53
/// The outcome of asking [`Downloads::download_and_verify`] to queue a block hash.
///
/// The enum is a plain, field-less status code, so it derives the usual
/// value-type traits: callers can log it, compare it in tests, and copy it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloadAction {
    /// A new download and verify task was spawned for the hash.
    AddedToQueue,

    /// A task for this hash is already in the queue, so the request was ignored.
    AlreadyQueued,

    /// The number of pending tasks has reached the concurrency limit,
    /// so the request was ignored.
    FullQueue,
}
70
/// A queue of in-flight "download a block by hash, then verify it" tasks.
///
/// Hashes are added with `download_and_verify`. The results of finished tasks
/// are yielded through this type's [`Stream`] implementation.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Clone
        + 'static,
    ZV::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    ZS::Future: Send,
{
    /// The maximum number of tasks allowed in `pending` at once.
    ///
    /// The same value is also used as the height lookahead limit: downloaded
    /// blocks more than this many heights above the best tip are dropped.
    /// It is clamped in `new` before being stored.
    full_verify_concurrency_limit: usize,

    /// The service that receives `zn::Request::BlocksByHash` requests to fetch blocks.
    network: ZN,

    /// The service that receives `zebra_consensus::Request::Commit` requests
    /// for downloaded blocks.
    verifier: ZV,

    /// The service that is asked `zs::Request::KnownBlock` before downloading,
    /// so already-known hashes are not fetched again.
    state: ZS,

    /// Supplies the best tip height used to compute the accepted height range.
    latest_chain_tip: zs::LatestChainTip,

    /// Join handles of the spawned download and verify tasks.
    ///
    /// A task resolves to the verified block hash, or to an error paired with
    /// the requested hash and, if available, the address of the advertising peer.
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<block::Hash, (BoxError, block::Hash, Option<PeerSocketAddr>)>>,
    >,

    /// One cancellation sender per queued hash.
    ///
    /// An entry is inserted when the hash is queued and removed when its task's
    /// result is yielded from the stream. The key set is also what
    /// `download_and_verify` checks to detect already-queued hashes.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
118
119impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
120where
121 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
122 ZN::Future: Send,
123 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
124 + Send
125 + Clone
126 + 'static,
127 ZV::Future: Send,
128 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
129 ZS::Future: Send,
130{
131 type Item = Result<block::Hash, (BoxError, Option<PeerSocketAddr>)>;
132
133 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
134 let this = self.project();
135 if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
145 match join_result.expect("block download and verify tasks must not panic") {
146 Ok(hash) => {
147 this.cancel_handles.remove(&hash);
148 Poll::Ready(Some(Ok(hash)))
149 }
150 Err((e, hash, advertiser_addr)) => {
151 this.cancel_handles.remove(&hash);
152 Poll::Ready(Some(Err((e, advertiser_addr))))
153 }
154 }
155 } else {
156 Poll::Ready(None)
157 }
158 }
159
160 fn size_hint(&self) -> (usize, Option<usize>) {
161 self.pending.size_hint()
162 }
163}
164
165impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
166where
167 ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
168 ZN::Future: Send,
169 ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
170 + Send
171 + Clone
172 + 'static,
173 ZV::Future: Send,
174 ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
175 ZS::Future: Send,
176{
177 pub fn new(
184 full_verify_concurrency_limit: usize,
185 network: ZN,
186 verifier: ZV,
187 state: ZS,
188 latest_chain_tip: zs::LatestChainTip,
189 ) -> Self {
190 let full_verify_concurrency_limit =
192 full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
193
194 Self {
195 full_verify_concurrency_limit,
196 network,
197 verifier,
198 state,
199 latest_chain_tip,
200 pending: FuturesUnordered::new(),
201 cancel_handles: HashMap::new(),
202 }
203 }
204
205 #[instrument(skip(self, hash), fields(hash = %hash))]
209 pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction {
210 if self.cancel_handles.contains_key(&hash) {
211 debug!(
212 ?hash,
213 queue_len = self.pending.len(),
214 concurrency_limit = self.full_verify_concurrency_limit,
215 "block hash already queued for inbound download: ignored block",
216 );
217
218 metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
219 metrics::counter!("gossip.already.queued.dropped.block.hash.count").increment(1);
220
221 return DownloadAction::AlreadyQueued;
222 }
223
224 if self.pending.len() >= self.full_verify_concurrency_limit {
225 debug!(
226 ?hash,
227 queue_len = self.pending.len(),
228 concurrency_limit = self.full_verify_concurrency_limit,
229 "too many blocks queued for inbound download: ignored block",
230 );
231
232 metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
233 metrics::counter!("gossip.full.queue.dropped.block.hash.count").increment(1);
234
235 return DownloadAction::FullQueue;
236 }
237
238 let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
240
241 let state = self.state.clone();
242 let network = self.network.clone();
243 let verifier = self.verifier.clone();
244 let latest_chain_tip = self.latest_chain_tip.clone();
245 let full_verify_concurrency_limit = self.full_verify_concurrency_limit;
246
247 let fut = async move {
248 match state.oneshot(zs::Request::KnownBlock(hash)).await {
250 Ok(zs::Response::KnownBlock(None)) => Ok(()),
251 Ok(zs::Response::KnownBlock(Some(_))) => Err("already present".into()),
252 Ok(_) => unreachable!("wrong response"),
253 Err(e) => Err(e),
254 }
255 .map_err(|e| (e, None))?;
256
257 let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = network
258 .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
259 .await
260 .map_err(|e| (e, None))?
261 {
262 assert_eq!(
263 blocks.len(),
264 1,
265 "wrong number of blocks in response to a single hash",
266 );
267
268 blocks
269 .first()
270 .expect("just checked length")
271 .available()
272 .expect(
273 "unexpected missing block status: single block failures should be errors",
274 )
275 } else {
276 unreachable!("wrong response to block request");
277 };
278 metrics::counter!("gossip.downloaded.block.count").increment(1);
279
280 let tip_height = latest_chain_tip.best_tip_height();
288
289 let max_lookahead_height = if let Some(tip_height) = tip_height {
290 let lookahead = HeightDiff::try_from(full_verify_concurrency_limit)
291 .expect("fits in HeightDiff");
292 (tip_height + lookahead).expect("tip is much lower than Height::MAX")
293 } else {
294 let genesis_lookahead =
295 u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
296 block::Height(genesis_lookahead)
297 };
298
299 let min_accepted_height = tip_height
309 .map(|tip_height| {
310 block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
311 })
312 .unwrap_or(block::Height(0));
313
314 let block_height = block
315 .coinbase_height()
316 .ok_or_else(|| {
317 debug!(
318 ?hash,
319 "gossiped block with no height: dropped downloaded block"
320 );
321 metrics::counter!("gossip.no.height.dropped.block.count").increment(1);
322
323 BoxError::from("gossiped block with no height")
324 })
325 .map_err(|e| (e, None))?;
326
327 if block_height > max_lookahead_height {
328 debug!(
329 ?hash,
330 ?block_height,
331 ?tip_height,
332 ?max_lookahead_height,
333 lookahead_limit = full_verify_concurrency_limit,
334 "gossiped block height too far ahead of the tip: dropped downloaded block",
335 );
336 metrics::counter!("gossip.max.height.limit.dropped.block.count").increment(1);
337
338 Err("gossiped block height too far ahead").map_err(|e| (e.into(), None))?;
339 } else if block_height < min_accepted_height {
340 debug!(
341 ?hash,
342 ?block_height,
343 ?tip_height,
344 ?min_accepted_height,
345 behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
346 "gossiped block height behind the finalized tip: dropped downloaded block",
347 );
348 metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
349
350 Err("gossiped block height behind the finalized tip")
351 .map_err(|e| (e.into(), None))?;
352 }
353
354 verifier
355 .oneshot(zebra_consensus::Request::Commit(block))
356 .await
357 .map(|hash| (hash, block_height))
358 .map_err(|e| (e, advertiser_addr))
359 }
360 .map_ok(|(hash, height)| {
361 info!(?height, "downloaded and verified gossiped block");
362 metrics::counter!("gossip.verified.block.count").increment(1);
363 hash
364 })
365 .map_err(move |(e, advertiser_addr)| (e, hash, advertiser_addr))
368 .in_current_span();
369
370 let task = tokio::spawn(async move {
371 tokio::select! {
373 biased;
374 _ = &mut cancel_rx => {
375 trace!("task cancelled prior to completion");
376 metrics::counter!("gossip.cancelled.count").increment(1);
377 Err(("canceled".into(), hash, None))
378 }
379 verification = fut => verification,
380 }
381 });
382
383 self.pending.push(task);
384 assert!(
385 self.cancel_handles.insert(hash, cancel_tx).is_none(),
386 "blocks are only queued once"
387 );
388
389 debug!(
390 ?hash,
391 queue_len = self.pending.len(),
392 concurrency_limit = self.full_verify_concurrency_limit,
393 "queued hash for download",
394 );
395 metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
396
397 DownloadAction::AddedToQueue
398 }
399}