zebra_consensus/router.rs
1//! Top-level semantic block verification for Zebra.
2//!
3//! Verifies blocks using the [`CheckpointVerifier`] or full [`SemanticBlockVerifier`],
4//! depending on the config and block height.
5//!
6//! # Correctness
7//!
8//! Block and transaction verification requests should be wrapped in a timeout, because:
9//! - checkpoint verification waits for previous blocks, and
10//! - full block and transaction verification wait for UTXOs from previous blocks.
11//!
12//! Otherwise, verification of out-of-order and invalid blocks and transactions can hang
13//! indefinitely.
14
15use core::fmt;
16use std::{
17 future::Future,
18 pin::Pin,
19 task::{Context, Poll},
20};
21
22use futures::{FutureExt, TryFutureExt};
23use thiserror::Error;
24use tokio::{sync::oneshot, task::JoinHandle};
25use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
26use tracing::{instrument, Instrument, Span};
27
28use zebra_chain::{
29 block::{self, Height},
30 parameters::Network,
31};
32
33use zebra_node_services::mempool;
34use zebra_state as zs;
35
36use crate::{
37 block::{Request, SemanticBlockVerifier, VerifyBlockError},
38 checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
39 error::TransactionError,
40 transaction, BoxError, Config, ParameterCheckpoint as _,
41};
42
43#[cfg(test)]
44mod tests;
45
/// The bound for the chain verifier and transaction verifier buffers.
///
/// The bound is sized for the maximum number of concurrent verifier users,
/// so that they do not contend for buffer slots:
/// - the `ChainSync` block download and verify stream
/// - the `Inbound` block download and verify stream
/// - the `Mempool` transaction download and verify stream
/// - a block miner component, which we might add in future, and
/// - 1 extra slot to avoid contention.
///
/// Extra slots are added on purpose: they only cost a small amount of memory,
/// but missing slots can significantly slow down Zebra.
const VERIFIER_BUFFER_BOUND: usize = {
    // `ChainSync`, `Inbound`, `Mempool`, and a future block miner.
    let concurrent_users = 4;
    // Spare capacity, so the users above never have to wait for each other.
    let extra_slots = 1;

    concurrent_users + extra_slots
};
59
60/// The block verifier router routes requests to either the checkpoint verifier or the
61/// semantic block verifier, depending on the maximum checkpoint height.
62///
63/// # Correctness
64///
65/// Block verification requests should be wrapped in a timeout, so that
66/// out-of-order and invalid requests do not hang indefinitely. See the [`router`](`crate::router`)
67/// module documentation for details.
68struct BlockVerifierRouter<S, V>
69where
70 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
71 S::Future: Send + 'static,
72 V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
73 + Send
74 + Clone
75 + 'static,
76 V::Future: Send + 'static,
77{
78 /// The checkpointing block verifier.
79 ///
80 /// Always used for blocks before `Canopy`, optionally used for the entire checkpoint list.
81 checkpoint: CheckpointVerifier<S>,
82
83 /// The highest permitted checkpoint block.
84 ///
85 /// This height must be in the `checkpoint` verifier's checkpoint list.
86 max_checkpoint_height: block::Height,
87
88 /// The full semantic block verifier, used for blocks after `max_checkpoint_height`.
89 block: SemanticBlockVerifier<S, V>,
90}
91
92/// An error while semantically verifying a block.
93//
94// One or both of these error variants are at least 140 bytes
95#[derive(Debug, Error)]
96#[allow(missing_docs)]
97pub enum RouterError {
98 /// Block could not be checkpointed
99 Checkpoint { source: Box<VerifyCheckpointError> },
100 /// Block could not be full-verified
101 Block { source: Box<VerifyBlockError> },
102}
103
104impl fmt::Display for RouterError {
105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106 f.write_str(&match self {
107 RouterError::Checkpoint { source } => {
108 format!("block could not be checkpointed due to: {source}")
109 }
110 RouterError::Block { source } => {
111 format!("block could not be full-verified due to: {source}")
112 }
113 })
114 }
115}
116
117impl From<VerifyCheckpointError> for RouterError {
118 fn from(err: VerifyCheckpointError) -> Self {
119 RouterError::Checkpoint {
120 source: Box::new(err),
121 }
122 }
123}
124
125impl From<VerifyBlockError> for RouterError {
126 fn from(err: VerifyBlockError) -> Self {
127 RouterError::Block {
128 source: Box::new(err),
129 }
130 }
131}
132
133impl RouterError {
134 /// Returns `true` if this is definitely a duplicate request.
135 /// Some duplicate requests might not be detected, and therefore return `false`.
136 pub fn is_duplicate_request(&self) -> bool {
137 match self {
138 RouterError::Checkpoint { source, .. } => source.is_duplicate_request(),
139 RouterError::Block { source, .. } => source.is_duplicate_request(),
140 }
141 }
142
143 /// Returns a suggested misbehaviour score increment for a certain error.
144 pub fn misbehavior_score(&self) -> u32 {
145 // TODO: Adjust these values based on zcashd (#9258).
146 match self {
147 RouterError::Checkpoint { source } => source.misbehavior_score(),
148 RouterError::Block { source } => source.misbehavior_score(),
149 }
150 }
151}
152
153impl<S, V> Service<Request> for BlockVerifierRouter<S, V>
154where
155 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
156 S::Future: Send + 'static,
157 V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
158 + Send
159 + Clone
160 + 'static,
161 V::Future: Send + 'static,
162{
163 type Response = block::Hash;
164 type Error = RouterError;
165 type Future =
166 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
167
168 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 // CORRECTNESS
170 //
171 // The current task must be scheduled for wakeup every time we return
172 // `Poll::Pending`.
173 //
174 // If either verifier is unready, this task is scheduled for wakeup when it becomes
175 // ready.
176 //
177 // We acquire checkpoint readiness before block readiness, to avoid an unlikely
178 // hang during the checkpoint to block verifier transition. If the checkpoint and
179 // block verifiers are contending for the same buffer/batch, we want the checkpoint
180 // verifier to win, so that checkpoint verification completes, and block verification
181 // can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
182 use futures::ready;
183 // The chain verifier holds one slot in each verifier, for each concurrent task.
184 // Therefore, any shared buffers or batches polled by these verifiers should double
185 // their bounds. (For example, the state service buffer.)
186 ready!(self.checkpoint.poll_ready(cx))?;
187 ready!(self.block.poll_ready(cx))?;
188 Poll::Ready(Ok(()))
189 }
190
191 fn call(&mut self, request: Request) -> Self::Future {
192 let block = request.block();
193
194 match block.coinbase_height() {
195 // There's currently no known use case for block proposals below the checkpoint height,
196 // so it's okay to immediately return an error here.
197 Some(height) if height <= self.max_checkpoint_height && request.is_proposal() => {
198 async {
199 // TODO: Add a `ValidateProposalError` enum with a `BelowCheckpoint` variant?
200 Err(VerifyBlockError::ValidateProposal(
201 "block proposals must be above checkpoint height".into(),
202 ))?
203 }
204 .boxed()
205 }
206
207 Some(height) if height <= self.max_checkpoint_height => {
208 self.checkpoint.call(block).map_err(Into::into).boxed()
209 }
210 // This also covers blocks with no height, which the block verifier
211 // will reject immediately.
212 _ => self.block.call(request).map_err(Into::into).boxed(),
213 }
214 }
215}
216
217/// Initialize block and transaction verification services.
218///
219/// Returns a block verifier, transaction verifier,
220/// a [`BackgroundTaskHandles`] with the state checkpoint verify task,
221/// and the maximum configured checkpoint verification height.
222///
223/// The consensus configuration is specified by `config`, and the Zcash network
224/// to verify blocks for is specified by `network`.
225///
226/// The block verification service asynchronously performs semantic verification
227/// checks. Blocks that pass semantic verification are submitted to the supplied
228/// `state_service` for contextual verification before being committed to the chain.
229///
230/// The transaction verification service asynchronously performs semantic verification
231/// checks. Transactions that pass semantic verification return an `Ok` result to the caller.
232///
233/// This function should only be called once for a particular state service.
234///
235/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
236///
237/// # Correctness
238///
239/// Block and transaction verification requests should be wrapped in a timeout,
240/// so that out-of-order and invalid requests do not hang indefinitely.
241/// See the [`router`](`crate::router`) module documentation for details.
242#[instrument(skip(state_service, mempool))]
243pub async fn init<S, Mempool>(
244 config: Config,
245 network: &Network,
246 mut state_service: S,
247 mempool: oneshot::Receiver<Mempool>,
248) -> (
249 Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
250 Buffer<
251 BoxService<transaction::Request, transaction::Response, TransactionError>,
252 transaction::Request,
253 >,
254 BackgroundTaskHandles,
255 Height,
256)
257where
258 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
259 S::Future: Send + 'static,
260 Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>
261 + Send
262 + Clone
263 + 'static,
264 Mempool::Future: Send + 'static,
265{
266 // Give other tasks priority before spawning the checkpoint task.
267 tokio::task::yield_now().await;
268
269 // Make sure the state contains the known best chain checkpoints, in a separate thread.
270
271 let checkpoint_state_service = state_service.clone();
272 let checkpoint_sync = config.checkpoint_sync;
273 let checkpoint_network = network.clone();
274
275 let state_checkpoint_verify_handle = tokio::task::spawn(
276 // TODO: move this into an async function?
277 async move {
278 tracing::info!("starting state checkpoint validation");
279
280 // # Consensus
281 //
282 // We want to verify all available checkpoints, even if the node is not configured
283 // to use them for syncing. Zebra's checkpoints are updated with every release,
284 // which makes sure they include the latest settled network upgrade.
285 //
286 // > A network upgrade is settled on a given network when there is a social
287 // > consensus that it has activated with a given activation block hash.
288 // > A full validator that potentially risks Mainnet funds or displays Mainnet
289 // > transaction information to a user MUST do so only for a block chain that
290 // > includes the activation block of the most recent settled network upgrade,
291 // > with the corresponding activation block hash. Currently, there is social
292 // > consensus that NU5 has activated on the Zcash Mainnet and Testnet with the
293 // > activation block hashes given in § 3.12 ‘Mainnet and Testnet’ on p. 20.
294 //
295 // <https://zips.z.cash/protocol/protocol.pdf#blockchain>
296 let full_checkpoints = checkpoint_network.checkpoint_list();
297 let mut already_warned = false;
298
299 for (height, checkpoint_hash) in full_checkpoints.iter() {
300 let checkpoint_state_service = checkpoint_state_service.clone();
301 let request = zebra_state::Request::BestChainBlockHash(*height);
302
303 match checkpoint_state_service.oneshot(request).await {
304 Ok(zebra_state::Response::BlockHash(Some(state_hash))) => assert_eq!(
305 *checkpoint_hash, state_hash,
306 "invalid block in state: a previous Zebra instance followed an \
307 incorrect chain. Delete and re-sync your state to use the best chain"
308 ),
309
310 Ok(zebra_state::Response::BlockHash(None)) => {
311 if checkpoint_sync {
312 tracing::info!(
313 "state is not fully synced yet, remaining checkpoints will be \
314 verified during syncing"
315 );
316 } else {
317 tracing::warn!(
318 "state is not fully synced yet, remaining checkpoints will be \
319 verified next time Zebra starts up. Zebra will be less secure \
320 until it is restarted. Use consensus.checkpoint_sync = true \
321 in zebrad.toml to make sure you are following a valid chain"
322 );
323 }
324
325 break;
326 }
327
328 Ok(response) => {
329 unreachable!("unexpected response type: {response:?} from state request")
330 }
331 Err(e) => {
332 // This error happens a lot in some tests, and it could happen to users.
333 if !already_warned {
334 tracing::warn!(
335 "unexpected error: {e:?} in state request while verifying previous \
336 state checkpoints. Is Zebra shutting down?"
337 );
338 already_warned = true;
339 }
340 }
341 }
342 }
343
344 tracing::info!("finished state checkpoint validation");
345 }
346 .instrument(Span::current()),
347 );
348
349 // transaction verification
350
351 let transaction = transaction::Verifier::new(network, state_service.clone(), mempool);
352 let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);
353
354 // block verification
355 let (list, max_checkpoint_height) = init_checkpoint_list(config, network);
356
357 let tip = match state_service
358 .ready()
359 .await
360 .unwrap()
361 .call(zs::Request::Tip)
362 .await
363 .unwrap()
364 {
365 zs::Response::Tip(tip) => tip,
366 _ => unreachable!("wrong response to Request::Tip"),
367 };
368 tracing::info!(
369 ?tip,
370 ?max_checkpoint_height,
371 "initializing block verifier router"
372 );
373
374 let block = SemanticBlockVerifier::new(network, state_service.clone(), transaction.clone());
375 let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
376 let router = BlockVerifierRouter {
377 checkpoint,
378 max_checkpoint_height,
379 block,
380 };
381
382 let router = Buffer::new(BoxService::new(router), VERIFIER_BUFFER_BOUND);
383
384 let task_handles = BackgroundTaskHandles {
385 state_checkpoint_verify_handle,
386 };
387
388 (router, transaction, task_handles, max_checkpoint_height)
389}
390
391/// Parses the checkpoint list for `network` and `config`.
392/// Returns the checkpoint list and maximum checkpoint height.
393pub fn init_checkpoint_list(config: Config, network: &Network) -> (CheckpointList, Height) {
394 // TODO: Zebra parses the checkpoint list three times at startup.
395 // Instead, cache the checkpoint list for each `network`.
396 let list = network.checkpoint_list();
397
398 let max_checkpoint_height = if config.checkpoint_sync {
399 list.max_height()
400 } else {
401 list.min_height_in_range(network.mandatory_checkpoint_height()..)
402 .expect("hardcoded checkpoint list extends past canopy activation")
403 };
404
405 (list, max_checkpoint_height)
406}
407
408/// The background task handles for `zebra-consensus` verifier initialization.
409#[derive(Debug)]
410pub struct BackgroundTaskHandles {
411 /// A handle to the state checkpoint verify task.
412 /// Finishes when all the checkpoints are verified, or when the state tip is reached.
413 pub state_checkpoint_verify_handle: JoinHandle<()>,
414}
415
/// Calls [`init`] with a closed mempool setup channel for conciseness in tests.
///
/// The sender half of the mempool setup channel is dropped immediately, so no
/// mempool service is ever sent to the transaction verifier.
///
/// See [`init`] for more details.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test<S>(
    config: Config,
    network: &Network,
    state_service: S,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    // `config` and `state_service` are owned and not used again, so they are moved
    // into `init` instead of being cloned.
    init(
        config,
        network,
        state_service,
        // Only the receiver is kept: the sender is dropped at the end of this
        // expression, which closes the channel.
        oneshot::channel::<
            Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
        >()
        .1,
    )
    .await
}