zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    convert,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24    time::{Duration, Instant},
25};
26
27use futures::future::FutureExt;
28use tokio::sync::{oneshot, watch};
29use tower::{util::BoxService, Service, ServiceExt};
30use tracing::{instrument, Instrument, Span};
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use tower::buffer::Buffer;
34
35use zebra_chain::{
36    block::{self, CountedHeader, HeightDiff},
37    diagnostic::{task::WaitForPanics, CodeTimer},
38    parameters::{Network, NetworkUpgrade},
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use zebra_chain::{block::Height, serialization::ZcashSerialize};
43
44use crate::{
45    constants::{
46        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
47    },
48    response::NonFinalizedBlocksListener,
49    service::{
50        block_iter::any_ancestor_blocks,
51        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52        finalized_state::{FinalizedState, ZebraDb},
53        non_finalized_state::{Chain, NonFinalizedState},
54        pending_utxos::PendingUtxos,
55        queued_blocks::QueuedBlocks,
56        watch_receiver::WatchReceiver,
57    },
58    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
59    ReadResponse, Request, Response, SemanticallyVerifiedBlock, ValidateContextError,
60};
61
62pub mod block_iter;
63pub mod chain_tip;
64pub mod watch_receiver;
65
66pub mod check;
67
68pub(crate) mod finalized_state;
69pub(crate) mod non_finalized_state;
70mod pending_utxos;
71mod queued_blocks;
72pub(crate) mod read;
73mod write;
74
75#[cfg(any(test, feature = "proptest-impl"))]
76pub mod arbitrary;
77
78#[cfg(test)]
79mod tests;
80
81pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
82use write::NonFinalizedWriteMessage;
83
84use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
85
86/// A read-write service for Zebra's cached blockchain state.
87///
88/// This service modifies and provides access to:
89/// - the non-finalized state: the ~100 most recent blocks.
90///   Zebra allows chain forks in the non-finalized state,
91///   stores it in memory, and re-downloads it when restarted.
92/// - the finalized state: older blocks that have many confirmations.
93///   Zebra stores the single best chain in the finalized state,
94///   and re-loads it from disk when restarted.
95///
96/// Read requests to this service are buffered, then processed concurrently.
97/// Block write requests are buffered, then queued, then processed in order by a separate task.
98///
99/// Most state users can get faster read responses using the [`ReadStateService`],
100/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
101///
102/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
103/// They can read the latest block directly, without queueing any requests.
104#[derive(Debug)]
105pub(crate) struct StateService {
106    // Configuration
107    //
108    /// The configured Zcash network.
109    network: Network,
110
111    /// The height that we start storing UTXOs from finalized blocks.
112    ///
113    /// This height should be lower than the last few checkpoints,
114    /// so the full verifier can verify UTXO spends from those blocks,
115    /// even if they haven't been committed to the finalized state yet.
116    full_verifier_utxo_lookahead: block::Height,
117
118    // Queued Blocks
119    //
120    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
121    /// These blocks are awaiting their parent blocks before they can do contextual verification.
122    non_finalized_state_queued_blocks: QueuedBlocks,
123
124    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
125    /// These blocks are awaiting their parent blocks before they can do contextual verification.
126    ///
127    /// Indexed by their parent block hash.
128    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
129
130    /// Channels to send blocks to the block write task.
131    block_write_sender: write::BlockWriteSender,
132
133    /// The [`block::Hash`] of the most recent block sent on
134    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
135    ///
136    /// On startup, this is:
137    /// - the finalized tip, if there are stored blocks, or
138    /// - the genesis block's parent hash, if the database is empty.
139    ///
140    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
141    /// - the hash of the last valid committed block (the parent of the invalid block).
142    finalized_block_write_last_sent_hash: block::Hash,
143
144    /// A set of block hashes that have been sent to the block write task.
145    /// Hashes of blocks below the finalized tip height are periodically pruned.
146    non_finalized_block_write_sent_hashes: SentHashes,
147
148    /// If an invalid block is sent on `finalized_block_write_sender`
149    /// or `non_finalized_block_write_sender`,
150    /// this channel gets the [`block::Hash`] of the valid tip.
151    //
152    // TODO: add tests for finalized and non-finalized resets (#2654)
153    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
154
155    // Pending UTXO Request Tracking
156    //
157    /// The set of outpoints with pending requests for their associated transparent::Output.
158    pending_utxos: PendingUtxos,
159
160    /// Instant tracking the last time `pending_utxos` was pruned.
161    last_prune: Instant,
162
163    // Updating Concurrently Readable State
164    //
165    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
166    ///
167    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
168    read_service: ReadStateService,
169
170    // Metrics
171    //
172    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
173    ///
174    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
175    /// as a break in the graph.
176    max_finalized_queue_height: f64,
177}
178
179/// A read-only service for accessing Zebra's cached blockchain state.
180///
181/// This service provides read-only access to:
182/// - the non-finalized state: the ~100 most recent blocks.
183/// - the finalized state: older blocks that have many confirmations.
184///
185/// Requests to this service are processed in parallel,
186/// ignoring any blocks queued by the read-write [`StateService`].
187///
188/// This quick response behavior is better for most state users.
189/// It allows other async tasks to make progress while concurrently reading data from disk.
190#[derive(Clone, Debug)]
191pub struct ReadStateService {
192    // Configuration
193    //
194    /// The configured Zcash network.
195    network: Network,
196
197    // Shared Concurrently Readable State
198    //
199    /// A watch channel with a cached copy of the [`NonFinalizedState`].
200    ///
201    /// This state is only updated between requests,
202    /// so it might include some block data that is also on `disk`.
203    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
204
205    /// The shared inner on-disk database for the finalized state.
206    ///
207    /// RocksDB allows reads and writes via a shared reference,
208    /// but [`ZebraDb`] doesn't expose any write methods or types.
209    ///
210    /// This chain is updated concurrently with requests,
211    /// so it might include some block data that is also in `best_mem`.
212    db: ZebraDb,
213
214    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
215    /// once the queues have received all their parent blocks.
216    ///
217    /// Used to check for panics when writing blocks.
218    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
219}
220
221impl Drop for StateService {
222    fn drop(&mut self) {
223        // The state service owns the state, tasks, and channels,
224        // so dropping it should shut down everything.
225
226        // Close the channels (non-blocking)
227        // This makes the block write thread exit the next time it checks the channels.
228        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
229        self.invalid_block_write_reset_receiver.close();
230
231        std::mem::drop(self.block_write_sender.finalized.take());
232        std::mem::drop(self.block_write_sender.non_finalized.take());
233
234        self.clear_finalized_block_queue(
235            "dropping the state: dropped unused finalized state queue block",
236        );
237        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::from(
238            ValidateContextError::DroppedUnusedBlock,
239        ));
240
241        // Log database metrics before shutting down
242        info!("dropping the state: logging database metrics");
243        self.log_db_metrics();
244
245        // Then drop self.read_service, which checks the block write task for panics,
246        // and tries to shut down the database.
247    }
248}
249
250impl Drop for ReadStateService {
251    fn drop(&mut self) {
252        // The read state service shares the state,
253        // so dropping it should check if we can shut down.
254
255        // TODO: move this into a try_shutdown() method
256        if let Some(block_write_task) = self.block_write_task.take() {
257            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
258                // We're the last database user, so we can tell it to shut down (blocking):
259                // - flushes the database to disk, and
260                // - drops the database, which cleans up any database tasks correctly.
261                self.db.shutdown(true);
262
263                // We are the last state with a reference to this thread, so we can
264                // wait until the block write task finishes, then check for panics (blocking).
265                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
266
267                // This log is verbose during tests.
268                #[cfg(not(test))]
269                info!("waiting for the block write task to finish");
270                #[cfg(test)]
271                debug!("waiting for the block write task to finish");
272
273                // TODO: move this into a check_for_panics() method
274                if let Err(thread_panic) = block_write_task_handle.join() {
275                    std::panic::resume_unwind(thread_panic);
276                } else {
277                    debug!("shutting down the state because the block write task has finished");
278                }
279            }
280        } else {
281            // Even if we're not the last database user, try shutting it down.
282            //
283            // TODO: rename this to try_shutdown()?
284            self.db.shutdown(false);
285        }
286    }
287}
288
289impl StateService {
290    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
292    /// Creates a new state service for the state `config` and `network`.
293    ///
294    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
295    /// to work out when it is near the final checkpoint.
296    ///
297    /// Returns the read-write and read-only state services,
298    /// and read-only watch channels for its best chain tip.
299    pub fn new(
300        config: Config,
301        network: &Network,
302        max_checkpoint_height: block::Height,
303        checkpoint_verify_concurrency_limit: usize,
304    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
305        let timer = CodeTimer::start();
306        let finalized_state = FinalizedState::new(
307            &config,
308            network,
309            #[cfg(feature = "elasticsearch")]
310            true,
311        );
312        timer.finish(module_path!(), line!(), "opening finalized state database");
313
314        let timer = CodeTimer::start();
315        let initial_tip = finalized_state
316            .db
317            .tip_block()
318            .map(CheckpointVerifiedBlock::from)
319            .map(ChainTipBlock::from);
320
321        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
322            ChainTipSender::new(initial_tip, network);
323
324        let non_finalized_state = NonFinalizedState::new(network);
325
326        let (non_finalized_state_sender, non_finalized_state_receiver) =
327            watch::channel(NonFinalizedState::new(&finalized_state.network()));
328
329        let finalized_state_for_writing = finalized_state.clone();
330        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
331            write::BlockWriteSender::spawn(
332                finalized_state_for_writing,
333                non_finalized_state,
334                chain_tip_sender,
335                non_finalized_state_sender,
336            );
337
338        let read_service = ReadStateService::new(
339            &finalized_state,
340            block_write_task,
341            non_finalized_state_receiver,
342        );
343
344        let full_verifier_utxo_lookahead = max_checkpoint_height
345            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
346                .expect("fits in HeightDiff");
347        let full_verifier_utxo_lookahead =
348            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
349        let non_finalized_state_queued_blocks = QueuedBlocks::default();
350        let pending_utxos = PendingUtxos::default();
351
352        let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash();
353
354        let state = Self {
355            network: network.clone(),
356            full_verifier_utxo_lookahead,
357            non_finalized_state_queued_blocks,
358            finalized_state_queued_blocks: HashMap::new(),
359            block_write_sender,
360            finalized_block_write_last_sent_hash,
361            non_finalized_block_write_sent_hashes: SentHashes::default(),
362            invalid_block_write_reset_receiver,
363            pending_utxos,
364            last_prune: Instant::now(),
365            read_service: read_service.clone(),
366            max_finalized_queue_height: f64::NAN,
367        };
368        timer.finish(module_path!(), line!(), "initializing state service");
369
370        tracing::info!("starting legacy chain check");
371        let timer = CodeTimer::start();
372
373        if let (Some(tip), Some(nu5_activation_height)) = (
374            state.best_tip(),
375            NetworkUpgrade::Nu5.activation_height(network),
376        ) {
377            if let Err(error) = check::legacy_chain(
378                nu5_activation_height,
379                any_ancestor_blocks(
380                    &state.read_service.latest_non_finalized_state(),
381                    &state.read_service.db,
382                    tip.1,
383                ),
384                &state.network,
385                MAX_LEGACY_CHAIN_BLOCKS,
386            ) {
387                let legacy_db_path = state.read_service.db.path().to_path_buf();
388                panic!(
389                    "Cached state contains a legacy chain.\n\
390                     An outdated Zebra version did not know about a recent network upgrade,\n\
391                     so it followed a legacy chain using outdated consensus branch rules.\n\
392                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
393                     Database path: {legacy_db_path:?}\n\
394                     Error: {error:?}",
395                );
396            }
397        }
398
399        tracing::info!("cached state consensus branch is valid: no legacy chain found");
400        timer.finish(module_path!(), line!(), "legacy chain check");
401
402        (state, read_service, latest_chain_tip, chain_tip_change)
403    }
404
405    /// Call read only state service to log rocksdb database metrics.
406    pub fn log_db_metrics(&self) {
407        self.read_service.db.print_db_metrics();
408    }
409
410    /// Queue a checkpoint verified block for verification and storage in the finalized state.
411    ///
412    /// Returns a channel receiver that provides the result of the block commit.
413    fn queue_and_commit_to_finalized_state(
414        &mut self,
415        checkpoint_verified: CheckpointVerifiedBlock,
416    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
417        // # Correctness & Performance
418        //
419        // This method must not block, access the database, or perform CPU-intensive tasks,
420        // because it is called directly from the tokio executor's Future threads.
421
422        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
423        let queued_height = checkpoint_verified.height;
424
425        // If we're close to the final checkpoint, make the block's UTXOs available for
426        // semantic block verification, even when it is in the channel.
427        if self.is_close_to_final_checkpoint(queued_height) {
428            self.non_finalized_block_write_sent_hashes
429                .add_finalized(&checkpoint_verified)
430        }
431
432        let (rsp_tx, rsp_rx) = oneshot::channel();
433        let queued = (checkpoint_verified, rsp_tx);
434
435        if self.block_write_sender.finalized.is_some() {
436            // We're still committing checkpoint verified blocks
437            if let Some(duplicate_queued) = self
438                .finalized_state_queued_blocks
439                .insert(queued_prev_hash, queued)
440            {
441                Self::send_checkpoint_verified_block_error(
442                    duplicate_queued,
443                    "dropping older checkpoint verified block: got newer duplicate block",
444                );
445            }
446
447            self.drain_finalized_queue_and_commit();
448        } else {
449            // We've finished committing checkpoint verified blocks to the finalized state,
450            // so drop any repeated queued blocks, and return an error.
451            //
452            // TODO: track the latest sent height, and drop any blocks under that height
453            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
454            Self::send_checkpoint_verified_block_error(
455                queued,
456                "already finished committing checkpoint verified blocks: dropped duplicate block, \
457                 block is already committed to the state",
458            );
459
460            self.clear_finalized_block_queue(
461                "already finished committing checkpoint verified blocks: dropped duplicate block, \
462                 block is already committed to the state",
463            );
464        }
465
466        if self.finalized_state_queued_blocks.is_empty() {
467            self.max_finalized_queue_height = f64::NAN;
468        } else if self.max_finalized_queue_height.is_nan()
469            || self.max_finalized_queue_height < queued_height.0 as f64
470        {
471            // if there are still blocks in the queue, then either:
472            //   - the new block was lower than the old maximum, and there was a gap before it,
473            //     so the maximum is still the same (and we skip this code), or
474            //   - the new block is higher than the old maximum, and there is at least one gap
475            //     between the finalized tip and the new maximum
476            self.max_finalized_queue_height = queued_height.0 as f64;
477        }
478
479        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
480        metrics::gauge!("state.checkpoint.queued.block.count")
481            .set(self.finalized_state_queued_blocks.len() as f64);
482
483        rsp_rx
484    }
485
486    /// Finds finalized state queue blocks to be committed to the state in order,
487    /// removes them from the queue, and sends them to the block commit task.
488    ///
489    /// After queueing a finalized block, this method checks whether the newly
490    /// queued block (and any of its descendants) can be committed to the state.
491    ///
492    /// Returns an error if the block commit channel has been closed.
493    pub fn drain_finalized_queue_and_commit(&mut self) {
494        use tokio::sync::mpsc::error::{SendError, TryRecvError};
495
496        // # Correctness & Performance
497        //
498        // This method must not block, access the database, or perform CPU-intensive tasks,
499        // because it is called directly from the tokio executor's Future threads.
500
501        // If a block failed, we need to start again from a valid tip.
502        match self.invalid_block_write_reset_receiver.try_recv() {
503            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
504            Err(TryRecvError::Disconnected) => {
505                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
506                return;
507            }
508            // There are no errors, so we can just use the last block hash we sent
509            Err(TryRecvError::Empty) => {}
510        }
511
512        while let Some(queued_block) = self
513            .finalized_state_queued_blocks
514            .remove(&self.finalized_block_write_last_sent_hash)
515        {
516            let last_sent_finalized_block_height = queued_block.0.height;
517
518            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
519
520            // If we've finished sending finalized blocks, ignore any repeated blocks.
521            // (Blocks can be repeated after a syncer reset.)
522            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
523                let send_result = finalized_block_write_sender.send(queued_block);
524
525                // If the receiver is closed, we can't send any more blocks.
526                if let Err(SendError(queued)) = send_result {
527                    // If Zebra is shutting down, drop blocks and return an error.
528                    Self::send_checkpoint_verified_block_error(
529                        queued,
530                        "block commit task exited. Is Zebra shutting down?",
531                    );
532
533                    self.clear_finalized_block_queue(
534                        "block commit task exited. Is Zebra shutting down?",
535                    );
536                } else {
537                    metrics::gauge!("state.checkpoint.sent.block.height")
538                        .set(last_sent_finalized_block_height.0 as f64);
539                };
540            }
541        }
542    }
543
544    /// Drops all finalized state queue blocks, and sends an error on their result channels.
545    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
546        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
547            Self::send_checkpoint_verified_block_error(queued, error.clone());
548        }
549    }
550
551    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
552    fn send_checkpoint_verified_block_error(
553        queued: QueuedCheckpointVerified,
554        error: impl Into<BoxError>,
555    ) {
556        let (finalized, rsp_tx) = queued;
557
558        // The block sender might have already given up on this block,
559        // so ignore any channel send errors.
560        let _ = rsp_tx.send(Err(error.into()));
561        std::mem::drop(finalized);
562    }
563
564    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
565    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
566        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
567            Self::send_semantically_verified_block_error(queued, error.clone());
568        }
569    }
570
571    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
572    fn send_semantically_verified_block_error(
573        queued: QueuedSemanticallyVerified,
574        error: CommitSemanticallyVerifiedError,
575    ) {
576        let (finalized, rsp_tx) = queued;
577
578        // The block sender might have already given up on this block,
579        // so ignore any channel send errors.
580        let _ = rsp_tx.send(Err(error));
581        std::mem::drop(finalized);
582    }
583
584    /// Queue a semantically verified block for contextual verification and check if any queued
585    /// blocks are ready to be verified and committed to the state.
586    ///
587    /// This function encodes the logic for [committing non-finalized blocks][1]
588    /// in RFC0005.
589    ///
590    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
591    #[instrument(level = "debug", skip(self, semantically_verrified))]
592    fn queue_and_commit_to_non_finalized_state(
593        &mut self,
594        semantically_verrified: SemanticallyVerifiedBlock,
595    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
596        tracing::debug!(block = %semantically_verrified.block, "queueing block for contextual verification");
597        let parent_hash = semantically_verrified.block.header.previous_block_hash;
598
599        if self
600            .non_finalized_block_write_sent_hashes
601            .contains(&semantically_verrified.hash)
602        {
603            let (rsp_tx, rsp_rx) = oneshot::channel();
604            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
605                ValidateContextError::DuplicateCommitRequest {
606                    block_hash: semantically_verrified.hash,
607                },
608            )));
609            return rsp_rx;
610        }
611
612        if self
613            .read_service
614            .db
615            .contains_height(semantically_verrified.height)
616        {
617            let (rsp_tx, rsp_rx) = oneshot::channel();
618            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
619                ValidateContextError::AlreadyFinalized {
620                    block_height: semantically_verrified.height,
621                },
622            )));
623            return rsp_rx;
624        }
625
626        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
627        // has been queued but not yet committed to the state fails the older request and replaces
628        // it with the newer request.
629        let rsp_rx = if let Some((_, old_rsp_tx)) = self
630            .non_finalized_state_queued_blocks
631            .get_mut(&semantically_verrified.hash)
632        {
633            tracing::debug!("replacing older queued request with new request");
634            let (mut rsp_tx, rsp_rx) = oneshot::channel();
635            std::mem::swap(old_rsp_tx, &mut rsp_tx);
636            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
637                ValidateContextError::ReplacedByNewerRequest {
638                    block_hash: semantically_verrified.hash,
639                },
640            )));
641            rsp_rx
642        } else {
643            let (rsp_tx, rsp_rx) = oneshot::channel();
644            self.non_finalized_state_queued_blocks
645                .queue((semantically_verrified, rsp_tx));
646            rsp_rx
647        };
648
649        // We've finished sending checkpoint verified blocks when:
650        // - we've sent the verified block for the last checkpoint, and
651        // - it has been successfully written to disk.
652        //
653        // We detect the last checkpoint by looking for non-finalized blocks
654        // that are a child of the last block we sent.
655        //
656        // TODO: configure the state with the last checkpoint hash instead?
657        if self.block_write_sender.finalized.is_some()
658            && self
659                .non_finalized_state_queued_blocks
660                .has_queued_children(self.finalized_block_write_last_sent_hash)
661            && self.read_service.db.finalized_tip_hash()
662                == self.finalized_block_write_last_sent_hash
663        {
664            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
665            // and move on to committing semantically verified blocks to the non-finalized state.
666            std::mem::drop(self.block_write_sender.finalized.take());
667            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
668            self.non_finalized_block_write_sent_hashes = SentHashes::default();
669            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
670            self.non_finalized_block_write_sent_hashes
671                .can_fork_chain_at_hashes = true;
672            // Send blocks from non-finalized queue
673            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
674            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
675            self.clear_finalized_block_queue(
676                "already finished committing checkpoint verified blocks: dropped duplicate block, \
677                 block is already committed to the state",
678            );
679        } else if !self.can_fork_chain_at(&parent_hash) {
680            tracing::trace!("unready to verify, returning early");
681        } else if self.block_write_sender.finalized.is_none() {
682            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
683            self.send_ready_non_finalized_queued(parent_hash);
684
685            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
686                "Finalized state must have at least one block before committing non-finalized state",
687            );
688
689            self.non_finalized_state_queued_blocks
690                .prune_by_height(finalized_tip_height);
691
692            self.non_finalized_block_write_sent_hashes
693                .prune_by_height(finalized_tip_height);
694        }
695
696        rsp_rx
697    }
698
699    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
700    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
701        self.non_finalized_block_write_sent_hashes
702            .can_fork_chain_at(hash)
703            || &self.read_service.db.finalized_tip_hash() == hash
704    }
705
706    /// Returns `true` if `queued_height` is near the final checkpoint.
707    ///
708    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
709    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
710    ///
711    /// If it doesn't have the required UTXOs, some blocks will time out,
712    /// but succeed after a syncer restart.
713    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
714        queued_height >= self.full_verifier_utxo_lookahead
715    }
716
717    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
718    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
719    #[tracing::instrument(level = "debug", skip(self, new_parent))]
720    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
721        use tokio::sync::mpsc::error::SendError;
722        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
723            let mut new_parents: Vec<block::Hash> = vec![new_parent];
724
725            while let Some(parent_hash) = new_parents.pop() {
726                let queued_children = self
727                    .non_finalized_state_queued_blocks
728                    .dequeue_children(parent_hash);
729
730                for queued_child in queued_children {
731                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
732
733                    self.non_finalized_block_write_sent_hashes
734                        .add(&queued_child.0);
735                    let send_result = non_finalized_block_write_sender.send(queued_child.into());
736
737                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
738                        // If Zebra is shutting down, drop blocks and return an error.
739                        Self::send_semantically_verified_block_error(
740                            queued,
741                            CommitSemanticallyVerifiedError::from(
742                                ValidateContextError::CommitTaskExited,
743                            ),
744                        );
745
746                        self.clear_non_finalized_block_queue(
747                            CommitSemanticallyVerifiedError::from(
748                                ValidateContextError::CommitTaskExited,
749                            ),
750                        );
751
752                        return;
753                    };
754
755                    new_parents.push(hash);
756                }
757            }
758
759            self.non_finalized_block_write_sent_hashes.finish_batch();
760        };
761    }
762
763    /// Return the tip of the current best chain.
764    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
765        self.read_service.best_tip()
766    }
767
768    fn send_invalidate_block(
769        &self,
770        hash: block::Hash,
771    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
772        let (rsp_tx, rsp_rx) = oneshot::channel();
773
774        let Some(sender) = &self.block_write_sender.non_finalized else {
775            let _ = rsp_tx.send(Err(
776                "cannot invalidate blocks while still committing checkpointed blocks".into(),
777            ));
778            return rsp_rx;
779        };
780
781        if let Err(tokio::sync::mpsc::error::SendError(error)) =
782            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
783        {
784            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
785                unreachable!("should return the same Invalidate message could not be sent");
786            };
787
788            let _ = rsp_tx.send(Err(
789                "failed to send invalidate block request to block write task".into(),
790            ));
791        }
792
793        rsp_rx
794    }
795
796    fn send_reconsider_block(
797        &self,
798        hash: block::Hash,
799    ) -> oneshot::Receiver<Result<Vec<block::Hash>, BoxError>> {
800        let (rsp_tx, rsp_rx) = oneshot::channel();
801
802        let Some(sender) = &self.block_write_sender.non_finalized else {
803            let _ = rsp_tx.send(Err(
804                "cannot reconsider blocks while still committing checkpointed blocks".into(),
805            ));
806            return rsp_rx;
807        };
808
809        if let Err(tokio::sync::mpsc::error::SendError(error)) =
810            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
811        {
812            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
813                unreachable!("should return the same Reconsider message could not be sent");
814            };
815
816            let _ = rsp_tx.send(Err(
817                "failed to send reconsider block request to block write task".into(),
818            ));
819        }
820
821        rsp_rx
822    }
823
824    /// Assert some assumptions about the semantically verified `block` before it is queued.
825    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
826        // required by `Request::CommitSemanticallyVerifiedBlock` call
827        assert!(
828            block.height > self.network.mandatory_checkpoint_height(),
829            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
830            blocks, and the canopy activation block, must be committed to the state as finalized \
831            blocks"
832        );
833    }
834}
835
836impl ReadStateService {
837    /// Creates a new read-only state service, using the provided finalized state and
838    /// block write task handle.
839    ///
840    /// Returns the newly created service,
841    /// and a watch channel for updating the shared recent non-finalized chain.
842    pub(crate) fn new(
843        finalized_state: &FinalizedState,
844        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
845        non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
846    ) -> Self {
847        let read_service = Self {
848            network: finalized_state.network(),
849            db: finalized_state.db.clone(),
850            non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
851            block_write_task,
852        };
853
854        tracing::debug!("created new read-only state service");
855
856        read_service
857    }
858
859    /// Return the tip of the current best chain.
860    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
861        read::best_tip(&self.latest_non_finalized_state(), &self.db)
862    }
863
864    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
865    fn latest_non_finalized_state(&self) -> NonFinalizedState {
866        self.non_finalized_state_receiver.cloned_watch_data()
867    }
868
869    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
870    #[allow(dead_code)]
871    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
872        self.latest_non_finalized_state().best_chain().cloned()
873    }
874
875    /// Test-only access to the inner database.
876    /// Can be used to modify the database without doing any consensus checks.
877    #[cfg(any(test, feature = "proptest-impl"))]
878    pub fn db(&self) -> &ZebraDb {
879        &self.db
880    }
881
882    /// Logs rocksdb metrics using the read only state service.
883    pub fn log_db_metrics(&self) {
884        self.db.print_db_metrics();
885    }
886}
887
888impl Service<Request> for StateService {
889    type Response = Response;
890    type Error = BoxError;
891    type Future =
892        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
893
894    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
895        // Check for panics in the block write task
896        let poll = self.read_service.poll_ready(cx);
897
898        // Prune outdated UTXO requests
899        let now = Instant::now();
900
901        if self.last_prune + Self::PRUNE_INTERVAL < now {
902            let tip = self.best_tip();
903            let old_len = self.pending_utxos.len();
904
905            self.pending_utxos.prune();
906            self.last_prune = now;
907
908            let new_len = self.pending_utxos.len();
909            let prune_count = old_len
910                .checked_sub(new_len)
911                .expect("prune does not add any utxo requests");
912            if prune_count > 0 {
913                tracing::debug!(
914                    ?old_len,
915                    ?new_len,
916                    ?prune_count,
917                    ?tip,
918                    "pruned utxo requests"
919                );
920            } else {
921                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
922            }
923        }
924
925        poll
926    }
927
928    #[instrument(name = "state", skip(self, req))]
929    fn call(&mut self, req: Request) -> Self::Future {
930        req.count_metric();
931        let timer = CodeTimer::start();
932        let span = Span::current();
933
934        match req {
935            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
936            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
937            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
938                self.assert_block_can_be_validated(&semantically_verified);
939
940                self.pending_utxos
941                    .check_against_ordered(&semantically_verified.new_outputs);
942
943                // # Performance
944                //
945                // Allow other async tasks to make progress while blocks are being verified
946                // and written to disk. But wait for the blocks to finish committing,
947                // so that `StateService` multi-block queries always observe a consistent state.
948                //
949                // Since each block is spawned into its own task,
950                // there shouldn't be any other code running in the same task,
951                // so we don't need to worry about blocking it:
952                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
953
954                let rsp_rx = tokio::task::block_in_place(move || {
955                    span.in_scope(|| {
956                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
957                    })
958                });
959
960                // TODO:
961                //   - check for panics in the block write task here,
962                //     as well as in poll_ready()
963
964                // The work is all done, the future just waits on a channel for the result
965                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
966
967                // Await the channel response, mapping any receive error into a BoxError.
968                // Then flatten the nested Result by converting the inner CommitSemanticallyVerifiedError into a BoxError.
969                let span = Span::current();
970                async move {
971                    rsp_rx
972                        .await
973                        .map_err(|_recv_error| {
974                            BoxError::from(CommitSemanticallyVerifiedError::from(
975                                ValidateContextError::NotReadyToBeCommitted,
976                            ))
977                        })
978                        // TODO: replace with Result::flatten once it stabilises
979                        // https://github.com/rust-lang/rust/issues/70142
980                        .and_then(|res| res.map_err(BoxError::from))
981                        .map(Response::Committed)
982                }
983                .instrument(span)
984                .boxed()
985            }
986
987            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
988            // Accesses shared writeable state in the StateService.
989            Request::CommitCheckpointVerifiedBlock(finalized) => {
990                // # Consensus
991                //
992                // A semantic block verification could have called AwaitUtxo
993                // before this checkpoint verified block arrived in the state.
994                // So we need to check for pending UTXO requests sent by running
995                // semantic block verifications.
996                //
997                // This check is redundant for most checkpoint verified blocks,
998                // because semantic verification can only succeed near the final
999                // checkpoint, when all the UTXOs are available for the verifying block.
1000                //
1001                // (Checkpoint block UTXOs are verified using block hash checkpoints
1002                // and transaction merkle tree block header commitments.)
1003                self.pending_utxos
1004                    .check_against_ordered(&finalized.new_outputs);
1005
1006                // # Performance
1007                //
1008                // This method doesn't block, access the database, or perform CPU-intensive tasks,
1009                // so we can run it directly in the tokio executor's Future threads.
1010                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
1011
1012                // TODO:
1013                //   - check for panics in the block write task here,
1014                //     as well as in poll_ready()
1015
1016                // The work is all done, the future just waits on a channel for the result
1017                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
1018
1019                async move {
1020                    rsp_rx
1021                        .await
1022                        .map_err(|_recv_error| {
1023                            BoxError::from("block was dropped from the queue of finalized blocks")
1024                        })
1025                        // TODO: replace with Result::flatten once it stabilises
1026                        // https://github.com/rust-lang/rust/issues/70142
1027                        .and_then(convert::identity)
1028                        .map(Response::Committed)
1029                }
1030                .instrument(span)
1031                .boxed()
1032            }
1033
1034            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1035            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1036            Request::AwaitUtxo(outpoint) => {
1037                // Prepare the AwaitUtxo future from PendingUxtos.
1038                let response_fut = self.pending_utxos.queue(outpoint);
1039                // Only instrument `response_fut`, the ReadStateService already
1040                // instruments its requests with the same span.
1041
1042                let response_fut = response_fut.instrument(span).boxed();
1043
1044                // Check the non-finalized block queue outside the returned future,
1045                // so we can access mutable state fields.
1046                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1047                    self.pending_utxos.respond(&outpoint, utxo);
1048
1049                    // We're finished, the returned future gets the UTXO from the respond() channel.
1050                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1051
1052                    return response_fut;
1053                }
1054
1055                // Check the sent non-finalized blocks
1056                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1057                    self.pending_utxos.respond(&outpoint, utxo);
1058
1059                    // We're finished, the returned future gets the UTXO from the respond() channel.
1060                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1061
1062                    return response_fut;
1063                }
1064
1065                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1066                // because it is only used during checkpoint verification.
1067                //
1068                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1069                // See #5126 for details.
1070
1071                // Manually send a request to the ReadStateService,
1072                // to get UTXOs from any non-finalized chain or the finalized chain.
1073                let read_service = self.read_service.clone();
1074
1075                // Run the request in an async block, so we can await the response.
1076                async move {
1077                    let req = ReadRequest::AnyChainUtxo(outpoint);
1078
1079                    let rsp = read_service.oneshot(req).await?;
1080
1081                    // Optional TODO:
1082                    //  - make pending_utxos.respond() async using a channel,
1083                    //    so we can respond to all waiting requests here
1084                    //
1085                    // This change is not required for correctness, because:
1086                    // - any waiting requests should have returned when the block was sent to the state
1087                    // - otherwise, the request returns immediately if:
1088                    //   - the block is in the non-finalized queue, or
1089                    //   - the block is in any non-finalized chain or the finalized state
1090                    //
1091                    // And if the block is in the finalized queue,
1092                    // that's rare enough that a retry is ok.
1093                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1094                        // We got a UTXO, so we replace the response future with the result own.
1095                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1096
1097                        return Ok(Response::Utxo(utxo));
1098                    }
1099
1100                    // We're finished, but the returned future is waiting on the respond() channel.
1101                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1102
1103                    response_fut.await
1104                }
1105                .boxed()
1106            }
1107
1108            // Used by sync, inbound, and block verifier to check if a block is already in the state
1109            // before downloading or validating it.
1110            Request::KnownBlock(hash) => {
1111                let timer = CodeTimer::start();
1112
1113                let read_service = self.read_service.clone();
1114
1115                async move {
1116                    let response = read::non_finalized_state_contains_block_hash(
1117                        &read_service.latest_non_finalized_state(),
1118                        hash,
1119                    )
1120                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1121
1122                    // The work is done in the future.
1123                    timer.finish(module_path!(), line!(), "Request::KnownBlock");
1124
1125                    Ok(Response::KnownBlock(response))
1126                }
1127                .boxed()
1128            }
1129
1130            Request::InvalidateBlock(block_hash) => {
1131                let rsp_rx = tokio::task::block_in_place(move || {
1132                    span.in_scope(|| self.send_invalidate_block(block_hash))
1133                });
1134
1135                let span = Span::current();
1136                async move {
1137                    rsp_rx
1138                        .await
1139                        .map_err(|_recv_error| {
1140                            BoxError::from("invalidate block request was unexpectedly dropped")
1141                        })
1142                        // TODO: replace with Result::flatten once it stabilises
1143                        // https://github.com/rust-lang/rust/issues/70142
1144                        .and_then(convert::identity)
1145                        .map(Response::Invalidated)
1146                }
1147                .instrument(span)
1148                .boxed()
1149            }
1150
1151            Request::ReconsiderBlock(block_hash) => {
1152                let rsp_rx = tokio::task::block_in_place(move || {
1153                    span.in_scope(|| self.send_reconsider_block(block_hash))
1154                });
1155
1156                let span = Span::current();
1157                async move {
1158                    rsp_rx
1159                        .await
1160                        .map_err(|_recv_error| {
1161                            BoxError::from("reconsider block request was unexpectedly dropped")
1162                        })
1163                        // TODO: replace with Result::flatten once it stabilises
1164                        // https://github.com/rust-lang/rust/issues/70142
1165                        .and_then(convert::identity)
1166                        .map(Response::Reconsidered)
1167                }
1168                .instrument(span)
1169                .boxed()
1170            }
1171
1172            // Runs concurrently using the ReadStateService
1173            Request::Tip
1174            | Request::Depth(_)
1175            | Request::BestChainNextMedianTimePast
1176            | Request::BestChainBlockHash(_)
1177            | Request::BlockLocator
1178            | Request::Transaction(_)
1179            | Request::UnspentBestChainUtxo(_)
1180            | Request::Block(_)
1181            | Request::BlockAndSize(_)
1182            | Request::BlockHeader(_)
1183            | Request::FindBlockHashes { .. }
1184            | Request::FindBlockHeaders { .. }
1185            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1186                // Redirect the request to the concurrent ReadStateService
1187                let read_service = self.read_service.clone();
1188
1189                async move {
1190                    let req = req
1191                        .try_into()
1192                        .expect("ReadRequest conversion should not fail");
1193
1194                    let rsp = read_service.oneshot(req).await?;
1195                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1196
1197                    Ok(rsp)
1198                }
1199                .boxed()
1200            }
1201
1202            Request::CheckBlockProposalValidity(_) => {
1203                // Redirect the request to the concurrent ReadStateService
1204                let read_service = self.read_service.clone();
1205
1206                async move {
1207                    let req = req
1208                        .try_into()
1209                        .expect("ReadRequest conversion should not fail");
1210
1211                    let rsp = read_service.oneshot(req).await?;
1212                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1213
1214                    Ok(rsp)
1215                }
1216                .boxed()
1217            }
1218        }
1219    }
1220}
1221
1222impl Service<ReadRequest> for ReadStateService {
1223    type Response = ReadResponse;
1224    type Error = BoxError;
1225    type Future =
1226        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1227
1228    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1229        // Check for panics in the block write task
1230        //
1231        // TODO: move into a check_for_panics() method
1232        let block_write_task = self.block_write_task.take();
1233
1234        if let Some(block_write_task) = block_write_task {
1235            if block_write_task.is_finished() {
1236                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1237                    // We are the last state with a reference to this task, so we can propagate any panics
1238                    if let Err(thread_panic) = block_write_task.join() {
1239                        std::panic::resume_unwind(thread_panic);
1240                    }
1241                }
1242            } else {
1243                // It hasn't finished, so we need to put it back
1244                self.block_write_task = Some(block_write_task);
1245            }
1246        }
1247
1248        self.db.check_for_panics();
1249
1250        Poll::Ready(Ok(()))
1251    }
1252
1253    #[instrument(name = "read_state", skip(self, req))]
1254    fn call(&mut self, req: ReadRequest) -> Self::Future {
1255        req.count_metric();
1256        let timer = CodeTimer::start();
1257        let span = Span::current();
1258
1259        match req {
1260            // Used by the `getblockchaininfo` RPC.
1261            ReadRequest::UsageInfo => {
1262                let db = self.db.clone();
1263
1264                tokio::task::spawn_blocking(move || {
1265                    span.in_scope(move || {
1266                        // The work is done in the future.
1267
1268                        let db_size = db.size();
1269
1270                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1271
1272                        Ok(ReadResponse::UsageInfo(db_size))
1273                    })
1274                })
1275                .wait_for_panics()
1276            }
1277
1278            // Used by the StateService.
1279            ReadRequest::Tip => {
1280                let state = self.clone();
1281
1282                tokio::task::spawn_blocking(move || {
1283                    span.in_scope(move || {
1284                        let tip = state.non_finalized_state_receiver.with_watch_data(
1285                            |non_finalized_state| {
1286                                read::tip(non_finalized_state.best_chain(), &state.db)
1287                            },
1288                        );
1289
1290                        // The work is done in the future.
1291                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1292
1293                        Ok(ReadResponse::Tip(tip))
1294                    })
1295                })
1296                .wait_for_panics()
1297            }
1298
1299            // Used by `getblockchaininfo` RPC method.
1300            ReadRequest::TipPoolValues => {
1301                let state = self.clone();
1302
1303                tokio::task::spawn_blocking(move || {
1304                    span.in_scope(move || {
1305                        let tip_with_value_balance = state
1306                            .non_finalized_state_receiver
1307                            .with_watch_data(|non_finalized_state| {
1308                                read::tip_with_value_balance(
1309                                    non_finalized_state.best_chain(),
1310                                    &state.db,
1311                                )
1312                            });
1313
1314                        // The work is done in the future.
1315                        // TODO: Do this in the Drop impl with the variant name?
1316                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1317
1318                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1319                            .ok_or(BoxError::from("no chain tip available yet"))?;
1320
1321                        Ok(ReadResponse::TipPoolValues {
1322                            tip_height,
1323                            tip_hash,
1324                            value_balance,
1325                        })
1326                    })
1327                })
1328                .wait_for_panics()
1329            }
1330
1331            // Used by getblock
1332            ReadRequest::BlockInfo(hash_or_height) => {
1333                let state = self.clone();
1334
1335                tokio::task::spawn_blocking(move || {
1336                    span.in_scope(move || {
1337                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
1338                            |non_finalized_state| {
1339                                read::block_info(
1340                                    non_finalized_state.best_chain(),
1341                                    &state.db,
1342                                    hash_or_height,
1343                                )
1344                            },
1345                        );
1346
1347                        // The work is done in the future.
1348                        // TODO: Do this in the Drop impl with the variant name?
1349                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1350
1351                        Ok(ReadResponse::BlockInfo(value_balance))
1352                    })
1353                })
1354                .wait_for_panics()
1355            }
1356
1357            // Used by the StateService.
1358            ReadRequest::Depth(hash) => {
1359                let state = self.clone();
1360
1361                tokio::task::spawn_blocking(move || {
1362                    span.in_scope(move || {
1363                        let depth = state.non_finalized_state_receiver.with_watch_data(
1364                            |non_finalized_state| {
1365                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
1366                            },
1367                        );
1368
1369                        // The work is done in the future.
1370                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1371
1372                        Ok(ReadResponse::Depth(depth))
1373                    })
1374                })
1375                .wait_for_panics()
1376            }
1377
1378            // Used by the StateService.
1379            ReadRequest::BestChainNextMedianTimePast => {
1380                let state = self.clone();
1381
1382                tokio::task::spawn_blocking(move || {
1383                    span.in_scope(move || {
1384                        let non_finalized_state = state.latest_non_finalized_state();
1385                        let median_time_past =
1386                            read::next_median_time_past(&non_finalized_state, &state.db);
1387
1388                        // The work is done in the future.
1389                        timer.finish(
1390                            module_path!(),
1391                            line!(),
1392                            "ReadRequest::BestChainNextMedianTimePast",
1393                        );
1394
1395                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1396                    })
1397                })
1398                .wait_for_panics()
1399            }
1400
1401            // Used by the get_block (raw) RPC and the StateService.
1402            ReadRequest::Block(hash_or_height) => {
1403                let state = self.clone();
1404
1405                tokio::task::spawn_blocking(move || {
1406                    span.in_scope(move || {
1407                        let block = state.non_finalized_state_receiver.with_watch_data(
1408                            |non_finalized_state| {
1409                                read::block(
1410                                    non_finalized_state.best_chain(),
1411                                    &state.db,
1412                                    hash_or_height,
1413                                )
1414                            },
1415                        );
1416
1417                        // The work is done in the future.
1418                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1419
1420                        Ok(ReadResponse::Block(block))
1421                    })
1422                })
1423                .wait_for_panics()
1424            }
1425
1426            // Used by the get_block (raw) RPC and the StateService.
1427            ReadRequest::BlockAndSize(hash_or_height) => {
1428                let state = self.clone();
1429
1430                tokio::task::spawn_blocking(move || {
1431                    span.in_scope(move || {
1432                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1433                            |non_finalized_state| {
1434                                read::block_and_size(
1435                                    non_finalized_state.best_chain(),
1436                                    &state.db,
1437                                    hash_or_height,
1438                                )
1439                            },
1440                        );
1441
1442                        // The work is done in the future.
1443                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1444
1445                        Ok(ReadResponse::BlockAndSize(block_and_size))
1446                    })
1447                })
1448                .wait_for_panics()
1449            }
1450
1451            // Used by the get_block (verbose) RPC and the StateService.
1452            ReadRequest::BlockHeader(hash_or_height) => {
1453                let state = self.clone();
1454
1455                tokio::task::spawn_blocking(move || {
1456                    span.in_scope(move || {
1457                        let best_chain = state.latest_best_chain();
1458
1459                        let height = hash_or_height
1460                            .height_or_else(|hash| {
1461                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1462                            })
1463                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1464
1465                        let hash = hash_or_height
1466                            .hash_or_else(|height| {
1467                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
1468                            })
1469                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1470
1471                        let next_height = height.next()?;
1472                        let next_block_hash =
1473                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1474
1475                        let header = read::block_header(best_chain, &state.db, height.into())
1476                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1477
1478                        // The work is done in the future.
1479                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1480
1481                        Ok(ReadResponse::BlockHeader {
1482                            header,
1483                            hash,
1484                            height,
1485                            next_block_hash,
1486                        })
1487                    })
1488                })
1489                .wait_for_panics()
1490            }
1491
1492            // For the get_raw_transaction RPC and the StateService.
1493            ReadRequest::Transaction(hash) => {
1494                let state = self.clone();
1495
1496                tokio::task::spawn_blocking(move || {
1497                    span.in_scope(move || {
1498                        let response =
1499                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1500
1501                        // The work is done in the future.
1502                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1503
1504                        Ok(ReadResponse::Transaction(response))
1505                    })
1506                })
1507                .wait_for_panics()
1508            }
1509
1510            // Used by the getblock (verbose) RPC.
1511            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1512                let state = self.clone();
1513
1514                tokio::task::spawn_blocking(move || {
1515                    span.in_scope(move || {
1516                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1517                            |non_finalized_state| {
1518                                read::transaction_hashes_for_block(
1519                                    non_finalized_state.best_chain(),
1520                                    &state.db,
1521                                    hash_or_height,
1522                                )
1523                            },
1524                        );
1525
1526                        // The work is done in the future.
1527                        timer.finish(
1528                            module_path!(),
1529                            line!(),
1530                            "ReadRequest::TransactionIdsForBlock",
1531                        );
1532
1533                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1534                    })
1535                })
1536                .wait_for_panics()
1537            }
1538
1539            #[cfg(feature = "indexer")]
1540            ReadRequest::SpendingTransactionId(spend) => {
1541                let state = self.clone();
1542
1543                tokio::task::spawn_blocking(move || {
1544                    span.in_scope(move || {
1545                        let spending_transaction_id = state
1546                            .non_finalized_state_receiver
1547                            .with_watch_data(|non_finalized_state| {
1548                                read::spending_transaction_hash(
1549                                    non_finalized_state.best_chain(),
1550                                    &state.db,
1551                                    spend,
1552                                )
1553                            });
1554
1555                        // The work is done in the future.
1556                        timer.finish(
1557                            module_path!(),
1558                            line!(),
1559                            "ReadRequest::TransactionIdForSpentOutPoint",
1560                        );
1561
1562                        Ok(ReadResponse::TransactionId(spending_transaction_id))
1563                    })
1564                })
1565                .wait_for_panics()
1566            }
1567
1568            ReadRequest::UnspentBestChainUtxo(outpoint) => {
1569                let state = self.clone();
1570
1571                tokio::task::spawn_blocking(move || {
1572                    span.in_scope(move || {
1573                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1574                            |non_finalized_state| {
1575                                read::unspent_utxo(
1576                                    non_finalized_state.best_chain(),
1577                                    &state.db,
1578                                    outpoint,
1579                                )
1580                            },
1581                        );
1582
1583                        // The work is done in the future.
1584                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1585
1586                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1587                    })
1588                })
1589                .wait_for_panics()
1590            }
1591
1592            // Manually used by the StateService to implement part of AwaitUtxo.
1593            ReadRequest::AnyChainUtxo(outpoint) => {
1594                let state = self.clone();
1595
1596                tokio::task::spawn_blocking(move || {
1597                    span.in_scope(move || {
1598                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1599                            |non_finalized_state| {
1600                                read::any_utxo(non_finalized_state, &state.db, outpoint)
1601                            },
1602                        );
1603
1604                        // The work is done in the future.
1605                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1606
1607                        Ok(ReadResponse::AnyChainUtxo(utxo))
1608                    })
1609                })
1610                .wait_for_panics()
1611            }
1612
1613            // Used by the StateService.
1614            ReadRequest::BlockLocator => {
1615                let state = self.clone();
1616
1617                tokio::task::spawn_blocking(move || {
1618                    span.in_scope(move || {
1619                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
1620                            |non_finalized_state| {
1621                                read::block_locator(non_finalized_state.best_chain(), &state.db)
1622                            },
1623                        );
1624
1625                        // The work is done in the future.
1626                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1627
1628                        Ok(ReadResponse::BlockLocator(
1629                            block_locator.unwrap_or_default(),
1630                        ))
1631                    })
1632                })
1633                .wait_for_panics()
1634            }
1635
1636            // Used by the StateService.
1637            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1638                let state = self.clone();
1639
1640                tokio::task::spawn_blocking(move || {
1641                    span.in_scope(move || {
1642                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1643                            |non_finalized_state| {
1644                                read::find_chain_hashes(
1645                                    non_finalized_state.best_chain(),
1646                                    &state.db,
1647                                    known_blocks,
1648                                    stop,
1649                                    MAX_FIND_BLOCK_HASHES_RESULTS,
1650                                )
1651                            },
1652                        );
1653
1654                        // The work is done in the future.
1655                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1656
1657                        Ok(ReadResponse::BlockHashes(block_hashes))
1658                    })
1659                })
1660                .wait_for_panics()
1661            }
1662
1663            // Used by the StateService.
1664            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1665                let state = self.clone();
1666
1667                tokio::task::spawn_blocking(move || {
1668                    span.in_scope(move || {
1669                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
1670                            |non_finalized_state| {
1671                                read::find_chain_headers(
1672                                    non_finalized_state.best_chain(),
1673                                    &state.db,
1674                                    known_blocks,
1675                                    stop,
1676                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1677                                )
1678                            },
1679                        );
1680
1681                        let block_headers = block_headers
1682                            .into_iter()
1683                            .map(|header| CountedHeader { header })
1684                            .collect();
1685
1686                        // The work is done in the future.
1687                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1688
1689                        Ok(ReadResponse::BlockHeaders(block_headers))
1690                    })
1691                })
1692                .wait_for_panics()
1693            }
1694
1695            ReadRequest::SaplingTree(hash_or_height) => {
1696                let state = self.clone();
1697
1698                tokio::task::spawn_blocking(move || {
1699                    span.in_scope(move || {
1700                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1701                            |non_finalized_state| {
1702                                read::sapling_tree(
1703                                    non_finalized_state.best_chain(),
1704                                    &state.db,
1705                                    hash_or_height,
1706                                )
1707                            },
1708                        );
1709
1710                        // The work is done in the future.
1711                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1712
1713                        Ok(ReadResponse::SaplingTree(sapling_tree))
1714                    })
1715                })
1716                .wait_for_panics()
1717            }
1718
1719            ReadRequest::OrchardTree(hash_or_height) => {
1720                let state = self.clone();
1721
1722                tokio::task::spawn_blocking(move || {
1723                    span.in_scope(move || {
1724                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1725                            |non_finalized_state| {
1726                                read::orchard_tree(
1727                                    non_finalized_state.best_chain(),
1728                                    &state.db,
1729                                    hash_or_height,
1730                                )
1731                            },
1732                        );
1733
1734                        // The work is done in the future.
1735                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1736
1737                        Ok(ReadResponse::OrchardTree(orchard_tree))
1738                    })
1739                })
1740                .wait_for_panics()
1741            }
1742
1743            ReadRequest::SaplingSubtrees { start_index, limit } => {
1744                let state = self.clone();
1745
1746                tokio::task::spawn_blocking(move || {
1747                    span.in_scope(move || {
1748                        let end_index = limit
1749                            .and_then(|limit| start_index.0.checked_add(limit.0))
1750                            .map(NoteCommitmentSubtreeIndex);
1751
1752                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1753                            |non_finalized_state| {
1754                                if let Some(end_index) = end_index {
1755                                    read::sapling_subtrees(
1756                                        non_finalized_state.best_chain(),
1757                                        &state.db,
1758                                        start_index..end_index,
1759                                    )
1760                                } else {
1761                                    // If there is no end bound, just return all the trees.
1762                                    // If the end bound would overflow, just returns all the trees, because that's what
1763                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1764                                    // the trees run out.)
1765                                    read::sapling_subtrees(
1766                                        non_finalized_state.best_chain(),
1767                                        &state.db,
1768                                        start_index..,
1769                                    )
1770                                }
1771                            },
1772                        );
1773
1774                        // The work is done in the future.
1775                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1776
1777                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1778                    })
1779                })
1780                .wait_for_panics()
1781            }
1782
1783            ReadRequest::OrchardSubtrees { start_index, limit } => {
1784                let state = self.clone();
1785
1786                tokio::task::spawn_blocking(move || {
1787                    span.in_scope(move || {
1788                        let end_index = limit
1789                            .and_then(|limit| start_index.0.checked_add(limit.0))
1790                            .map(NoteCommitmentSubtreeIndex);
1791
1792                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1793                            |non_finalized_state| {
1794                                if let Some(end_index) = end_index {
1795                                    read::orchard_subtrees(
1796                                        non_finalized_state.best_chain(),
1797                                        &state.db,
1798                                        start_index..end_index,
1799                                    )
1800                                } else {
1801                                    // If there is no end bound, just return all the trees.
1802                                    // If the end bound would overflow, just returns all the trees, because that's what
1803                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1804                                    // the trees run out.)
1805                                    read::orchard_subtrees(
1806                                        non_finalized_state.best_chain(),
1807                                        &state.db,
1808                                        start_index..,
1809                                    )
1810                                }
1811                            },
1812                        );
1813
1814                        // The work is done in the future.
1815                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1816
1817                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1818                    })
1819                })
1820                .wait_for_panics()
1821            }
1822
1823            // For the get_address_balance RPC.
1824            ReadRequest::AddressBalance(addresses) => {
1825                let state = self.clone();
1826
1827                tokio::task::spawn_blocking(move || {
1828                    span.in_scope(move || {
1829                        let (balance, received) = state
1830                            .non_finalized_state_receiver
1831                            .with_watch_data(|non_finalized_state| {
1832                                read::transparent_balance(
1833                                    non_finalized_state.best_chain().cloned(),
1834                                    &state.db,
1835                                    addresses,
1836                                )
1837                            })?;
1838
1839                        // The work is done in the future.
1840                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1841
1842                        Ok(ReadResponse::AddressBalance { balance, received })
1843                    })
1844                })
1845                .wait_for_panics()
1846            }
1847
1848            // For the get_address_tx_ids RPC.
1849            ReadRequest::TransactionIdsByAddresses {
1850                addresses,
1851                height_range,
1852            } => {
1853                let state = self.clone();
1854
1855                tokio::task::spawn_blocking(move || {
1856                    span.in_scope(move || {
1857                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1858                            |non_finalized_state| {
1859                                read::transparent_tx_ids(
1860                                    non_finalized_state.best_chain(),
1861                                    &state.db,
1862                                    addresses,
1863                                    height_range,
1864                                )
1865                            },
1866                        );
1867
1868                        // The work is done in the future.
1869                        timer.finish(
1870                            module_path!(),
1871                            line!(),
1872                            "ReadRequest::TransactionIdsByAddresses",
1873                        );
1874
1875                        tx_ids.map(ReadResponse::AddressesTransactionIds)
1876                    })
1877                })
1878                .wait_for_panics()
1879            }
1880
1881            // For the get_address_utxos RPC.
1882            ReadRequest::UtxosByAddresses(addresses) => {
1883                let state = self.clone();
1884
1885                tokio::task::spawn_blocking(move || {
1886                    span.in_scope(move || {
1887                        let utxos = state.non_finalized_state_receiver.with_watch_data(
1888                            |non_finalized_state| {
1889                                read::address_utxos(
1890                                    &state.network,
1891                                    non_finalized_state.best_chain(),
1892                                    &state.db,
1893                                    addresses,
1894                                )
1895                            },
1896                        );
1897
1898                        // The work is done in the future.
1899                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
1900
1901                        utxos.map(ReadResponse::AddressUtxos)
1902                    })
1903                })
1904                .wait_for_panics()
1905            }
1906
1907            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1908                let state = self.clone();
1909
1910                tokio::task::spawn_blocking(move || {
1911                    span.in_scope(move || {
1912                        let latest_non_finalized_best_chain =
1913                            state.latest_non_finalized_state().best_chain().cloned();
1914
1915                        check::nullifier::tx_no_duplicates_in_chain(
1916                            &state.db,
1917                            latest_non_finalized_best_chain.as_ref(),
1918                            &unmined_tx.transaction,
1919                        )?;
1920
1921                        check::anchors::tx_anchors_refer_to_final_treestates(
1922                            &state.db,
1923                            latest_non_finalized_best_chain.as_ref(),
1924                            &unmined_tx,
1925                        )?;
1926
1927                        // The work is done in the future.
1928                        timer.finish(
1929                            module_path!(),
1930                            line!(),
1931                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
1932                        );
1933
1934                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1935                    })
1936                })
1937                .wait_for_panics()
1938            }
1939
1940            // Used by the get_block and get_block_hash RPCs.
1941            ReadRequest::BestChainBlockHash(height) => {
1942                let state = self.clone();
1943
1944                // # Performance
1945                //
1946                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1947
1948                tokio::task::spawn_blocking(move || {
1949                    span.in_scope(move || {
1950                        let hash = state.non_finalized_state_receiver.with_watch_data(
1951                            |non_finalized_state| {
1952                                read::hash_by_height(
1953                                    non_finalized_state.best_chain(),
1954                                    &state.db,
1955                                    height,
1956                                )
1957                            },
1958                        );
1959
1960                        // The work is done in the future.
1961                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
1962
1963                        Ok(ReadResponse::BlockHash(hash))
1964                    })
1965                })
1966                .wait_for_panics()
1967            }
1968
1969            // Used by get_block_template and getblockchaininfo RPCs.
1970            ReadRequest::ChainInfo => {
1971                let state = self.clone();
1972                let latest_non_finalized_state = self.latest_non_finalized_state();
1973
1974                // # Performance
1975                //
1976                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1977
1978                tokio::task::spawn_blocking(move || {
1979                    span.in_scope(move || {
1980                        // # Correctness
1981                        //
1982                        // It is ok to do these lookups using multiple database calls. Finalized state updates
1983                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
1984                        //
1985                        // If there is a large overlap between the non-finalized and finalized states,
1986                        // where the finalized tip is above the non-finalized tip,
1987                        // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1988                        //
1989                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
1990                        // is not synced to the tip. That check happens before the RPC makes this request.
1991                        let get_block_template_info =
1992                            read::difficulty::get_block_template_chain_info(
1993                                &latest_non_finalized_state,
1994                                &state.db,
1995                                &state.network,
1996                            );
1997
1998                        // The work is done in the future.
1999                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
2000
2001                        get_block_template_info.map(ReadResponse::ChainInfo)
2002                    })
2003                })
2004                .wait_for_panics()
2005            }
2006
2007            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
2008            ReadRequest::SolutionRate { num_blocks, height } => {
2009                let state = self.clone();
2010
2011                // # Performance
2012                //
2013                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2014
2015                tokio::task::spawn_blocking(move || {
2016                    span.in_scope(move || {
2017                        let latest_non_finalized_state = state.latest_non_finalized_state();
2018                        // # Correctness
2019                        //
2020                        // It is ok to do these lookups using multiple database calls. Finalized state updates
2021                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
2022                        //
2023                        // The worst that can happen here is that the default `start_hash` will be below
2024                        // the chain tip.
2025                        let (tip_height, tip_hash) =
2026                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
2027                                Some(tip_hash) => tip_hash,
2028                                None => return Ok(ReadResponse::SolutionRate(None)),
2029                            };
2030
2031                        let start_hash = match height {
2032                            Some(height) if height < tip_height => read::hash_by_height(
2033                                latest_non_finalized_state.best_chain(),
2034                                &state.db,
2035                                height,
2036                            ),
2037                            // use the chain tip hash if height is above it or not provided.
2038                            _ => Some(tip_hash),
2039                        };
2040
2041                        let solution_rate = start_hash.and_then(|start_hash| {
2042                            read::difficulty::solution_rate(
2043                                &latest_non_finalized_state,
2044                                &state.db,
2045                                num_blocks,
2046                                start_hash,
2047                            )
2048                        });
2049
2050                        // The work is done in the future.
2051                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
2052
2053                        Ok(ReadResponse::SolutionRate(solution_rate))
2054                    })
2055                })
2056                .wait_for_panics()
2057            }
2058
2059            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
2060                let state = self.clone();
2061
2062                // # Performance
2063                //
2064                // Allow other async tasks to make progress while concurrently reading blocks from disk.
2065
2066                tokio::task::spawn_blocking(move || {
2067                    span.in_scope(move || {
2068                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
2069                        let mut latest_non_finalized_state = state.latest_non_finalized_state();
2070
2071                        // The previous block of a valid proposal must be on the best chain tip.
2072                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
2073                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
2074                        };
2075
2076                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
2077                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
2078                        }
2079
2080                        // This clone of the non-finalized state is dropped when this closure returns.
2081                        // The non-finalized state that's used in the rest of the state (including finalizing
2082                        // blocks into the db) is not mutated here.
2083                        //
2084                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2085                        latest_non_finalized_state.disable_metrics();
2086
2087                        write::validate_and_commit_non_finalized(
2088                            &state.db,
2089                            &mut latest_non_finalized_state,
2090                            semantically_verified,
2091                        )?;
2092
2093                        // The work is done in the future.
2094                        timer.finish(
2095                            module_path!(),
2096                            line!(),
2097                            "ReadRequest::CheckBlockProposalValidity",
2098                        );
2099
2100                        Ok(ReadResponse::ValidBlockProposal)
2101                    })
2102                })
2103                .wait_for_panics()
2104            }
2105
2106            ReadRequest::TipBlockSize => {
2107                let state = self.clone();
2108
2109                tokio::task::spawn_blocking(move || {
2110                    span.in_scope(move || {
2111                        // Get the best chain tip height.
2112                        let tip_height = state
2113                            .non_finalized_state_receiver
2114                            .with_watch_data(|non_finalized_state| {
2115                                read::tip_height(non_finalized_state.best_chain(), &state.db)
2116                            })
2117                            .unwrap_or(Height(0));
2118
2119                        // Get the block at the best chain tip height.
2120                        let block = state.non_finalized_state_receiver.with_watch_data(
2121                            |non_finalized_state| {
2122                                read::block(
2123                                    non_finalized_state.best_chain(),
2124                                    &state.db,
2125                                    tip_height.into(),
2126                                )
2127                            },
2128                        );
2129
2130                        // The work is done in the future.
2131                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2132
2133                        // Respond with the length of the obtained block if any.
2134                        match block {
2135                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2136                                b.zcash_serialize_to_vec()?.len(),
2137                            ))),
2138                            None => Ok(ReadResponse::TipBlockSize(None)),
2139                        }
2140                    })
2141                })
2142                .wait_for_panics()
2143            }
2144
2145            ReadRequest::NonFinalizedBlocksListener => {
2146                // The non-finalized blocks listener is used to notify the state service
2147                // about new blocks that have been added to the non-finalized state.
2148                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
2149                    self.network.clone(),
2150                    self.non_finalized_state_receiver.clone(),
2151                );
2152
2153                async move {
2154                    timer.finish(
2155                        module_path!(),
2156                        line!(),
2157                        "ReadRequest::NonFinalizedBlocksListener",
2158                    );
2159
2160                    Ok(ReadResponse::NonFinalizedBlocksListener(
2161                        non_finalized_blocks_listener,
2162                    ))
2163                }
2164                .boxed()
2165            }
2166        }
2167    }
2168}
2169
2170/// Initialize a state service from the provided [`Config`].
2171/// Returns a boxed state service, a read-only state service,
2172/// and receivers for state chain tip updates.
2173///
2174/// Each `network` has its own separate on-disk database.
2175///
2176/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2177/// to work out when it is near the final checkpoint.
2178///
2179/// To share access to the state, wrap the returned service in a `Buffer`,
2180/// or clone the returned [`ReadStateService`].
2181///
2182/// It's possible to construct multiple state services in the same application (as
2183/// long as they, e.g., use different storage locations), but doing so is
2184/// probably not what you want.
2185pub fn init(
2186    config: Config,
2187    network: &Network,
2188    max_checkpoint_height: block::Height,
2189    checkpoint_verify_concurrency_limit: usize,
2190) -> (
2191    BoxService<Request, Response, BoxError>,
2192    ReadStateService,
2193    LatestChainTip,
2194    ChainTipChange,
2195) {
2196    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2197        StateService::new(
2198            config,
2199            network,
2200            max_checkpoint_height,
2201            checkpoint_verify_concurrency_limit,
2202        );
2203
2204    (
2205        BoxService::new(state_service),
2206        read_only_state_service,
2207        latest_chain_tip,
2208        chain_tip_change,
2209    )
2210}
2211
2212/// Initialize a read state service from the provided [`Config`].
2213/// Returns a read-only state service,
2214///
2215/// Each `network` has its own separate on-disk database.
2216///
2217/// To share access to the state, clone the returned [`ReadStateService`].
2218pub fn init_read_only(
2219    config: Config,
2220    network: &Network,
2221) -> (
2222    ReadStateService,
2223    ZebraDb,
2224    tokio::sync::watch::Sender<NonFinalizedState>,
2225) {
2226    let finalized_state = FinalizedState::new_with_debug(
2227        &config,
2228        network,
2229        true,
2230        #[cfg(feature = "elasticsearch")]
2231        false,
2232        true,
2233    );
2234    let (non_finalized_state_sender, non_finalized_state_receiver) =
2235        tokio::sync::watch::channel(NonFinalizedState::new(network));
2236
2237    (
2238        ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
2239        finalized_state.db.clone(),
2240        non_finalized_state_sender,
2241    )
2242}
2243
2244/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2245/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
2246pub fn spawn_init_read_only(
2247    config: Config,
2248    network: &Network,
2249) -> tokio::task::JoinHandle<(
2250    ReadStateService,
2251    ZebraDb,
2252    tokio::sync::watch::Sender<NonFinalizedState>,
2253)> {
2254    let network = network.clone();
2255    tokio::task::spawn_blocking(move || init_read_only(config, &network))
2256}
2257
2258/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
2259/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
2260/// a read state service, and receivers for state chain tip updates.
2261pub fn spawn_init(
2262    config: Config,
2263    network: &Network,
2264    max_checkpoint_height: block::Height,
2265    checkpoint_verify_concurrency_limit: usize,
2266) -> tokio::task::JoinHandle<(
2267    BoxService<Request, Response, BoxError>,
2268    ReadStateService,
2269    LatestChainTip,
2270    ChainTipChange,
2271)> {
2272    let network = network.clone();
2273    tokio::task::spawn_blocking(move || {
2274        init(
2275            config,
2276            &network,
2277            max_checkpoint_height,
2278            checkpoint_verify_concurrency_limit,
2279        )
2280    })
2281}
2282
2283/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
2284///
2285/// This can be used to create a state service for testing. See also [`init`].
2286#[cfg(any(test, feature = "proptest-impl"))]
2287pub fn init_test(network: &Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
2288    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2289    //       if we ever need to test final checkpoint sent UTXO queries
2290    let (state_service, _, _, _) =
2291        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);
2292
2293    Buffer::new(BoxService::new(state_service), 1)
2294}
2295
2296/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
2297/// then returns the read-write service, read-only service, and tip watch channels.
2298///
2299/// This can be used to create a state service for testing. See also [`init`].
2300#[cfg(any(test, feature = "proptest-impl"))]
2301pub fn init_test_services(
2302    network: &Network,
2303) -> (
2304    Buffer<BoxService<Request, Response, BoxError>, Request>,
2305    ReadStateService,
2306    LatestChainTip,
2307    ChainTipChange,
2308) {
2309    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2310    //       if we ever need to test final checkpoint sent UTXO queries
2311    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
2312        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);
2313
2314    let state_service = Buffer::new(BoxService::new(state_service), 1);
2315
2316    (
2317        state_service,
2318        read_state_service,
2319        latest_chain_tip,
2320        chain_tip_change,
2321    )
2322}