zebra_consensus/
router.rs

1//! Top-level semantic block verification for Zebra.
2//!
3//! Verifies blocks using the [`CheckpointVerifier`] or full [`SemanticBlockVerifier`],
4//! depending on the config and block height.
5//!
6//! # Correctness
7//!
8//! Block and transaction verification requests should be wrapped in a timeout, because:
9//! - checkpoint verification waits for previous blocks, and
10//! - full block and transaction verification wait for UTXOs from previous blocks.
11//!
12//! Otherwise, verification of out-of-order and invalid blocks and transactions can hang
13//! indefinitely.
14
15use core::fmt;
16use std::{
17    future::Future,
18    pin::Pin,
19    sync::Arc,
20    task::{Context, Poll},
21};
22
23use futures::{FutureExt, TryFutureExt};
24use thiserror::Error;
25use tokio::{sync::oneshot, task::JoinHandle};
26use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
27use tracing::{instrument, Instrument, Span};
28
29use zebra_chain::{
30    block::{self, Height},
31    parameters::{checkpoint::list::CheckpointList, Network},
32};
33
34use zebra_node_services::mempool;
35use zebra_state as zs;
36
37use crate::{
38    block::{Request, SemanticBlockVerifier, VerifyBlockError},
39    checkpoint::{CheckpointVerifier, VerifyCheckpointError},
40    error::TransactionError,
41    transaction, BoxError, Config,
42};
43
// The `tests` submodule is only compiled in test builds.
#[cfg(test)]
mod tests;
46
/// The bound for the chain verifier and transaction verifier buffers.
///
/// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention:
///   - the `ChainSync` block download and verify stream
///   - the `Inbound` block download and verify stream
///   - the `Mempool` transaction download and verify stream
///   - a block miner component, which we might add in future, and
///   - 1 extra slot to avoid contention.
///
/// We deliberately add extra slots, because they only cost a small amount of
/// memory, but missing slots can significantly slow down Zebra.
///
/// [`init`] uses this bound for both the block verifier router buffer and the
/// transaction verifier buffer.
const VERIFIER_BUFFER_BOUND: usize = 5;
60
/// The block verifier router routes requests to either the checkpoint verifier or the
/// semantic block verifier, depending on the maximum checkpoint height.
///
/// # Type Parameters
///
/// - `S`: a service that answers `zs::Request`s. [`init`] passes the state service here.
/// - `V`: a service that answers `transaction::Request`s. [`init`] passes the buffered
///   transaction verifier here.
///
/// # Correctness
///
/// Block verification requests should be wrapped in a timeout, so that
/// out-of-order and invalid requests do not hang indefinitely. See the [`router`](`crate::router`)
/// module documentation for details.
struct BlockVerifierRouter<S, V>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
        + Send
        + Clone
        + 'static,
    V::Future: Send + 'static,
{
    /// The checkpointing block verifier.
    ///
    /// Always used for blocks before `Canopy`, optionally used for the entire checkpoint list.
    /// The router sends it every non-proposal block whose coinbase height is at or below
    /// `max_checkpoint_height`.
    checkpoint: CheckpointVerifier<S>,

    /// The highest permitted checkpoint block.
    ///
    /// This height must be in the `checkpoint` verifier's checkpoint list.
    ///
    /// The comparison is inclusive: a block at exactly this height is still checkpointed.
    /// Block proposals at or below this height are rejected by the router.
    max_checkpoint_height: block::Height,

    /// The full semantic block verifier, used for blocks after `max_checkpoint_height`.
    ///
    /// Blocks without a coinbase height are also sent here.
    block: SemanticBlockVerifier<S, V>,
}
92
/// An error while semantically verifying a block.
///
/// Each variant records which verifier rejected the block, and carries that verifier's
/// error in its `source` field.
//
// The inner errors are boxed, presumably to keep `RouterError` (and every `Result` that
// carries it) small: one or both of these error variants are at least 140 bytes.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum RouterError {
    /// Block could not be checkpointed.
    ///
    /// `source` is the error returned by the checkpoint verifier.
    Checkpoint { source: Box<VerifyCheckpointError> },
    /// Block could not be full-verified.
    ///
    /// `source` is the error returned by the semantic block verifier.
    Block { source: Box<VerifyBlockError> },
}
104
105impl fmt::Display for RouterError {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        f.write_str(&match self {
108            RouterError::Checkpoint { source } => {
109                format!("block could not be checkpointed due to: {source}")
110            }
111            RouterError::Block { source } => {
112                format!("block could not be full-verified due to: {source}")
113            }
114        })
115    }
116}
117
118impl From<VerifyCheckpointError> for RouterError {
119    fn from(err: VerifyCheckpointError) -> Self {
120        RouterError::Checkpoint {
121            source: Box::new(err),
122        }
123    }
124}
125
126impl From<VerifyBlockError> for RouterError {
127    fn from(err: VerifyBlockError) -> Self {
128        RouterError::Block {
129            source: Box::new(err),
130        }
131    }
132}
133
134impl RouterError {
135    /// Returns `true` if this is definitely a duplicate request.
136    /// Some duplicate requests might not be detected, and therefore return `false`.
137    pub fn is_duplicate_request(&self) -> bool {
138        match self {
139            RouterError::Checkpoint { source, .. } => source.is_duplicate_request(),
140            RouterError::Block { source, .. } => source.is_duplicate_request(),
141        }
142    }
143
144    /// Returns a suggested misbehaviour score increment for a certain error.
145    pub fn misbehavior_score(&self) -> u32 {
146        // TODO: Adjust these values based on zcashd (#9258).
147        match self {
148            RouterError::Checkpoint { source } => source.misbehavior_score(),
149            RouterError::Block { source } => source.misbehavior_score(),
150        }
151    }
152}
153
154impl<S, V> Service<Request> for BlockVerifierRouter<S, V>
155where
156    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
157    S::Future: Send + 'static,
158    V: Service<transaction::Request, Response = transaction::Response, Error = BoxError>
159        + Send
160        + Clone
161        + 'static,
162    V::Future: Send + 'static,
163{
164    type Response = block::Hash;
165    type Error = RouterError;
166    type Future =
167        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
168
169    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
170        // CORRECTNESS
171        //
172        // The current task must be scheduled for wakeup every time we return
173        // `Poll::Pending`.
174        //
175        // If either verifier is unready, this task is scheduled for wakeup when it becomes
176        // ready.
177        //
178        // We acquire checkpoint readiness before block readiness, to avoid an unlikely
179        // hang during the checkpoint to block verifier transition. If the checkpoint and
180        // block verifiers are contending for the same buffer/batch, we want the checkpoint
181        // verifier to win, so that checkpoint verification completes, and block verification
182        // can start. (Buffers and batches have multiple slots, so this contention is unlikely.)
183        use futures::ready;
184        // The chain verifier holds one slot in each verifier, for each concurrent task.
185        // Therefore, any shared buffers or batches polled by these verifiers should double
186        // their bounds. (For example, the state service buffer.)
187        ready!(self.checkpoint.poll_ready(cx))?;
188        ready!(self.block.poll_ready(cx))?;
189        Poll::Ready(Ok(()))
190    }
191
192    fn call(&mut self, request: Request) -> Self::Future {
193        let block = request.block();
194
195        match block.coinbase_height() {
196            // There's currently no known use case for block proposals below the checkpoint height,
197            // so it's okay to immediately return an error here.
198            Some(height) if height <= self.max_checkpoint_height && request.is_proposal() => {
199                async {
200                    // TODO: Add a `ValidateProposalError` enum with a `BelowCheckpoint` variant?
201                    Err(VerifyBlockError::ValidateProposal(
202                        "block proposals must be above checkpoint height".into(),
203                    ))?
204                }
205                .boxed()
206            }
207
208            Some(height) if height <= self.max_checkpoint_height => {
209                self.checkpoint.call(block).map_err(Into::into).boxed()
210            }
211            // This also covers blocks with no height, which the block verifier
212            // will reject immediately.
213            _ => self.block.call(request).map_err(Into::into).boxed(),
214        }
215    }
216}
217
218/// Initialize block and transaction verification services.
219///
220/// Returns a block verifier, transaction verifier,
221/// a [`BackgroundTaskHandles`] with the state checkpoint verify task,
222/// and the maximum configured checkpoint verification height.
223///
224/// The consensus configuration is specified by `config`, and the Zcash network
225/// to verify blocks for is specified by `network`.
226///
227/// The block verification service asynchronously performs semantic verification
228/// checks. Blocks that pass semantic verification are submitted to the supplied
229/// `state_service` for contextual verification before being committed to the chain.
230///
231/// The transaction verification service asynchronously performs semantic verification
232/// checks. Transactions that pass semantic verification return an `Ok` result to the caller.
233///
234/// This function should only be called once for a particular state service.
235///
236/// Dropped requests are cancelled on a best-effort basis, but may continue to be processed.
237///
238/// # Correctness
239///
240/// Block and transaction verification requests should be wrapped in a timeout,
241/// so that out-of-order and invalid requests do not hang indefinitely.
242/// See the [`router`](`crate::router`) module documentation for details.
243#[instrument(skip(state_service, mempool))]
244pub async fn init<S, Mempool>(
245    config: Config,
246    network: &Network,
247    mut state_service: S,
248    mempool: oneshot::Receiver<Mempool>,
249) -> (
250    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
251    Buffer<
252        BoxService<transaction::Request, transaction::Response, TransactionError>,
253        transaction::Request,
254    >,
255    BackgroundTaskHandles,
256    Height,
257)
258where
259    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
260    S::Future: Send + 'static,
261    Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>
262        + Send
263        + Clone
264        + 'static,
265    Mempool::Future: Send + 'static,
266{
267    // Give other tasks priority before spawning the checkpoint task.
268    tokio::task::yield_now().await;
269
270    // Make sure the state contains the known best chain checkpoints, in a separate thread.
271
272    let checkpoint_state_service = state_service.clone();
273    let checkpoint_sync = config.checkpoint_sync;
274    let checkpoint_network = network.clone();
275
276    let state_checkpoint_verify_handle = tokio::task::spawn(
277        // TODO: move this into an async function?
278        async move {
279            tracing::info!("starting state checkpoint validation");
280
281            // # Consensus
282            //
283            // We want to verify all available checkpoints, even if the node is not configured
284            // to use them for syncing. Zebra's checkpoints are updated with every release,
285            // which makes sure they include the latest settled network upgrade.
286            //
287            // > A network upgrade is settled on a given network when there is a social
288            // > consensus that it has activated with a given activation block hash.
289            // > A full validator that potentially risks Mainnet funds or displays Mainnet
290            // > transaction information to a user MUST do so only for a block chain that
291            // > includes the activation block of the most recent settled network upgrade,
292            // > with the corresponding activation block hash. Currently, there is social
293            // > consensus that NU5 has activated on the Zcash Mainnet and Testnet with the
294            // > activation block hashes given in § 3.12 ‘Mainnet and Testnet’ on p. 20.
295            //
296            // <https://zips.z.cash/protocol/protocol.pdf#blockchain>
297            let full_checkpoints = checkpoint_network.checkpoint_list();
298            let mut already_warned = false;
299
300            for (height, checkpoint_hash) in full_checkpoints.iter() {
301                let checkpoint_state_service = checkpoint_state_service.clone();
302                let request = zebra_state::Request::BestChainBlockHash(*height);
303
304                match checkpoint_state_service.oneshot(request).await {
305                    Ok(zebra_state::Response::BlockHash(Some(state_hash))) => assert_eq!(
306                        *checkpoint_hash, state_hash,
307                        "invalid block in state: a previous Zebra instance followed an \
308                         incorrect chain. Delete and re-sync your state to use the best chain"
309                    ),
310
311                    Ok(zebra_state::Response::BlockHash(None)) => {
312                        if checkpoint_sync {
313                            tracing::info!(
314                                "state is not fully synced yet, remaining checkpoints will be \
315                                 verified during syncing"
316                            );
317                        } else {
318                            tracing::warn!(
319                                "state is not fully synced yet, remaining checkpoints will be \
320                                 verified next time Zebra starts up. Zebra will be less secure \
321                                 until it is restarted. Use consensus.checkpoint_sync = true \
322                                 in zebrad.toml to make sure you are following a valid chain"
323                            );
324                        }
325
326                        break;
327                    }
328
329                    Ok(response) => {
330                        unreachable!("unexpected response type: {response:?} from state request")
331                    }
332                    Err(e) => {
333                        // This error happens a lot in some tests, and it could happen to users.
334                        if !already_warned {
335                            tracing::warn!(
336                                "unexpected error: {e:?} in state request while verifying previous \
337                                 state checkpoints. Is Zebra shutting down?"
338                            );
339                            already_warned = true;
340                        }
341                    }
342                }
343            }
344
345            tracing::info!("finished state checkpoint validation");
346        }
347        .instrument(Span::current()),
348    );
349
350    // transaction verification
351
352    let transaction = transaction::Verifier::new(network, state_service.clone(), mempool);
353    let transaction = Buffer::new(BoxService::new(transaction), VERIFIER_BUFFER_BOUND);
354
355    // block verification
356    let (list, max_checkpoint_height) = init_checkpoint_list(config, network);
357
358    let tip = match state_service
359        .ready()
360        .await
361        .unwrap()
362        .call(zs::Request::Tip)
363        .await
364        .unwrap()
365    {
366        zs::Response::Tip(tip) => tip,
367        _ => unreachable!("wrong response to Request::Tip"),
368    };
369    tracing::info!(
370        ?tip,
371        ?max_checkpoint_height,
372        "initializing block verifier router"
373    );
374
375    let block = SemanticBlockVerifier::new(network, state_service.clone(), transaction.clone());
376    let checkpoint = CheckpointVerifier::from_checkpoint_list(list, network, tip, state_service);
377    let router = BlockVerifierRouter {
378        checkpoint,
379        max_checkpoint_height,
380        block,
381    };
382
383    let router = Buffer::new(BoxService::new(router), VERIFIER_BUFFER_BOUND);
384
385    let task_handles = BackgroundTaskHandles {
386        state_checkpoint_verify_handle,
387    };
388
389    (router, transaction, task_handles, max_checkpoint_height)
390}
391
392/// Parses the checkpoint list for `network` and `config`.
393/// Returns the checkpoint list and maximum checkpoint height.
394pub fn init_checkpoint_list(config: Config, network: &Network) -> (Arc<CheckpointList>, Height) {
395    // TODO: Zebra parses the checkpoint list three times at startup.
396    //       Instead, cache the checkpoint list for each `network`.
397    let list = network.checkpoint_list();
398
399    let max_checkpoint_height = if config.checkpoint_sync {
400        list.max_height()
401    } else {
402        list.min_height_in_range(network.mandatory_checkpoint_height()..)
403            .expect("hardcoded checkpoint list extends past canopy activation")
404    };
405
406    (list, max_checkpoint_height)
407}
408
/// The background task handles for `zebra-consensus` verifier initialization.
///
/// Returned by [`init`], so that the caller can monitor the tasks spawned during
/// initialization.
#[derive(Debug)]
pub struct BackgroundTaskHandles {
    /// A handle to the state checkpoint verify task.
    /// Finishes when all the checkpoints are verified, or when the state tip is reached.
    ///
    /// The task panics if a block in the state does not match its checkpoint hash.
    pub state_checkpoint_verify_handle: JoinHandle<()>,
}
416
/// Calls [`init`] with a closed mempool setup channel for conciseness in tests.
///
/// `config`, `network`, and `state_service` are passed to [`init`] unchanged, and its
/// return value is returned unchanged.
///
/// See [`init`] for more details.
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test<S>(
    config: Config,
    network: &Network,
    state_service: S,
) -> (
    Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
    Buffer<
        BoxService<transaction::Request, transaction::Response, TransactionError>,
        transaction::Request,
    >,
    BackgroundTaskHandles,
    Height,
)
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    // The sender half is dropped at the end of this statement, so the receiver passed to
    // `init` belongs to an already-closed channel.
    let (_, closed_mempool_setup) = oneshot::channel::<
        Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
    >();

    // `config` and `state_service` are owned and not used again, so they are moved
    // into `init` rather than cloned.
    init(config, network, state_service, closed_mempool_setup).await
}