zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    convert,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24    time::{Duration, Instant},
25};
26
27use futures::future::FutureExt;
28use tokio::sync::oneshot;
29use tower::{util::BoxService, Service, ServiceExt};
30use tracing::{instrument, Instrument, Span};
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use tower::buffer::Buffer;
34
35use zebra_chain::{
36    block::{self, CountedHeader, HeightDiff},
37    diagnostic::{task::WaitForPanics, CodeTimer},
38    parameters::{Network, NetworkUpgrade},
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use zebra_chain::{block::Height, serialization::ZcashSerialize};
43
44use crate::{
45    constants::{
46        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
47    },
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        watch_receiver::WatchReceiver,
57    },
58    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
59    ReadResponse, Request, Response, SemanticallyVerifiedBlock, ValidateContextError,
60};
61
62pub mod block_iter;
63pub mod chain_tip;
64pub mod watch_receiver;
65
66pub mod check;
67
68pub(crate) mod finalized_state;
69pub(crate) mod non_finalized_state;
70mod pending_utxos;
71mod queued_blocks;
72pub(crate) mod read;
73mod write;
74
75#[cfg(any(test, feature = "proptest-impl"))]
76pub mod arbitrary;
77
78#[cfg(test)]
79mod tests;
80
81pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
82use write::NonFinalizedWriteMessage;
83
84use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
85
86/// A read-write service for Zebra's cached blockchain state.
87///
88/// This service modifies and provides access to:
89/// - the non-finalized state: the ~100 most recent blocks.
90///   Zebra allows chain forks in the non-finalized state,
91///   stores it in memory, and re-downloads it when restarted.
92/// - the finalized state: older blocks that have many confirmations.
93///   Zebra stores the single best chain in the finalized state,
94///   and re-loads it from disk when restarted.
95///
96/// Read requests to this service are buffered, then processed concurrently.
97/// Block write requests are buffered, then queued, then processed in order by a separate task.
98///
99/// Most state users can get faster read responses using the [`ReadStateService`],
100/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
101///
102/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
103/// They can read the latest block directly, without queueing any requests.
104#[derive(Debug)]
105pub(crate) struct StateService {
106    // Configuration
107    //
108    /// The configured Zcash network.
109    network: Network,
110
111    /// The height that we start storing UTXOs from finalized blocks.
112    ///
113    /// This height should be lower than the last few checkpoints,
114    /// so the full verifier can verify UTXO spends from those blocks,
115    /// even if they haven't been committed to the finalized state yet.
116    full_verifier_utxo_lookahead: block::Height,
117
118    // Queued Blocks
119    //
120    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
121    /// These blocks are awaiting their parent blocks before they can do contextual verification.
122    non_finalized_state_queued_blocks: QueuedBlocks,
123
124    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
125    /// These blocks are awaiting their parent blocks before they can do contextual verification.
126    ///
127    /// Indexed by their parent block hash.
128    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
129
130    /// Channels to send blocks to the block write task.
131    block_write_sender: write::BlockWriteSender,
132
133    /// The [`block::Hash`] of the most recent block sent on
134    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
135    ///
136    /// On startup, this is:
137    /// - the finalized tip, if there are stored blocks, or
138    /// - the genesis block's parent hash, if the database is empty.
139    ///
140    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
141    /// - the hash of the last valid committed block (the parent of the invalid block).
142    finalized_block_write_last_sent_hash: block::Hash,
143
144    /// A set of block hashes that have been sent to the block write task.
145    /// Hashes of blocks below the finalized tip height are periodically pruned.
146    non_finalized_block_write_sent_hashes: SentHashes,
147
148    /// If an invalid block is sent on `finalized_block_write_sender`
149    /// or `non_finalized_block_write_sender`,
150    /// this channel gets the [`block::Hash`] of the valid tip.
151    //
152    // TODO: add tests for finalized and non-finalized resets (#2654)
153    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
154
155    // Pending UTXO Request Tracking
156    //
157    /// The set of outpoints with pending requests for their associated transparent::Output.
158    pending_utxos: PendingUtxos,
159
160    /// Instant tracking the last time `pending_utxos` was pruned.
161    last_prune: Instant,
162
163    // Updating Concurrently Readable State
164    //
165    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
166    ///
167    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
168    read_service: ReadStateService,
169
170    // Metrics
171    //
172    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
173    ///
174    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
175    /// as a break in the graph.
176    max_finalized_queue_height: f64,
177}
178
179/// A read-only service for accessing Zebra's cached blockchain state.
180///
181/// This service provides read-only access to:
182/// - the non-finalized state: the ~100 most recent blocks.
183/// - the finalized state: older blocks that have many confirmations.
184///
185/// Requests to this service are processed in parallel,
186/// ignoring any blocks queued by the read-write [`StateService`].
187///
188/// This quick response behavior is better for most state users.
189/// It allows other async tasks to make progress while concurrently reading data from disk.
190#[derive(Clone, Debug)]
191pub struct ReadStateService {
192    // Configuration
193    //
194    /// The configured Zcash network.
195    network: Network,
196
197    // Shared Concurrently Readable State
198    //
199    /// A watch channel with a cached copy of the [`NonFinalizedState`].
200    ///
201    /// This state is only updated between requests,
202    /// so it might include some block data that is also on `disk`.
203    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
204
205    /// The shared inner on-disk database for the finalized state.
206    ///
207    /// RocksDB allows reads and writes via a shared reference,
208    /// but [`ZebraDb`] doesn't expose any write methods or types.
209    ///
210    /// This chain is updated concurrently with requests,
211    /// so it might include some block data that is also in `best_mem`.
212    db: ZebraDb,
213
214    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
215    /// once the queues have received all their parent blocks.
216    ///
217    /// Used to check for panics when writing blocks.
218    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
219}
220
221impl Drop for StateService {
222    fn drop(&mut self) {
223        // The state service owns the state, tasks, and channels,
224        // so dropping it should shut down everything.
225
226        // Close the channels (non-blocking)
227        // This makes the block write thread exit the next time it checks the channels.
228        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
229        self.invalid_block_write_reset_receiver.close();
230
231        std::mem::drop(self.block_write_sender.finalized.take());
232        std::mem::drop(self.block_write_sender.non_finalized.take());
233
234        self.clear_finalized_block_queue(
235            "dropping the state: dropped unused finalized state queue block",
236        );
237        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::from(
238            ValidateContextError::DroppedUnusedBlock,
239        ));
240
241        // Log database metrics before shutting down
242        info!("dropping the state: logging database metrics");
243        self.log_db_metrics();
244
245        // Then drop self.read_service, which checks the block write task for panics,
246        // and tries to shut down the database.
247    }
248}
249
250impl Drop for ReadStateService {
251    fn drop(&mut self) {
252        // The read state service shares the state,
253        // so dropping it should check if we can shut down.
254
255        // TODO: move this into a try_shutdown() method
256        if let Some(block_write_task) = self.block_write_task.take() {
257            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
258                // We're the last database user, so we can tell it to shut down (blocking):
259                // - flushes the database to disk, and
260                // - drops the database, which cleans up any database tasks correctly.
261                self.db.shutdown(true);
262
263                // We are the last state with a reference to this thread, so we can
264                // wait until the block write task finishes, then check for panics (blocking).
265                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
266
267                // This log is verbose during tests.
268                #[cfg(not(test))]
269                info!("waiting for the block write task to finish");
270                #[cfg(test)]
271                debug!("waiting for the block write task to finish");
272
273                // TODO: move this into a check_for_panics() method
274                if let Err(thread_panic) = block_write_task_handle.join() {
275                    std::panic::resume_unwind(thread_panic);
276                } else {
277                    debug!("shutting down the state because the block write task has finished");
278                }
279            }
280        } else {
281            // Even if we're not the last database user, try shutting it down.
282            //
283            // TODO: rename this to try_shutdown()?
284            self.db.shutdown(false);
285        }
286    }
287}
288
289impl StateService {
290    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
292    /// Creates a new state service for the state `config` and `network`.
293    ///
294    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
295    /// to work out when it is near the final checkpoint.
296    ///
297    /// Returns the read-write and read-only state services,
298    /// and read-only watch channels for its best chain tip.
299    pub async fn new(
300        config: Config,
301        network: &Network,
302        max_checkpoint_height: block::Height,
303        checkpoint_verify_concurrency_limit: usize,
304    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
305        let (finalized_state, finalized_tip, timer) = {
306            let config = config.clone();
307            let network = network.clone();
308            tokio::task::spawn_blocking(move || {
309                let timer = CodeTimer::start();
310                let finalized_state = FinalizedState::new(
311                    &config,
312                    &network,
313                    #[cfg(feature = "elasticsearch")]
314                    true,
315                );
316                timer.finish(module_path!(), line!(), "opening finalized state database");
317
318                let timer = CodeTimer::start();
319                let finalized_tip = finalized_state.db.tip_block();
320
321                (finalized_state, finalized_tip, timer)
322            })
323            .await
324            .expect("failed to join blocking task")
325        };
326
327        // # Correctness
328        //
329        // The state service must set the finalized block write sender to `None`
330        // if there are blocks in the restored non-finalized state that are above
331        // the max checkpoint height so that non-finalized blocks can be written, otherwise,
332        // Zebra will be unable to commit semantically verified blocks, and its chain sync will stall.
333        //
334        // The state service must not set the finalized block write sender to `None` if there
335        // aren't blocks in the restored non-finalized state that are above the max checkpoint height,
336        // otherwise, unless checkpoint sync is disabled in the zebra-consensus configuration,
337        // Zebra will be unable to commit checkpoint verified blocks, and its chain sync will stall.
338        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
339            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
340        } else {
341            false
342        };
343        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
344            NonFinalizedState::new(network)
345                .with_backup(
346                    config.non_finalized_state_backup_dir(network),
347                    &finalized_state.db,
348                    is_finalized_tip_past_max_checkpoint,
349                )
350                .await;
351
352        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
353        let initial_tip = non_finalized_state
354            .best_tip_block()
355            .map(|cv_block| cv_block.block.clone())
356            .or(finalized_tip)
357            .map(CheckpointVerifiedBlock::from)
358            .map(ChainTipBlock::from);
359
360        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");
361
362        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
363            ChainTipSender::new(initial_tip, network);
364
365        let finalized_state_for_writing = finalized_state.clone();
366        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
367        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
368            write::BlockWriteSender::spawn(
369                finalized_state_for_writing,
370                non_finalized_state,
371                chain_tip_sender,
372                non_finalized_state_sender,
373                should_use_finalized_block_write_sender,
374            );
375
376        let read_service = ReadStateService::new(
377            &finalized_state,
378            block_write_task,
379            non_finalized_state_receiver,
380        );
381
382        let full_verifier_utxo_lookahead = max_checkpoint_height
383            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
384                .expect("fits in HeightDiff");
385        let full_verifier_utxo_lookahead =
386            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
387        let non_finalized_state_queued_blocks = QueuedBlocks::default();
388        let pending_utxos = PendingUtxos::default();
389
390        let finalized_block_write_last_sent_hash =
391            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
392                .await
393                .expect("failed to join blocking task");
394
395        let state = Self {
396            network: network.clone(),
397            full_verifier_utxo_lookahead,
398            non_finalized_state_queued_blocks,
399            finalized_state_queued_blocks: HashMap::new(),
400            block_write_sender,
401            finalized_block_write_last_sent_hash,
402            non_finalized_block_write_sent_hashes,
403            invalid_block_write_reset_receiver,
404            pending_utxos,
405            last_prune: Instant::now(),
406            read_service: read_service.clone(),
407            max_finalized_queue_height: f64::NAN,
408        };
409        timer.finish(module_path!(), line!(), "initializing state service");
410
411        tracing::info!("starting legacy chain check");
412        let timer = CodeTimer::start();
413
414        if let (Some(tip), Some(nu5_activation_height)) = (
415            {
416                let read_state = state.read_service.clone();
417                tokio::task::spawn_blocking(move || read_state.best_tip())
418                    .await
419                    .expect("task should not panic")
420            },
421            NetworkUpgrade::Nu5.activation_height(network),
422        ) {
423            if let Err(error) = check::legacy_chain(
424                nu5_activation_height,
425                any_ancestor_blocks(
426                    &state.read_service.latest_non_finalized_state(),
427                    &state.read_service.db,
428                    tip.1,
429                ),
430                &state.network,
431                MAX_LEGACY_CHAIN_BLOCKS,
432            ) {
433                let legacy_db_path = state.read_service.db.path().to_path_buf();
434                panic!(
435                    "Cached state contains a legacy chain.\n\
436                     An outdated Zebra version did not know about a recent network upgrade,\n\
437                     so it followed a legacy chain using outdated consensus branch rules.\n\
438                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
439                     Database path: {legacy_db_path:?}\n\
440                     Error: {error:?}",
441                );
442            }
443        }
444
445        tracing::info!("cached state consensus branch is valid: no legacy chain found");
446        timer.finish(module_path!(), line!(), "legacy chain check");
447
448        (state, read_service, latest_chain_tip, chain_tip_change)
449    }
450
451    /// Call read only state service to log rocksdb database metrics.
452    pub fn log_db_metrics(&self) {
453        self.read_service.db.print_db_metrics();
454    }
455
456    /// Queue a checkpoint verified block for verification and storage in the finalized state.
457    ///
458    /// Returns a channel receiver that provides the result of the block commit.
459    fn queue_and_commit_to_finalized_state(
460        &mut self,
461        checkpoint_verified: CheckpointVerifiedBlock,
462    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
463        // # Correctness & Performance
464        //
465        // This method must not block, access the database, or perform CPU-intensive tasks,
466        // because it is called directly from the tokio executor's Future threads.
467
468        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
469        let queued_height = checkpoint_verified.height;
470
471        // If we're close to the final checkpoint, make the block's UTXOs available for
472        // semantic block verification, even when it is in the channel.
473        if self.is_close_to_final_checkpoint(queued_height) {
474            self.non_finalized_block_write_sent_hashes
475                .add_finalized(&checkpoint_verified)
476        }
477
478        let (rsp_tx, rsp_rx) = oneshot::channel();
479        let queued = (checkpoint_verified, rsp_tx);
480
481        if self.block_write_sender.finalized.is_some() {
482            // We're still committing checkpoint verified blocks
483            if let Some(duplicate_queued) = self
484                .finalized_state_queued_blocks
485                .insert(queued_prev_hash, queued)
486            {
487                Self::send_checkpoint_verified_block_error(
488                    duplicate_queued,
489                    "dropping older checkpoint verified block: got newer duplicate block",
490                );
491            }
492
493            self.drain_finalized_queue_and_commit();
494        } else {
495            // We've finished committing checkpoint verified blocks to the finalized state,
496            // so drop any repeated queued blocks, and return an error.
497            //
498            // TODO: track the latest sent height, and drop any blocks under that height
499            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
500            Self::send_checkpoint_verified_block_error(
501                queued,
502                "already finished committing checkpoint verified blocks: dropped duplicate block, \
503                 block is already committed to the state",
504            );
505
506            self.clear_finalized_block_queue(
507                "already finished committing checkpoint verified blocks: dropped duplicate block, \
508                 block is already committed to the state",
509            );
510        }
511
512        if self.finalized_state_queued_blocks.is_empty() {
513            self.max_finalized_queue_height = f64::NAN;
514        } else if self.max_finalized_queue_height.is_nan()
515            || self.max_finalized_queue_height < queued_height.0 as f64
516        {
517            // if there are still blocks in the queue, then either:
518            //   - the new block was lower than the old maximum, and there was a gap before it,
519            //     so the maximum is still the same (and we skip this code), or
520            //   - the new block is higher than the old maximum, and there is at least one gap
521            //     between the finalized tip and the new maximum
522            self.max_finalized_queue_height = queued_height.0 as f64;
523        }
524
525        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
526        metrics::gauge!("state.checkpoint.queued.block.count")
527            .set(self.finalized_state_queued_blocks.len() as f64);
528
529        rsp_rx
530    }
531
532    /// Finds finalized state queue blocks to be committed to the state in order,
533    /// removes them from the queue, and sends them to the block commit task.
534    ///
535    /// After queueing a finalized block, this method checks whether the newly
536    /// queued block (and any of its descendants) can be committed to the state.
537    ///
538    /// Returns an error if the block commit channel has been closed.
539    pub fn drain_finalized_queue_and_commit(&mut self) {
540        use tokio::sync::mpsc::error::{SendError, TryRecvError};
541
542        // # Correctness & Performance
543        //
544        // This method must not block, access the database, or perform CPU-intensive tasks,
545        // because it is called directly from the tokio executor's Future threads.
546
547        // If a block failed, we need to start again from a valid tip.
548        match self.invalid_block_write_reset_receiver.try_recv() {
549            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
550            Err(TryRecvError::Disconnected) => {
551                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
552                return;
553            }
554            // There are no errors, so we can just use the last block hash we sent
555            Err(TryRecvError::Empty) => {}
556        }
557
558        while let Some(queued_block) = self
559            .finalized_state_queued_blocks
560            .remove(&self.finalized_block_write_last_sent_hash)
561        {
562            let last_sent_finalized_block_height = queued_block.0.height;
563
564            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
565
566            // If we've finished sending finalized blocks, ignore any repeated blocks.
567            // (Blocks can be repeated after a syncer reset.)
568            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
569                let send_result = finalized_block_write_sender.send(queued_block);
570
571                // If the receiver is closed, we can't send any more blocks.
572                if let Err(SendError(queued)) = send_result {
573                    // If Zebra is shutting down, drop blocks and return an error.
574                    Self::send_checkpoint_verified_block_error(
575                        queued,
576                        "block commit task exited. Is Zebra shutting down?",
577                    );
578
579                    self.clear_finalized_block_queue(
580                        "block commit task exited. Is Zebra shutting down?",
581                    );
582                } else {
583                    metrics::gauge!("state.checkpoint.sent.block.height")
584                        .set(last_sent_finalized_block_height.0 as f64);
585                };
586            }
587        }
588    }
589
590    /// Drops all finalized state queue blocks, and sends an error on their result channels.
591    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
592        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
593            Self::send_checkpoint_verified_block_error(queued, error.clone());
594        }
595    }
596
597    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
598    fn send_checkpoint_verified_block_error(
599        queued: QueuedCheckpointVerified,
600        error: impl Into<BoxError>,
601    ) {
602        let (finalized, rsp_tx) = queued;
603
604        // The block sender might have already given up on this block,
605        // so ignore any channel send errors.
606        let _ = rsp_tx.send(Err(error.into()));
607        std::mem::drop(finalized);
608    }
609
610    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
611    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
612        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
613            Self::send_semantically_verified_block_error(queued, error.clone());
614        }
615    }
616
617    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
618    fn send_semantically_verified_block_error(
619        queued: QueuedSemanticallyVerified,
620        error: CommitSemanticallyVerifiedError,
621    ) {
622        let (finalized, rsp_tx) = queued;
623
624        // The block sender might have already given up on this block,
625        // so ignore any channel send errors.
626        let _ = rsp_tx.send(Err(error));
627        std::mem::drop(finalized);
628    }
629
630    /// Queue a semantically verified block for contextual verification and check if any queued
631    /// blocks are ready to be verified and committed to the state.
632    ///
633    /// This function encodes the logic for [committing non-finalized blocks][1]
634    /// in RFC0005.
635    ///
636    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
637    #[instrument(level = "debug", skip(self, semantically_verrified))]
638    fn queue_and_commit_to_non_finalized_state(
639        &mut self,
640        semantically_verrified: SemanticallyVerifiedBlock,
641    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
642        tracing::debug!(block = %semantically_verrified.block, "queueing block for contextual verification");
643        let parent_hash = semantically_verrified.block.header.previous_block_hash;
644
645        if self
646            .non_finalized_block_write_sent_hashes
647            .contains(&semantically_verrified.hash)
648        {
649            let (rsp_tx, rsp_rx) = oneshot::channel();
650            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
651                ValidateContextError::DuplicateCommitRequest {
652                    block_hash: semantically_verrified.hash,
653                },
654            )));
655            return rsp_rx;
656        }
657
658        if self
659            .read_service
660            .db
661            .contains_height(semantically_verrified.height)
662        {
663            let (rsp_tx, rsp_rx) = oneshot::channel();
664            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
665                ValidateContextError::AlreadyFinalized {
666                    block_height: semantically_verrified.height,
667                },
668            )));
669            return rsp_rx;
670        }
671
672        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
673        // has been queued but not yet committed to the state fails the older request and replaces
674        // it with the newer request.
675        let rsp_rx = if let Some((_, old_rsp_tx)) = self
676            .non_finalized_state_queued_blocks
677            .get_mut(&semantically_verrified.hash)
678        {
679            tracing::debug!("replacing older queued request with new request");
680            let (mut rsp_tx, rsp_rx) = oneshot::channel();
681            std::mem::swap(old_rsp_tx, &mut rsp_tx);
682            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
683                ValidateContextError::ReplacedByNewerRequest {
684                    block_hash: semantically_verrified.hash,
685                },
686            )));
687            rsp_rx
688        } else {
689            let (rsp_tx, rsp_rx) = oneshot::channel();
690            self.non_finalized_state_queued_blocks
691                .queue((semantically_verrified, rsp_tx));
692            rsp_rx
693        };
694
695        // We've finished sending checkpoint verified blocks when:
696        // - we've sent the verified block for the last checkpoint, and
697        // - it has been successfully written to disk.
698        //
699        // We detect the last checkpoint by looking for non-finalized blocks
700        // that are a child of the last block we sent.
701        //
702        // TODO: configure the state with the last checkpoint hash instead?
703        if self.block_write_sender.finalized.is_some()
704            && self
705                .non_finalized_state_queued_blocks
706                .has_queued_children(self.finalized_block_write_last_sent_hash)
707            && self.read_service.db.finalized_tip_hash()
708                == self.finalized_block_write_last_sent_hash
709        {
710            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
711            // and move on to committing semantically verified blocks to the non-finalized state.
712            std::mem::drop(self.block_write_sender.finalized.take());
713            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
714            self.non_finalized_block_write_sent_hashes = SentHashes::default();
715            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
716            self.non_finalized_block_write_sent_hashes
717                .can_fork_chain_at_hashes = true;
718            // Send blocks from non-finalized queue
719            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
720            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
721            self.clear_finalized_block_queue(
722                "already finished committing checkpoint verified blocks: dropped duplicate block, \
723                 block is already committed to the state",
724            );
725        } else if !self.can_fork_chain_at(&parent_hash) {
726            tracing::trace!("unready to verify, returning early");
727        } else if self.block_write_sender.finalized.is_none() {
728            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
729            self.send_ready_non_finalized_queued(parent_hash);
730
731            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
732                "Finalized state must have at least one block before committing non-finalized state",
733            );
734
735            self.non_finalized_state_queued_blocks
736                .prune_by_height(finalized_tip_height);
737
738            self.non_finalized_block_write_sent_hashes
739                .prune_by_height(finalized_tip_height);
740        }
741
742        rsp_rx
743    }
744
745    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
746    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
747        self.non_finalized_block_write_sent_hashes
748            .can_fork_chain_at(hash)
749            || &self.read_service.db.finalized_tip_hash() == hash
750    }
751
752    /// Returns `true` if `queued_height` is near the final checkpoint.
753    ///
754    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
755    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
756    ///
757    /// If it doesn't have the required UTXOs, some blocks will time out,
758    /// but succeed after a syncer restart.
759    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
760        queued_height >= self.full_verifier_utxo_lookahead
761    }
762
763    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
764    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
765    #[tracing::instrument(level = "debug", skip(self, new_parent))]
766    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
767        use tokio::sync::mpsc::error::SendError;
768        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
769            let mut new_parents: Vec<block::Hash> = vec![new_parent];
770
771            while let Some(parent_hash) = new_parents.pop() {
772                let queued_children = self
773                    .non_finalized_state_queued_blocks
774                    .dequeue_children(parent_hash);
775
776                for queued_child in queued_children {
777                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
778
779                    self.non_finalized_block_write_sent_hashes
780                        .add(&queued_child.0);
781                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
782
783                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
784                        // If Zebra is shutting down, drop blocks and return an error.
785                        Self::send_semantically_verified_block_error(
786                            queued,
787                            CommitSemanticallyVerifiedError::from(
788                                ValidateContextError::CommitTaskExited,
789                            ),
790                        );
791
792                        self.clear_non_finalized_block_queue(
793                            CommitSemanticallyVerifiedError::from(
794                                ValidateContextError::CommitTaskExited,
795                            ),
796                        );
797
798                        return;
799                    };
800
801                    new_parents.push(hash);
802                }
803            }
804
805            self.non_finalized_block_write_sent_hashes.finish_batch();
806        };
807    }
808
809    /// Return the tip of the current best chain.
810    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
811        self.read_service.best_tip()
812    }
813
814    fn send_invalidate_block(
815        &self,
816        hash: block::Hash,
817    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
818        let (rsp_tx, rsp_rx) = oneshot::channel();
819
820        let Some(sender) = &self.block_write_sender.non_finalized else {
821            let _ = rsp_tx.send(Err(
822                "cannot invalidate blocks while still committing checkpointed blocks".into(),
823            ));
824            return rsp_rx;
825        };
826
827        if let Err(tokio::sync::mpsc::error::SendError(error)) =
828            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
829        {
830            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
831                unreachable!("should return the same Invalidate message could not be sent");
832            };
833
834            let _ = rsp_tx.send(Err(
835                "failed to send invalidate block request to block write task".into(),
836            ));
837        }
838
839        rsp_rx
840    }
841
842    fn send_reconsider_block(
843        &self,
844        hash: block::Hash,
845    ) -> oneshot::Receiver<Result<Vec<block::Hash>, BoxError>> {
846        let (rsp_tx, rsp_rx) = oneshot::channel();
847
848        let Some(sender) = &self.block_write_sender.non_finalized else {
849            let _ = rsp_tx.send(Err(
850                "cannot reconsider blocks while still committing checkpointed blocks".into(),
851            ));
852            return rsp_rx;
853        };
854
855        if let Err(tokio::sync::mpsc::error::SendError(error)) =
856            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
857        {
858            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
859                unreachable!("should return the same Reconsider message could not be sent");
860            };
861
862            let _ = rsp_tx.send(Err(
863                "failed to send reconsider block request to block write task".into(),
864            ));
865        }
866
867        rsp_rx
868    }
869
870    /// Assert some assumptions about the semantically verified `block` before it is queued.
871    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
872        // required by `Request::CommitSemanticallyVerifiedBlock` call
873        assert!(
874            block.height > self.network.mandatory_checkpoint_height(),
875            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
876            blocks, and the canopy activation block, must be committed to the state as finalized \
877            blocks"
878        );
879    }
880}
881
882impl ReadStateService {
883    /// Creates a new read-only state service, using the provided finalized state and
884    /// block write task handle.
885    ///
886    /// Returns the newly created service,
887    /// and a watch channel for updating the shared recent non-finalized chain.
888    pub(crate) fn new(
889        finalized_state: &FinalizedState,
890        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
891        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
892    ) -> Self {
893        let read_service = Self {
894            network: finalized_state.network(),
895            db: finalized_state.db.clone(),
896            non_finalized_state_receiver,
897            block_write_task,
898        };
899
900        tracing::debug!("created new read-only state service");
901
902        read_service
903    }
904
905    /// Return the tip of the current best chain.
906    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
907        read::best_tip(&self.latest_non_finalized_state(), &self.db)
908    }
909
910    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
911    fn latest_non_finalized_state(&self) -> NonFinalizedState {
912        self.non_finalized_state_receiver.cloned_watch_data()
913    }
914
915    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
916    #[allow(dead_code)]
917    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
918        self.latest_non_finalized_state().best_chain().cloned()
919    }
920
921    /// Test-only access to the inner database.
922    /// Can be used to modify the database without doing any consensus checks.
923    #[cfg(any(test, feature = "proptest-impl"))]
924    pub fn db(&self) -> &ZebraDb {
925        &self.db
926    }
927
928    /// Logs rocksdb metrics using the read only state service.
929    pub fn log_db_metrics(&self) {
930        self.db.print_db_metrics();
931    }
932}
933
934impl Service<Request> for StateService {
935    type Response = Response;
936    type Error = BoxError;
937    type Future =
938        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
939
940    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
941        // Check for panics in the block write task
942        let poll = self.read_service.poll_ready(cx);
943
944        // Prune outdated UTXO requests
945        let now = Instant::now();
946
947        if self.last_prune + Self::PRUNE_INTERVAL < now {
948            let tip = self.best_tip();
949            let old_len = self.pending_utxos.len();
950
951            self.pending_utxos.prune();
952            self.last_prune = now;
953
954            let new_len = self.pending_utxos.len();
955            let prune_count = old_len
956                .checked_sub(new_len)
957                .expect("prune does not add any utxo requests");
958            if prune_count > 0 {
959                tracing::debug!(
960                    ?old_len,
961                    ?new_len,
962                    ?prune_count,
963                    ?tip,
964                    "pruned utxo requests"
965                );
966            } else {
967                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
968            }
969        }
970
971        poll
972    }
973
974    #[instrument(name = "state", skip(self, req))]
975    fn call(&mut self, req: Request) -> Self::Future {
976        req.count_metric();
977        let timer = CodeTimer::start();
978        let span = Span::current();
979
980        match req {
981            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
982            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
983            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
984                self.assert_block_can_be_validated(&semantically_verified);
985
986                self.pending_utxos
987                    .check_against_ordered(&semantically_verified.new_outputs);
988
989                // # Performance
990                //
991                // Allow other async tasks to make progress while blocks are being verified
992                // and written to disk. But wait for the blocks to finish committing,
993                // so that `StateService` multi-block queries always observe a consistent state.
994                //
995                // Since each block is spawned into its own task,
996                // there shouldn't be any other code running in the same task,
997                // so we don't need to worry about blocking it:
998                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
999
1000                let rsp_rx = tokio::task::block_in_place(move || {
1001                    span.in_scope(|| {
1002                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
1003                    })
1004                });
1005
1006                // TODO:
1007                //   - check for panics in the block write task here,
1008                //     as well as in poll_ready()
1009
1010                // The work is all done, the future just waits on a channel for the result
1011                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
1012
1013                // Await the channel response, mapping any receive error into a BoxError.
1014                // Then flatten the nested Result by converting the inner CommitSemanticallyVerifiedError into a BoxError.
1015                let span = Span::current();
1016                async move {
1017                    rsp_rx
1018                        .await
1019                        .map_err(|_recv_error| {
1020                            BoxError::from(CommitSemanticallyVerifiedError::from(
1021                                ValidateContextError::NotReadyToBeCommitted,
1022                            ))
1023                        })
1024                        // TODO: replace with Result::flatten once it stabilises
1025                        // https://github.com/rust-lang/rust/issues/70142
1026                        .and_then(|res| res.map_err(BoxError::from))
1027                        .map(Response::Committed)
1028                }
1029                .instrument(span)
1030                .boxed()
1031            }
1032
1033            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
1034            // Accesses shared writeable state in the StateService.
1035            Request::CommitCheckpointVerifiedBlock(finalized) => {
1036                // # Consensus
1037                //
1038                // A semantic block verification could have called AwaitUtxo
1039                // before this checkpoint verified block arrived in the state.
1040                // So we need to check for pending UTXO requests sent by running
1041                // semantic block verifications.
1042                //
1043                // This check is redundant for most checkpoint verified blocks,
1044                // because semantic verification can only succeed near the final
1045                // checkpoint, when all the UTXOs are available for the verifying block.
1046                //
1047                // (Checkpoint block UTXOs are verified using block hash checkpoints
1048                // and transaction merkle tree block header commitments.)
1049                self.pending_utxos
1050                    .check_against_ordered(&finalized.new_outputs);
1051
1052                // # Performance
1053                //
1054                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1055                // so we can run it directly in the tokio executor's Future threads.
1056                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1057
1058                // TODO:
1059                //   - check for panics in the block write task here,
1060                //     as well as in poll_ready()
1061
1062                // The work is all done, the future just waits on a channel for the result
1063                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
1064
1065                async move {
1066                    rsp_rx
1067                        .await
1068                        .map_err(|_recv_error| {
1069                            BoxError::from("block was dropped from the queue of finalized blocks")
1070                        })
1071                        // TODO: replace with Result::flatten once it stabilises
1072                        // https://github.com/rust-lang/rust/issues/70142
1073                        .and_then(convert::identity)
1074                        .map(Response::Committed)
1075                }
1076                .instrument(span)
1077                .boxed()
1078            }
1079
1080            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1081            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1082            Request::AwaitUtxo(outpoint) => {
1083                // Prepare the AwaitUtxo future from PendingUxtos.
1084                let response_fut = self.pending_utxos.queue(outpoint);
1085                // Only instrument `response_fut`, the ReadStateService already
1086                // instruments its requests with the same span.
1087
1088                let response_fut = response_fut.instrument(span).boxed();
1089
1090                // Check the non-finalized block queue outside the returned future,
1091                // so we can access mutable state fields.
1092                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1093                    self.pending_utxos.respond(&outpoint, utxo);
1094
1095                    // We're finished, the returned future gets the UTXO from the respond() channel.
1096                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1097
1098                    return response_fut;
1099                }
1100
1101                // Check the sent non-finalized blocks
1102                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1103                    self.pending_utxos.respond(&outpoint, utxo);
1104
1105                    // We're finished, the returned future gets the UTXO from the respond() channel.
1106                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1107
1108                    return response_fut;
1109                }
1110
1111                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1112                // because it is only used during checkpoint verification.
1113                //
1114                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1115                // See #5126 for details.
1116
1117                // Manually send a request to the ReadStateService,
1118                // to get UTXOs from any non-finalized chain or the finalized chain.
1119                let read_service = self.read_service.clone();
1120
1121                // Run the request in an async block, so we can await the response.
1122                async move {
1123                    let req = ReadRequest::AnyChainUtxo(outpoint);
1124
1125                    let rsp = read_service.oneshot(req).await?;
1126
1127                    // Optional TODO:
1128                    //  - make pending_utxos.respond() async using a channel,
1129                    //    so we can respond to all waiting requests here
1130                    //
1131                    // This change is not required for correctness, because:
1132                    // - any waiting requests should have returned when the block was sent to the state
1133                    // - otherwise, the request returns immediately if:
1134                    //   - the block is in the non-finalized queue, or
1135                    //   - the block is in any non-finalized chain or the finalized state
1136                    //
1137                    // And if the block is in the finalized queue,
1138                    // that's rare enough that a retry is ok.
1139                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1140                        // We got a UTXO, so we replace the response future with the result own.
1141                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1142
1143                        return Ok(Response::Utxo(utxo));
1144                    }
1145
1146                    // We're finished, but the returned future is waiting on the respond() channel.
1147                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1148
1149                    response_fut.await
1150                }
1151                .boxed()
1152            }
1153
1154            // Used by sync, inbound, and block verifier to check if a block is already in the state
1155            // before downloading or validating it.
1156            Request::KnownBlock(hash) => {
1157                let timer = CodeTimer::start();
1158
1159                let read_service = self.read_service.clone();
1160
1161                async move {
1162                    let response = read::non_finalized_state_contains_block_hash(
1163                        &read_service.latest_non_finalized_state(),
1164                        hash,
1165                    )
1166                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1167
1168                    // The work is done in the future.
1169                    timer.finish(module_path!(), line!(), "Request::KnownBlock");
1170
1171                    Ok(Response::KnownBlock(response))
1172                }
1173                .boxed()
1174            }
1175
1176            Request::InvalidateBlock(block_hash) => {
1177                let rsp_rx = tokio::task::block_in_place(move || {
1178                    span.in_scope(|| self.send_invalidate_block(block_hash))
1179                });
1180
1181                let span = Span::current();
1182                async move {
1183                    rsp_rx
1184                        .await
1185                        .map_err(|_recv_error| {
1186                            BoxError::from("invalidate block request was unexpectedly dropped")
1187                        })
1188                        // TODO: replace with Result::flatten once it stabilises
1189                        // https://github.com/rust-lang/rust/issues/70142
1190                        .and_then(convert::identity)
1191                        .map(Response::Invalidated)
1192                }
1193                .instrument(span)
1194                .boxed()
1195            }
1196
1197            Request::ReconsiderBlock(block_hash) => {
1198                let rsp_rx = tokio::task::block_in_place(move || {
1199                    span.in_scope(|| self.send_reconsider_block(block_hash))
1200                });
1201
1202                let span = Span::current();
1203                async move {
1204                    rsp_rx
1205                        .await
1206                        .map_err(|_recv_error| {
1207                            BoxError::from("reconsider block request was unexpectedly dropped")
1208                        })
1209                        // TODO: replace with Result::flatten once it stabilises
1210                        // https://github.com/rust-lang/rust/issues/70142
1211                        .and_then(convert::identity)
1212                        .map(Response::Reconsidered)
1213                }
1214                .instrument(span)
1215                .boxed()
1216            }
1217
1218            // Runs concurrently using the ReadStateService
1219            Request::Tip
1220            | Request::Depth(_)
1221            | Request::BestChainNextMedianTimePast
1222            | Request::BestChainBlockHash(_)
1223            | Request::BlockLocator
1224            | Request::Transaction(_)
1225            | Request::UnspentBestChainUtxo(_)
1226            | Request::Block(_)
1227            | Request::BlockAndSize(_)
1228            | Request::BlockHeader(_)
1229            | Request::FindBlockHashes { .. }
1230            | Request::FindBlockHeaders { .. }
1231            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1232                // Redirect the request to the concurrent ReadStateService
1233                let read_service = self.read_service.clone();
1234
1235                async move {
1236                    let req = req
1237                        .try_into()
1238                        .expect("ReadRequest conversion should not fail");
1239
1240                    let rsp = read_service.oneshot(req).await?;
1241                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1242
1243                    Ok(rsp)
1244                }
1245                .boxed()
1246            }
1247
1248            Request::CheckBlockProposalValidity(_) => {
1249                // Redirect the request to the concurrent ReadStateService
1250                let read_service = self.read_service.clone();
1251
1252                async move {
1253                    let req = req
1254                        .try_into()
1255                        .expect("ReadRequest conversion should not fail");
1256
1257                    let rsp = read_service.oneshot(req).await?;
1258                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1259
1260                    Ok(rsp)
1261                }
1262                .boxed()
1263            }
1264        }
1265    }
1266}
1267
1268impl Service<ReadRequest> for ReadStateService {
1269    type Response = ReadResponse;
1270    type Error = BoxError;
1271    type Future =
1272        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1273
1274    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1275        // Check for panics in the block write task
1276        //
1277        // TODO: move into a check_for_panics() method
1278        let block_write_task = self.block_write_task.take();
1279
1280        if let Some(block_write_task) = block_write_task {
1281            if block_write_task.is_finished() {
1282                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1283                    // We are the last state with a reference to this task, so we can propagate any panics
1284                    if let Err(thread_panic) = block_write_task.join() {
1285                        std::panic::resume_unwind(thread_panic);
1286                    }
1287                }
1288            } else {
1289                // It hasn't finished, so we need to put it back
1290                self.block_write_task = Some(block_write_task);
1291            }
1292        }
1293
1294        self.db.check_for_panics();
1295
1296        Poll::Ready(Ok(()))
1297    }
1298
1299    #[instrument(name = "read_state", skip(self, req))]
1300    fn call(&mut self, req: ReadRequest) -> Self::Future {
1301        req.count_metric();
1302        let timer = CodeTimer::start();
1303        let span = Span::current();
1304
1305        match req {
1306            // Used by the `getblockchaininfo` RPC.
1307            ReadRequest::UsageInfo => {
1308                let db = self.db.clone();
1309
1310                tokio::task::spawn_blocking(move || {
1311                    span.in_scope(move || {
1312                        // The work is done in the future.
1313
1314                        let db_size = db.size();
1315
1316                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1317
1318                        Ok(ReadResponse::UsageInfo(db_size))
1319                    })
1320                })
1321                .wait_for_panics()
1322            }
1323
1324            // Used by the StateService.
1325            ReadRequest::Tip => {
1326                let state = self.clone();
1327
1328                tokio::task::spawn_blocking(move || {
1329                    span.in_scope(move || {
1330                        let tip = state.non_finalized_state_receiver.with_watch_data(
1331                            |non_finalized_state| {
1332                                read::tip(non_finalized_state.best_chain(), &state.db)
1333                            },
1334                        );
1335
1336                        // The work is done in the future.
1337                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1338
1339                        Ok(ReadResponse::Tip(tip))
1340                    })
1341                })
1342                .wait_for_panics()
1343            }
1344
1345            // Used by `getblockchaininfo` RPC method.
1346            ReadRequest::TipPoolValues => {
1347                let state = self.clone();
1348
1349                tokio::task::spawn_blocking(move || {
1350                    span.in_scope(move || {
1351                        let tip_with_value_balance = state
1352                            .non_finalized_state_receiver
1353                            .with_watch_data(|non_finalized_state| {
1354                                read::tip_with_value_balance(
1355                                    non_finalized_state.best_chain(),
1356                                    &state.db,
1357                                )
1358                            });
1359
1360                        // The work is done in the future.
1361                        // TODO: Do this in the Drop impl with the variant name?
1362                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1363
1364                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1365                            .ok_or(BoxError::from("no chain tip available yet"))?;
1366
1367                        Ok(ReadResponse::TipPoolValues {
1368                            tip_height,
1369                            tip_hash,
1370                            value_balance,
1371                        })
1372                    })
1373                })
1374                .wait_for_panics()
1375            }
1376
1377            // Used by getblock
1378            ReadRequest::BlockInfo(hash_or_height) => {
1379                let state = self.clone();
1380
1381                tokio::task::spawn_blocking(move || {
1382                    span.in_scope(move || {
1383                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
1384                            |non_finalized_state| {
1385                                read::block_info(
1386                                    non_finalized_state.best_chain(),
1387                                    &state.db,
1388                                    hash_or_height,
1389                                )
1390                            },
1391                        );
1392
1393                        // The work is done in the future.
1394                        // TODO: Do this in the Drop impl with the variant name?
1395                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1396
1397                        Ok(ReadResponse::BlockInfo(value_balance))
1398                    })
1399                })
1400                .wait_for_panics()
1401            }
1402
1403            // Used by the StateService.
1404            ReadRequest::Depth(hash) => {
1405                let state = self.clone();
1406
1407                tokio::task::spawn_blocking(move || {
1408                    span.in_scope(move || {
1409                        let depth = state.non_finalized_state_receiver.with_watch_data(
1410                            |non_finalized_state| {
1411                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
1412                            },
1413                        );
1414
1415                        // The work is done in the future.
1416                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1417
1418                        Ok(ReadResponse::Depth(depth))
1419                    })
1420                })
1421                .wait_for_panics()
1422            }
1423
1424            // Used by the StateService.
1425            ReadRequest::BestChainNextMedianTimePast => {
1426                let state = self.clone();
1427
1428                tokio::task::spawn_blocking(move || {
1429                    span.in_scope(move || {
1430                        let non_finalized_state = state.latest_non_finalized_state();
1431                        let median_time_past =
1432                            read::next_median_time_past(&non_finalized_state, &state.db);
1433
1434                        // The work is done in the future.
1435                        timer.finish(
1436                            module_path!(),
1437                            line!(),
1438                            "ReadRequest::BestChainNextMedianTimePast",
1439                        );
1440
1441                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1442                    })
1443                })
1444                .wait_for_panics()
1445            }
1446
1447            // Used by the get_block (raw) RPC and the StateService.
1448            ReadRequest::Block(hash_or_height) => {
1449                let state = self.clone();
1450
1451                tokio::task::spawn_blocking(move || {
1452                    span.in_scope(move || {
1453                        let block = state.non_finalized_state_receiver.with_watch_data(
1454                            |non_finalized_state| {
1455                                read::block(
1456                                    non_finalized_state.best_chain(),
1457                                    &state.db,
1458                                    hash_or_height,
1459                                )
1460                            },
1461                        );
1462
1463                        // The work is done in the future.
1464                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1465
1466                        Ok(ReadResponse::Block(block))
1467                    })
1468                })
1469                .wait_for_panics()
1470            }
1471
1472            // Used by the get_block (raw) RPC and the StateService.
1473            ReadRequest::BlockAndSize(hash_or_height) => {
1474                let state = self.clone();
1475
1476                tokio::task::spawn_blocking(move || {
1477                    span.in_scope(move || {
1478                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1479                            |non_finalized_state| {
1480                                read::block_and_size(
1481                                    non_finalized_state.best_chain(),
1482                                    &state.db,
1483                                    hash_or_height,
1484                                )
1485                            },
1486                        );
1487
1488                        // The work is done in the future.
1489                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1490
1491                        Ok(ReadResponse::BlockAndSize(block_and_size))
1492                    })
1493                })
1494                .wait_for_panics()
1495            }
1496
1497            // Used by the get_block (verbose) RPC and the StateService.
1498            ReadRequest::BlockHeader(hash_or_height) => {
1499                let state = self.clone();
1500
1501                tokio::task::spawn_blocking(move || {
1502                    span.in_scope(move || {
1503                        let best_chain = state.latest_best_chain();
1504
1505                        let height = hash_or_height
1506                            .height_or_else(|hash| {
1507                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1508                            })
1509                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1510
1511                        let hash = hash_or_height
1512                            .hash_or_else(|height| {
1513                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
1514                            })
1515                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1516
1517                        let next_height = height.next()?;
1518                        let next_block_hash =
1519                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1520
1521                        let header = read::block_header(best_chain, &state.db, height.into())
1522                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1523
1524                        // The work is done in the future.
1525                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1526
1527                        Ok(ReadResponse::BlockHeader {
1528                            header,
1529                            hash,
1530                            height,
1531                            next_block_hash,
1532                        })
1533                    })
1534                })
1535                .wait_for_panics()
1536            }
1537
1538            // For the get_raw_transaction RPC and the StateService.
1539            ReadRequest::Transaction(hash) => {
1540                let state = self.clone();
1541
1542                tokio::task::spawn_blocking(move || {
1543                    span.in_scope(move || {
1544                        let response =
1545                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1546
1547                        // The work is done in the future.
1548                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1549
1550                        Ok(ReadResponse::Transaction(response))
1551                    })
1552                })
1553                .wait_for_panics()
1554            }
1555
1556            // Used by the getblock (verbose) RPC.
1557            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1558                let state = self.clone();
1559
1560                tokio::task::spawn_blocking(move || {
1561                    span.in_scope(move || {
1562                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1563                            |non_finalized_state| {
1564                                read::transaction_hashes_for_block(
1565                                    non_finalized_state.best_chain(),
1566                                    &state.db,
1567                                    hash_or_height,
1568                                )
1569                            },
1570                        );
1571
1572                        // The work is done in the future.
1573                        timer.finish(
1574                            module_path!(),
1575                            line!(),
1576                            "ReadRequest::TransactionIdsForBlock",
1577                        );
1578
1579                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1580                    })
1581                })
1582                .wait_for_panics()
1583            }
1584
1585            #[cfg(feature = "indexer")]
1586            ReadRequest::SpendingTransactionId(spend) => {
1587                let state = self.clone();
1588
1589                tokio::task::spawn_blocking(move || {
1590                    span.in_scope(move || {
1591                        let spending_transaction_id = state
1592                            .non_finalized_state_receiver
1593                            .with_watch_data(|non_finalized_state| {
1594                                read::spending_transaction_hash(
1595                                    non_finalized_state.best_chain(),
1596                                    &state.db,
1597                                    spend,
1598                                )
1599                            });
1600
1601                        // The work is done in the future.
1602                        timer.finish(
1603                            module_path!(),
1604                            line!(),
1605                            "ReadRequest::TransactionIdForSpentOutPoint",
1606                        );
1607
1608                        Ok(ReadResponse::TransactionId(spending_transaction_id))
1609                    })
1610                })
1611                .wait_for_panics()
1612            }
1613
1614            ReadRequest::UnspentBestChainUtxo(outpoint) => {
1615                let state = self.clone();
1616
1617                tokio::task::spawn_blocking(move || {
1618                    span.in_scope(move || {
1619                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1620                            |non_finalized_state| {
1621                                read::unspent_utxo(
1622                                    non_finalized_state.best_chain(),
1623                                    &state.db,
1624                                    outpoint,
1625                                )
1626                            },
1627                        );
1628
1629                        // The work is done in the future.
1630                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1631
1632                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1633                    })
1634                })
1635                .wait_for_panics()
1636            }
1637
1638            // Manually used by the StateService to implement part of AwaitUtxo.
1639            ReadRequest::AnyChainUtxo(outpoint) => {
1640                let state = self.clone();
1641
1642                tokio::task::spawn_blocking(move || {
1643                    span.in_scope(move || {
1644                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1645                            |non_finalized_state| {
1646                                read::any_utxo(non_finalized_state, &state.db, outpoint)
1647                            },
1648                        );
1649
1650                        // The work is done in the future.
1651                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1652
1653                        Ok(ReadResponse::AnyChainUtxo(utxo))
1654                    })
1655                })
1656                .wait_for_panics()
1657            }
1658
1659            // Used by the StateService.
1660            ReadRequest::BlockLocator => {
1661                let state = self.clone();
1662
1663                tokio::task::spawn_blocking(move || {
1664                    span.in_scope(move || {
1665                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
1666                            |non_finalized_state| {
1667                                read::block_locator(non_finalized_state.best_chain(), &state.db)
1668                            },
1669                        );
1670
1671                        // The work is done in the future.
1672                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1673
1674                        Ok(ReadResponse::BlockLocator(
1675                            block_locator.unwrap_or_default(),
1676                        ))
1677                    })
1678                })
1679                .wait_for_panics()
1680            }
1681
1682            // Used by the StateService.
1683            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1684                let state = self.clone();
1685
1686                tokio::task::spawn_blocking(move || {
1687                    span.in_scope(move || {
1688                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1689                            |non_finalized_state| {
1690                                read::find_chain_hashes(
1691                                    non_finalized_state.best_chain(),
1692                                    &state.db,
1693                                    known_blocks,
1694                                    stop,
1695                                    MAX_FIND_BLOCK_HASHES_RESULTS,
1696                                )
1697                            },
1698                        );
1699
1700                        // The work is done in the future.
1701                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1702
1703                        Ok(ReadResponse::BlockHashes(block_hashes))
1704                    })
1705                })
1706                .wait_for_panics()
1707            }
1708
1709            // Used by the StateService.
1710            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1711                let state = self.clone();
1712
1713                tokio::task::spawn_blocking(move || {
1714                    span.in_scope(move || {
1715                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
1716                            |non_finalized_state| {
1717                                read::find_chain_headers(
1718                                    non_finalized_state.best_chain(),
1719                                    &state.db,
1720                                    known_blocks,
1721                                    stop,
1722                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1723                                )
1724                            },
1725                        );
1726
1727                        let block_headers = block_headers
1728                            .into_iter()
1729                            .map(|header| CountedHeader { header })
1730                            .collect();
1731
1732                        // The work is done in the future.
1733                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1734
1735                        Ok(ReadResponse::BlockHeaders(block_headers))
1736                    })
1737                })
1738                .wait_for_panics()
1739            }
1740
1741            ReadRequest::SaplingTree(hash_or_height) => {
1742                let state = self.clone();
1743
1744                tokio::task::spawn_blocking(move || {
1745                    span.in_scope(move || {
1746                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1747                            |non_finalized_state| {
1748                                read::sapling_tree(
1749                                    non_finalized_state.best_chain(),
1750                                    &state.db,
1751                                    hash_or_height,
1752                                )
1753                            },
1754                        );
1755
1756                        // The work is done in the future.
1757                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1758
1759                        Ok(ReadResponse::SaplingTree(sapling_tree))
1760                    })
1761                })
1762                .wait_for_panics()
1763            }
1764
1765            ReadRequest::OrchardTree(hash_or_height) => {
1766                let state = self.clone();
1767
1768                tokio::task::spawn_blocking(move || {
1769                    span.in_scope(move || {
1770                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1771                            |non_finalized_state| {
1772                                read::orchard_tree(
1773                                    non_finalized_state.best_chain(),
1774                                    &state.db,
1775                                    hash_or_height,
1776                                )
1777                            },
1778                        );
1779
1780                        // The work is done in the future.
1781                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1782
1783                        Ok(ReadResponse::OrchardTree(orchard_tree))
1784                    })
1785                })
1786                .wait_for_panics()
1787            }
1788
1789            ReadRequest::SaplingSubtrees { start_index, limit } => {
1790                let state = self.clone();
1791
1792                tokio::task::spawn_blocking(move || {
1793                    span.in_scope(move || {
1794                        let end_index = limit
1795                            .and_then(|limit| start_index.0.checked_add(limit.0))
1796                            .map(NoteCommitmentSubtreeIndex);
1797
1798                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1799                            |non_finalized_state| {
1800                                if let Some(end_index) = end_index {
1801                                    read::sapling_subtrees(
1802                                        non_finalized_state.best_chain(),
1803                                        &state.db,
1804                                        start_index..end_index,
1805                                    )
1806                                } else {
1807                                    // If there is no end bound, just return all the trees.
1808                                    // If the end bound would overflow, just returns all the trees, because that's what
1809                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1810                                    // the trees run out.)
1811                                    read::sapling_subtrees(
1812                                        non_finalized_state.best_chain(),
1813                                        &state.db,
1814                                        start_index..,
1815                                    )
1816                                }
1817                            },
1818                        );
1819
1820                        // The work is done in the future.
1821                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1822
1823                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1824                    })
1825                })
1826                .wait_for_panics()
1827            }
1828
1829            ReadRequest::OrchardSubtrees { start_index, limit } => {
1830                let state = self.clone();
1831
1832                tokio::task::spawn_blocking(move || {
1833                    span.in_scope(move || {
1834                        let end_index = limit
1835                            .and_then(|limit| start_index.0.checked_add(limit.0))
1836                            .map(NoteCommitmentSubtreeIndex);
1837
1838                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1839                            |non_finalized_state| {
1840                                if let Some(end_index) = end_index {
1841                                    read::orchard_subtrees(
1842                                        non_finalized_state.best_chain(),
1843                                        &state.db,
1844                                        start_index..end_index,
1845                                    )
1846                                } else {
1847                                    // If there is no end bound, just return all the trees.
1848                                    // If the end bound would overflow, just returns all the trees, because that's what
1849                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1850                                    // the trees run out.)
1851                                    read::orchard_subtrees(
1852                                        non_finalized_state.best_chain(),
1853                                        &state.db,
1854                                        start_index..,
1855                                    )
1856                                }
1857                            },
1858                        );
1859
1860                        // The work is done in the future.
1861                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1862
1863                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1864                    })
1865                })
1866                .wait_for_panics()
1867            }
1868
1869            // For the get_address_balance RPC.
1870            ReadRequest::AddressBalance(addresses) => {
1871                let state = self.clone();
1872
1873                tokio::task::spawn_blocking(move || {
1874                    span.in_scope(move || {
1875                        let (balance, received) = state
1876                            .non_finalized_state_receiver
1877                            .with_watch_data(|non_finalized_state| {
1878                                read::transparent_balance(
1879                                    non_finalized_state.best_chain().cloned(),
1880                                    &state.db,
1881                                    addresses,
1882                                )
1883                            })?;
1884
1885                        // The work is done in the future.
1886                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1887
1888                        Ok(ReadResponse::AddressBalance { balance, received })
1889                    })
1890                })
1891                .wait_for_panics()
1892            }
1893
1894            // For the get_address_tx_ids RPC.
1895            ReadRequest::TransactionIdsByAddresses {
1896                addresses,
1897                height_range,
1898            } => {
1899                let state = self.clone();
1900
1901                tokio::task::spawn_blocking(move || {
1902                    span.in_scope(move || {
1903                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1904                            |non_finalized_state| {
1905                                read::transparent_tx_ids(
1906                                    non_finalized_state.best_chain(),
1907                                    &state.db,
1908                                    addresses,
1909                                    height_range,
1910                                )
1911                            },
1912                        );
1913
1914                        // The work is done in the future.
1915                        timer.finish(
1916                            module_path!(),
1917                            line!(),
1918                            "ReadRequest::TransactionIdsByAddresses",
1919                        );
1920
1921                        tx_ids.map(ReadResponse::AddressesTransactionIds)
1922                    })
1923                })
1924                .wait_for_panics()
1925            }
1926
1927            // For the get_address_utxos RPC.
1928            ReadRequest::UtxosByAddresses(addresses) => {
1929                let state = self.clone();
1930
1931                tokio::task::spawn_blocking(move || {
1932                    span.in_scope(move || {
1933                        let utxos = state.non_finalized_state_receiver.with_watch_data(
1934                            |non_finalized_state| {
1935                                read::address_utxos(
1936                                    &state.network,
1937                                    non_finalized_state.best_chain(),
1938                                    &state.db,
1939                                    addresses,
1940                                )
1941                            },
1942                        );
1943
1944                        // The work is done in the future.
1945                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
1946
1947                        utxos.map(ReadResponse::AddressUtxos)
1948                    })
1949                })
1950                .wait_for_panics()
1951            }
1952
1953            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1954                let state = self.clone();
1955
1956                tokio::task::spawn_blocking(move || {
1957                    span.in_scope(move || {
1958                        let latest_non_finalized_best_chain =
1959                            state.latest_non_finalized_state().best_chain().cloned();
1960
1961                        check::nullifier::tx_no_duplicates_in_chain(
1962                            &state.db,
1963                            latest_non_finalized_best_chain.as_ref(),
1964                            &unmined_tx.transaction,
1965                        )?;
1966
1967                        check::anchors::tx_anchors_refer_to_final_treestates(
1968                            &state.db,
1969                            latest_non_finalized_best_chain.as_ref(),
1970                            &unmined_tx,
1971                        )?;
1972
1973                        // The work is done in the future.
1974                        timer.finish(
1975                            module_path!(),
1976                            line!(),
1977                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
1978                        );
1979
1980                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1981                    })
1982                })
1983                .wait_for_panics()
1984            }
1985
1986            // Used by the get_block and get_block_hash RPCs.
1987            ReadRequest::BestChainBlockHash(height) => {
1988                let state = self.clone();
1989
1990                // # Performance
1991                //
1992                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1993
1994                tokio::task::spawn_blocking(move || {
1995                    span.in_scope(move || {
1996                        let hash = state.non_finalized_state_receiver.with_watch_data(
1997                            |non_finalized_state| {
1998                                read::hash_by_height(
1999                                    non_finalized_state.best_chain(),
2000                                    &state.db,
2001                                    height,
2002                                )
2003                            },
2004                        );
2005
2006                        // The work is done in the future.
2007                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
2008
2009                        Ok(ReadResponse::BlockHash(hash))
2010                    })
2011                })
2012                .wait_for_panics()
2013            }
2014
2015            // Used by get_block_template and getblockchaininfo RPCs.
2016            ReadRequest::ChainInfo => {
2017                let state = self.clone();
2018                let latest_non_finalized_state = self.latest_non_finalized_state();
2019
2020                // # Performance
2021                //
2022                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2023
2024                tokio::task::spawn_blocking(move || {
2025                    span.in_scope(move || {
2026                        // # Correctness
2027                        //
2028                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2029                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2030                        //
2031                        // If there is a large overlap between the non-finalized and finalized states,
2032                        // where the finalized tip is above the non-finalized tip,
2033                        // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
2034                        //
2035                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
2036                        // is not synced to the tip. That check happens before the RPC makes this request.
2037                        let get_block_template_info =
2038                            read::difficulty::get_block_template_chain_info(
2039                                &latest_non_finalized_state,
2040                                &state.db,
2041                                &state.network,
2042                            );
2043
2044                        // The work is done in the future.
2045                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
2046
2047                        get_block_template_info.map(ReadResponse::ChainInfo)
2048                    })
2049                })
2050                .wait_for_panics()
2051            }
2052
2053            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
2054            ReadRequest::SolutionRate { num_blocks, height } => {
2055                let state = self.clone();
2056
2057                // # Performance
2058                //
2059                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2060
2061                tokio::task::spawn_blocking(move || {
2062                    span.in_scope(move || {
2063                        let latest_non_finalized_state = state.latest_non_finalized_state();
2064                        // # Correctness
2065                        //
2066                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2067                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2068                        //
2069                        // The worst that can happen here is that the default `start_hash` will be below
2070                        // the chain tip.
2071                        let (tip_height, tip_hash) =
2072                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
2073                                Some(tip_hash) => tip_hash,
2074                                None => return Ok(ReadResponse::SolutionRate(None)),
2075                            };
2076
2077                        let start_hash = match height {
2078                            Some(height) if height < tip_height => read::hash_by_height(
2079                                latest_non_finalized_state.best_chain(),
2080                                &state.db,
2081                                height,
2082                            ),
2083                            // use the chain tip hash if height is above it or not provided.
2084                            _ => Some(tip_hash),
2085                        };
2086
2087                        let solution_rate = start_hash.and_then(|start_hash| {
2088                            read::difficulty::solution_rate(
2089                                &latest_non_finalized_state,
2090                                &state.db,
2091                                num_blocks,
2092                                start_hash,
2093                            )
2094                        });
2095
2096                        // The work is done in the future.
2097                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
2098
2099                        Ok(ReadResponse::SolutionRate(solution_rate))
2100                    })
2101                })
2102                .wait_for_panics()
2103            }
2104
2105            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
2106                let state = self.clone();
2107
2108                // # Performance
2109                //
2110                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2111
2112                tokio::task::spawn_blocking(move || {
2113                    span.in_scope(move || {
2114                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
2115                        let mut latest_non_finalized_state = state.latest_non_finalized_state();
2116
2117                        // The previous block of a valid proposal must be on the best chain tip.
2118                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
2119                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
2120                        };
2121
2122                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
2123                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
2124                        }
2125
2126                        // This clone of the non-finalized state is dropped when this closure returns.
2127                        // The non-finalized state that's used in the rest of the state (including finalizing
2128                        // blocks into the db) is not mutated here.
2129                        //
2130                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2131                        latest_non_finalized_state.disable_metrics();
2132
2133                        write::validate_and_commit_non_finalized(
2134                            &state.db,
2135                            &mut latest_non_finalized_state,
2136                            semantically_verified,
2137                        )?;
2138
2139                        // The work is done in the future.
2140                        timer.finish(
2141                            module_path!(),
2142                            line!(),
2143                            "ReadRequest::CheckBlockProposalValidity",
2144                        );
2145
2146                        Ok(ReadResponse::ValidBlockProposal)
2147                    })
2148                })
2149                .wait_for_panics()
2150            }
2151
2152            ReadRequest::TipBlockSize => {
2153                let state = self.clone();
2154
2155                tokio::task::spawn_blocking(move || {
2156                    span.in_scope(move || {
2157                        // Get the best chain tip height.
2158                        let tip_height = state
2159                            .non_finalized_state_receiver
2160                            .with_watch_data(|non_finalized_state| {
2161                                read::tip_height(non_finalized_state.best_chain(), &state.db)
2162                            })
2163                            .unwrap_or(Height(0));
2164
2165                        // Get the block at the best chain tip height.
2166                        let block = state.non_finalized_state_receiver.with_watch_data(
2167                            |non_finalized_state| {
2168                                read::block(
2169                                    non_finalized_state.best_chain(),
2170                                    &state.db,
2171                                    tip_height.into(),
2172                                )
2173                            },
2174                        );
2175
2176                        // The work is done in the future.
2177                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2178
2179                        // Respond with the length of the obtained block if any.
2180                        match block {
2181                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2182                                b.zcash_serialize_to_vec()?.len(),
2183                            ))),
2184                            None => Ok(ReadResponse::TipBlockSize(None)),
2185                        }
2186                    })
2187                })
2188                .wait_for_panics()
2189            }
2190
2191            ReadRequest::NonFinalizedBlocksListener => {
2192                // The non-finalized blocks listener is used to notify the state service
2193                // about new blocks that have been added to the non-finalized state.
2194                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
2195                    self.network.clone(),
2196                    self.non_finalized_state_receiver.clone(),
2197                );
2198
2199                async move {
2200                    timer.finish(
2201                        module_path!(),
2202                        line!(),
2203                        "ReadRequest::NonFinalizedBlocksListener",
2204                    );
2205
2206                    Ok(ReadResponse::NonFinalizedBlocksListener(
2207                        non_finalized_blocks_listener,
2208                    ))
2209                }
2210                .boxed()
2211            }
2212        }
2213    }
2214}
2215
2216/// Initialize a state service from the provided [`Config`].
2217/// Returns a boxed state service, a read-only state service,
2218/// and receivers for state chain tip updates.
2219///
2220/// Each `network` has its own separate on-disk database.
2221///
2222/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2223/// to work out when it is near the final checkpoint.
2224///
2225/// To share access to the state, wrap the returned service in a `Buffer`,
2226/// or clone the returned [`ReadStateService`].
2227///
2228/// It's possible to construct multiple state services in the same application (as
2229/// long as they, e.g., use different storage locations), but doing so is
2230/// probably not what you want.
2231pub async fn init(
2232    config: Config,
2233    network: &Network,
2234    max_checkpoint_height: block::Height,
2235    checkpoint_verify_concurrency_limit: usize,
2236) -> (
2237    BoxService<Request, Response, BoxError>,
2238    ReadStateService,
2239    LatestChainTip,
2240    ChainTipChange,
2241) {
2242    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2243        StateService::new(
2244            config,
2245            network,
2246            max_checkpoint_height,
2247            checkpoint_verify_concurrency_limit,
2248        )
2249        .await;
2250
2251    (
2252        BoxService::new(state_service),
2253        read_only_state_service,
2254        latest_chain_tip,
2255        chain_tip_change,
2256    )
2257}
2258
2259/// Initialize a read state service from the provided [`Config`].
2260/// Returns a read-only state service,
2261///
2262/// Each `network` has its own separate on-disk database.
2263///
2264/// To share access to the state, clone the returned [`ReadStateService`].
2265pub fn init_read_only(
2266    config: Config,
2267    network: &Network,
2268) -> (
2269    ReadStateService,
2270    ZebraDb,
2271    tokio::sync::watch::Sender<NonFinalizedState>,
2272) {
2273    let finalized_state = FinalizedState::new_with_debug(
2274        &config,
2275        network,
2276        true,
2277        #[cfg(feature = "elasticsearch")]
2278        false,
2279        true,
2280    );
2281    let (non_finalized_state_sender, non_finalized_state_receiver) =
2282        tokio::sync::watch::channel(NonFinalizedState::new(network));
2283
2284    (
2285        ReadStateService::new(
2286            &finalized_state,
2287            None,
2288            WatchReceiver::new(non_finalized_state_receiver),
2289        ),
2290        finalized_state.db.clone(),
2291        non_finalized_state_sender,
2292    )
2293}
2294
2295/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2296/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
2297pub fn spawn_init_read_only(
2298    config: Config,
2299    network: &Network,
2300) -> tokio::task::JoinHandle<(
2301    ReadStateService,
2302    ZebraDb,
2303    tokio::sync::watch::Sender<NonFinalizedState>,
2304)> {
2305    let network = network.clone();
2306    tokio::task::spawn_blocking(move || init_read_only(config, &network))
2307}
2308
2309/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
2310///
2311/// This can be used to create a state service for testing. See also [`init`].
2312#[cfg(any(test, feature = "proptest-impl"))]
2313pub async fn init_test(
2314    network: &Network,
2315) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
2316    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2317    //       if we ever need to test final checkpoint sent UTXO queries
2318    let (state_service, _, _, _) =
2319        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2320
2321    Buffer::new(BoxService::new(state_service), 1)
2322}
2323
2324/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
2325/// then returns the read-write service, read-only service, and tip watch channels.
2326///
2327/// This can be used to create a state service for testing. See also [`init`].
2328#[cfg(any(test, feature = "proptest-impl"))]
2329pub async fn init_test_services(
2330    network: &Network,
2331) -> (
2332    Buffer<BoxService<Request, Response, BoxError>, Request>,
2333    ReadStateService,
2334    LatestChainTip,
2335    ChainTipChange,
2336) {
2337    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2338    //       if we ever need to test final checkpoint sent UTXO queries
2339    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
2340        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;
2341
2342    let state_service = Buffer::new(BoxService::new(state_service), 1);
2343
2344    (
2345        state_service,
2346        read_state_service,
2347        latest_chain_tip,
2348        chain_tip_change,
2349    )
2350}