zebra_consensus/
router.rs

1//! Top-level semantic block verification for Zebra.
2//!
3//! Verifies blocks using the [`CheckpointVerifier`] or full [`SemanticBlockVerifier`],
4//! depending on the config and block height.
5//!
6//! # Correctness
7//!
8//! Block and transaction verification requests should be wrapped in a timeout, because:
9//! - checkpoint verification waits for previous blocks, and
10//! - full block and transaction verification wait for UTXOs from previous blocks.
11//!
12//! Otherwise, verification of out-of-order and invalid blocks and transactions can hang
13//! indefinitely.
14
15use core::fmt;
16use std::{
17    future::Future,
18    pin::Pin,
19    task::{Context, Poll},
20};
21
22use futures::{FutureExt, TryFutureExt};
23use thiserror::Error;
24use tokio::{sync::oneshot, task::JoinHandle};
25use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
26use tracing::{instrument, Instrument, Span};
27
28use zebra_chain::{
29    block::{self, Height},
30    parameters::Network,
31};
32
33use zebra_node_services::mempool;
34use zebra_state as zs;
35
36use crate::{
37    block::{Request, SemanticBlockVerifier, VerifyBlockError},
38    checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
39    error::TransactionError,
40    transaction, BoxError, Config, ParameterCheckpoint as _,
41};
42
43#[cfg(test)]
44mod tests;
45
/// Capacity of the request buffers placed in front of the block verifier router and the
/// transaction verifier.
///
/// One slot is provided for each component that is expected to use the verifiers
/// concurrently, so they do not contend with each other:
///   1. the `ChainSync` block download and verify stream,
///   2. the `Inbound` block download and verify stream,
///   3. the `Mempool` transaction download and verify stream,
///   4. a block miner component, which might be added in future,
///
/// plus one spare slot to further avoid contention.
///
/// Over-provisioning is deliberate: an unused slot only costs a little memory, while a
/// missing slot can noticeably slow Zebra down.
const VERIFIER_BUFFER_BOUND: usize = 4 + 1;
59
60/// The block verifier router routes requests to either the checkpoint verifier or the
61/// semantic block verifier, depending on the maximum checkpoint height.
62///
63/// # Correctness
64///
65/// Block verification requests should be wrapped in a timeout, so that
66/// out-of-order and invalid requests do not hang indefinitely. See the [`router`](`crate::router`)
67/// module documentation for details.
68struct BlockVerifierRouter<S, V>
69where
70    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
71    S::Future: Send + 'static,
72    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
73        + Send
74        + Clone
75        + 'static,
76    V::Future: Send + 'static,
77{
78    /// The checkpointing block verifier.
79    ///
80    /// Always used for blocks before `Canopy`, optionally used for the entire checkpoint list.
81    checkpoint: CheckpointVerifier<S>,
82
83    /// The highest permitted checkpoint block.
84    ///
85    /// This height must be in the `checkpoint` verifier's checkpoint list.
86    max_checkpoint_height: block::Height,
87
88    /// The full semantic block verifier, used for blocks after `max_checkpoint_height`.
89    block: SemanticBlockVerifier<S, V>,
90}
91
92/// An error while semantically verifying a block.
93//
94// One or both of these error variants are at least 140 bytes
95#[derive(Debug, Error)]
96#[allow(missing_docs)]
97pub enum RouterError {
98    /// Block could not be checkpointed
99    Checkpoint { source: Box<VerifyCheckpointError> },
100    /// Block could not be full-verified
101    Block { source: Box<VerifyBlockError> },
102}
103
104impl fmt::Display for RouterError {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        f.write_str(&match self {
107            RouterError::Checkpoint { source } => {
108                format!("block could not be checkpointed due to: {source}")
109            }
110            RouterError::Block { source } => {
111                format!("block could not be full-verified due to: {source}")
112            }
113        })
114    }
115}
116
117impl From<VerifyCheckpointError> for RouterError {
118    fn from(err: VerifyCheckpointError) -> Self {
119        RouterError::Checkpoint {
120            source: Box::new(err),
121        }
122    }
123}
124
125impl From<VerifyBlockError> for RouterError {
126    fn from(err: VerifyBlockError) -> Self {
127        RouterError::Block {
128            source: Box::new(err),
129        }
130    }
131}
132
133impl RouterError {
134    /// Returns `true` if this is definitely a duplicate request.
135    /// Some duplicate requests might not be detected, and therefore return `false`.
136    pub fn is_duplicate_request(&self) -> bool {
137        match self {
138            RouterError::Checkpoint { source, .. } => source.is_duplicate_request(),
139            RouterError::Block { source, .. } => source.is_duplicate_request(),
140        }
141    }
142
143    /// Returns a suggested misbehaviour score increment for a certain error.
144    pub fn misbehavior_score(&self) -> u32 {
145        // TODO: Adjust these values based on zcashd (#9258).
146        match self {
147            RouterError::Checkpoint { source } => source.misbehavior_score(),
148            RouterError::Block { source } => source.misbehavior_score(),
149        }
150    }
151}
152
153impl<S, V> Service<Request> for BlockVerifierRouter<S, V>
154where
155    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
156    S::Future: Send + 'static,
157    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
158        + Send
159        + Clone
160        + 'static,
161    V::Future: Send + 'static,
162{
163    type Response = block::Hash;
164    type Error = RouterError;
165    type Future =
166        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
167
168    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169        // CORRECTNESS
170        //
171        // The current task must be scheduled for wakeup every time we return
172        // `Poll::Pending`.
173        //
174        // If either verifier is unready, this task is scheduled for wakeup when it becomes
175        // ready.
176        //
177        // We acquire checkpoint readiness before block readiness, to avoid an unlikely
178        // hang during the checkpoint to block verifier transition. If the checkpoint and
179        // block verifiers are contending for the same buffer/batch, we want the checkpoint
180        // verifier to win, so that checkpoint verification completes, and block verification
181        // can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
182        use futures::ready;
183        // The chain verifier holds one slot in each verifier, for each concurrent task.
184        // Therefore, any shared buffers or batches polled by these verifiers should double
185        // their bounds. (For example, the state service buffer.)
186        ready!(self.checkpoint.poll_ready(cx))?;
187        ready!(self.block.poll_ready(cx))?;
188        Poll::Ready(Ok(()))
189    }
190
191    fn call(&mut self, request: Request) -> Self::Future {
192        let block = request.block();
193
194        match block.coinbase_height() {
195            // There's currently no known use case for block proposals below the checkpoint height,
196            // so it's okay to immediately return an error here.
197            Some(height) if height <= self.max_checkpoint_height && request.is_proposal() => {
198                async {
199                    // TODO: Add a `ValidateProposalError` enum with a `BelowCheckpoint` variant?
200                    Err(VerifyBlockError::ValidateProposal(
201                        "block proposals must be above checkpoint height".into(),
202                    ))?
203                }
204                .boxed()
205            }
206
207            Some(height) if height <= self.max_checkpoint_height => {
208                self.checkpoint.call(block).map_err(Into::into).boxed()
209            }
210            // This also covers blocks with no height, which the block verifier
211            // will reject immediately.
212            _ => self.block.call(request).map_err(Into::into).boxed(),
213        }
214    }
215}
216
217/// Initialize block and transaction verification services.
218///
219/// Returns a block verifier, transaction verifier,
220/// a [`BackgroundTaskHandles`] with the state checkpoint verify task,
221/// and the maximum configured checkpoint verification height.
222///
223/// The consensus configuration is specified by `config`, and the Zcash network
224/// to verify blocks for is specified by `network`.
225///
226/// The block verification service asynchronously performs semantic verification
227/// checks. Blocks that pass semantic verification are submitted to the supplied
228/// `state_service` for contextual verification before being committed to the chain.
229///
230/// The transaction verification service asynchronously performs semantic verification
231/// checks. Transactions that pass semantic verification return an `Ok` result to the caller.
232///
233/// This function should only be called once for a particular state service.
234///
235/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
236///
237/// # Correctness
238///
239/// Block and transaction verification requests should be wrapped in a timeout,
240/// so that out-of-order and invalid requests do not hang indefinitely.
241/// See the [`router`](`crate::router`) module documentation for details.
242#[instrument(skip(state_service, mempool))]
243pub async fn init<S, Mempool>(
244    config: Config,
245    network: &Network,
246    mut state_service: S,
247    mempool: oneshot::Receiver<Mempool>,
248) -> (
249    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
250    Buffer<
251        BoxService<transaction::Request, transaction::Response, TransactionError>,
252        transaction::Request,
253    >,
254    BackgroundTaskHandles,
255    Height,
256)
257where
258    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
259    S::Future: Send + 'static,
260    Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>
261        + Send
262        + Clone
263        + 'static,
264    Mempool::Future: Send + 'static,
265{
266    // Give other tasks priority before spawning the checkpoint task.
267    tokio::task::yield_now().await;
268
269    // Make sure the state contains the known best chain checkpoints, in a separate thread.
270
271    let checkpoint_state_service = state_service.clone();
272    let checkpoint_sync = config.checkpoint_sync;
273    let checkpoint_network = network.clone();
274
275    let state_checkpoint_verify_handle = tokio::task::spawn(
276        // TODO: move this into an async function?
277        async move {
278            tracing::info!("starting state checkpoint validation");
279
280            // # Consensus
281            //
282            // We want to verify all available checkpoints, even if the node is not configured
283            // to use them for syncing. Zebra's checkpoints are updated with every release,
284            // which makes sure they include the latest settled network upgrade.
285            //
286            // > A network upgrade is settled on a given network when there is a social
287            // > consensus that it has activated with a given activation block hash.
288            // > A full validator that potentially risks Mainnet funds or displays Mainnet
289            // > transaction information to a user MUST do so only for a block chain that
290            // > includes the activation block of the most recent settled network upgrade,
291            // > with the corresponding activation block hash. Currently, there is social
292            // > consensus that NU5 has activated on the Zcash Mainnet and Testnet with the
293            // > activation block hashes given in § 3.12 ‘Mainnet and Testnet’ on p. 20.
294            //
295            // <https://zips.z.cash/protocol/protocol.pdf#blockchain>
296            let full_checkpoints = checkpoint_network.checkpoint_list();
297            let mut already_warned = false;
298
299            for (height, checkpoint_hash) in full_checkpoints.iter() {
300                let checkpoint_state_service = checkpoint_state_service.clone();
301                let request = zebra_state::Request::BestChainBlockHash(*height);
302
303                match checkpoint_state_service.oneshot(request).await {
304                    Ok(zebra_state::Response::BlockHash(Some(state_hash))) => assert_eq!(
305                        *checkpoint_hash, state_hash,
306                        "invalid block in state: a previous Zebra instance followed an \
307                         incorrect chain. Delete and re-sync your state to use the best chain"
308                    ),
309
310                    Ok(zebra_state::Response::BlockHash(None)) => {
311                        if checkpoint_sync {
312                            tracing::info!(
313                                "state is not fully synced yet, remaining checkpoints will be \
314                                 verified during syncing"
315                            );
316                        } else {
317                            tracing::warn!(
318                                "state is not fully synced yet, remaining checkpoints will be \
319                                 verified next time Zebra starts up. Zebra will be less secure \
320                                 until it is restarted. Use consensus.checkpoint_sync = true \
321                                 in zebrad.toml to make sure you are following a valid chain"
322                            );
323                        }
324
325                        break;
326                    }
327
328                    Ok(response) => {
329                        unreachable!("unexpected response type: {response:?} from state request")
330                    }
331                    Err(e) => {
332                        // This error happens a lot in some tests, and it could happen to users.
333                        if !already_warned {
334                            tracing::warn!(
335                                "unexpected error: {e:?} in state request while verifying previous \
336                                 state checkpoints. Is Zebra shutting down?"
337                            );
338                            already_warned = true;
339                        }
340                    }
341                }
342            }
343
344            tracing::info!("finished state checkpoint validation");
345        }
346        .instrument(Span::current()),
347    );
348
349    // transaction verification
350
351    let transaction = transaction::Verifier::new(network, state_service.clone(), mempool);
352    let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);
353
354    // block verification
355    let (list, max_checkpoint_height) = init_checkpoint_list(config, network);
356
357    let tip = match state_service
358        .ready()
359        .await
360        .unwrap()
361        .call(zs::Request::Tip)
362        .await
363        .unwrap()
364    {
365        zs::Response::Tip(tip) => tip,
366        _ => unreachable!("wrong response to Request::Tip"),
367    };
368    tracing::info!(
369        ?tip,
370        ?max_checkpoint_height,
371        "initializing block verifier router"
372    );
373
374    let block = SemanticBlockVerifier::new(network, state_service.clone(), transaction.clone());
375    let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
376    let router = BlockVerifierRouter {
377        checkpoint,
378        max_checkpoint_height,
379        block,
380    };
381
382    let router = Buffer::new(BoxService::new(router), VERIFIER_BUFFER_BOUND);
383
384    let task_handles = BackgroundTaskHandles {
385        state_checkpoint_verify_handle,
386    };
387
388    (router, transaction, task_handles, max_checkpoint_height)
389}
390
391/// Parses the checkpoint list for `network` and `config`.
392/// Returns the checkpoint list and maximum checkpoint height.
393pub fn init_checkpoint_list(config: Config, network: &Network) -> (CheckpointList, Height) {
394    // TODO: Zebra parses the checkpoint list three times at startup.
395    //       Instead, cache the checkpoint list for each `network`.
396    let list = network.checkpoint_list();
397
398    let max_checkpoint_height = if config.checkpoint_sync {
399        list.max_height()
400    } else {
401        list.min_height_in_range(network.mandatory_checkpoint_height()..)
402            .expect("hardcoded checkpoint list extends past canopy activation")
403    };
404
405    (list, max_checkpoint_height)
406}
407
408/// The background task handles for `zebra-consensus` verifier initialization.
409#[derive(Debug)]
410pub struct BackgroundTaskHandles {
411    /// A handle to the state checkpoint verify task.
412    /// Finishes when all the checkpoints are verified, or when the state tip is reached.
413    pub state_checkpoint_verify_handle: JoinHandle<()>,
414}
415
/// Calls [`init`] with a closed mempool setup channel for conciseness in tests.
///
/// The mempool sender is dropped before [`init`] is called. Any attempt to receive a
/// mempool service from the channel therefore fails.
///
/// See [`init`] for more details.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test<S>(
    config: Config,
    network: &Network,
    state_service: S,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    let (mempool_setup_tx, closed_mempool_setup_rx) = oneshot::channel::<
        Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
    >();
    // Close the channel explicitly, so `init` never receives a mempool service.
    drop(mempool_setup_tx);

    // `config` and `state_service` are owned and only used here, so move them instead of
    // cloning.
    init(config, network, state_service, closed_mempool_setup_rx).await
}