zebra_state/
service.rs

1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//!   and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//!   recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//!   tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//!   chain tip changes.
16
17use std::{
18    collections::HashMap,
19    convert,
20    future::Future,
21    pin::Pin,
22    sync::Arc,
23    task::{Context, Poll},
24    time::{Duration, Instant},
25};
26
27use futures::future::FutureExt;
28use tokio::sync::{oneshot, watch};
29use tower::{util::BoxService, Service, ServiceExt};
30use tracing::{instrument, Instrument, Span};
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use tower::buffer::Buffer;
34
35use zebra_chain::{
36    block::{self, CountedHeader, HeightDiff},
37    diagnostic::{task::WaitForPanics, CodeTimer},
38    parameters::{Network, NetworkUpgrade},
39    subtree::NoteCommitmentSubtreeIndex,
40};
41
42use zebra_chain::{block::Height, serialization::ZcashSerialize};
43
44use crate::{
45    constants::{
46        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
47    },
48    service::{
49        block_iter::any_ancestor_blocks,
50        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
51        finalized_state::{FinalizedState, ZebraDb},
52        non_finalized_state::{Chain, NonFinalizedState},
53        pending_utxos::PendingUtxos,
54        queued_blocks::QueuedBlocks,
55        watch_receiver::WatchReceiver,
56    },
57    BoxError, CheckpointVerifiedBlock, CloneError, Config, ReadRequest, ReadResponse, Request,
58    Response, SemanticallyVerifiedBlock,
59};
60
61pub mod block_iter;
62pub mod chain_tip;
63pub mod watch_receiver;
64
65pub mod check;
66
67pub(crate) mod finalized_state;
68pub(crate) mod non_finalized_state;
69mod pending_utxos;
70mod queued_blocks;
71pub(crate) mod read;
72mod write;
73
74#[cfg(any(test, feature = "proptest-impl"))]
75pub mod arbitrary;
76
77#[cfg(test)]
78mod tests;
79
80pub use finalized_state::{OutputIndex, OutputLocation, TransactionIndex, TransactionLocation};
81
82use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
83
84/// A read-write service for Zebra's cached blockchain state.
85///
86/// This service modifies and provides access to:
87/// - the non-finalized state: the ~100 most recent blocks.
88///   Zebra allows chain forks in the non-finalized state,
89///   stores it in memory, and re-downloads it when restarted.
90/// - the finalized state: older blocks that have many confirmations.
91///   Zebra stores the single best chain in the finalized state,
92///   and re-loads it from disk when restarted.
93///
94/// Read requests to this service are buffered, then processed concurrently.
95/// Block write requests are buffered, then queued, then processed in order by a separate task.
96///
97/// Most state users can get faster read responses using the [`ReadStateService`],
98/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
99///
100/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
101/// They can read the latest block directly, without queueing any requests.
102#[derive(Debug)]
103pub(crate) struct StateService {
104    // Configuration
105    //
106    /// The configured Zcash network.
107    network: Network,
108
109    /// The height that we start storing UTXOs from finalized blocks.
110    ///
111    /// This height should be lower than the last few checkpoints,
112    /// so the full verifier can verify UTXO spends from those blocks,
113    /// even if they haven't been committed to the finalized state yet.
114    full_verifier_utxo_lookahead: block::Height,
115
116    // Queued Blocks
117    //
118    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
119    /// These blocks are awaiting their parent blocks before they can do contextual verification.
120    non_finalized_state_queued_blocks: QueuedBlocks,
121
122    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
123    /// These blocks are awaiting their parent blocks before they can do contextual verification.
124    ///
125    /// Indexed by their parent block hash.
126    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
127
128    /// A channel to send blocks to the `block_write_task`,
129    /// so they can be written to the [`NonFinalizedState`].
130    non_finalized_block_write_sender:
131        Option<tokio::sync::mpsc::UnboundedSender<QueuedSemanticallyVerified>>,
132
133    /// A channel to send blocks to the `block_write_task`,
134    /// so they can be written to the [`FinalizedState`].
135    ///
136    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
137    /// and the lowest semantically verified block arrives.
138    finalized_block_write_sender:
139        Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
140
141    /// The [`block::Hash`] of the most recent block sent on
142    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
143    ///
144    /// On startup, this is:
145    /// - the finalized tip, if there are stored blocks, or
146    /// - the genesis block's parent hash, if the database is empty.
147    ///
148    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
149    /// - the hash of the last valid committed block (the parent of the invalid block).
150    finalized_block_write_last_sent_hash: block::Hash,
151
152    /// A set of block hashes that have been sent to the block write task.
153    /// Hashes of blocks below the finalized tip height are periodically pruned.
154    non_finalized_block_write_sent_hashes: SentHashes,
155
156    /// If an invalid block is sent on `finalized_block_write_sender`
157    /// or `non_finalized_block_write_sender`,
158    /// this channel gets the [`block::Hash`] of the valid tip.
159    //
160    // TODO: add tests for finalized and non-finalized resets (#2654)
161    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
162
163    // Pending UTXO Request Tracking
164    //
165    /// The set of outpoints with pending requests for their associated transparent::Output.
166    pending_utxos: PendingUtxos,
167
168    /// Instant tracking the last time `pending_utxos` was pruned.
169    last_prune: Instant,
170
171    // Updating Concurrently Readable State
172    //
173    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
174    ///
175    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
176    read_service: ReadStateService,
177
178    // Metrics
179    //
180    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
181    ///
182    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
183    /// as a break in the graph.
184    max_finalized_queue_height: f64,
185}
186
187/// A read-only service for accessing Zebra's cached blockchain state.
188///
189/// This service provides read-only access to:
190/// - the non-finalized state: the ~100 most recent blocks.
191/// - the finalized state: older blocks that have many confirmations.
192///
193/// Requests to this service are processed in parallel,
194/// ignoring any blocks queued by the read-write [`StateService`].
195///
196/// This quick response behavior is better for most state users.
197/// It allows other async tasks to make progress while concurrently reading data from disk.
198#[derive(Clone, Debug)]
199pub struct ReadStateService {
200    // Configuration
201    //
202    /// The configured Zcash network.
203    network: Network,
204
205    // Shared Concurrently Readable State
206    //
207    /// A watch channel with a cached copy of the [`NonFinalizedState`].
208    ///
209    /// This state is only updated between requests,
210    /// so it might include some block data that is also on `disk`.
211    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
212
213    /// The shared inner on-disk database for the finalized state.
214    ///
215    /// RocksDB allows reads and writes via a shared reference,
216    /// but [`ZebraDb`] doesn't expose any write methods or types.
217    ///
218    /// This chain is updated concurrently with requests,
219    /// so it might include some block data that is also in `best_mem`.
220    db: ZebraDb,
221
222    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
223    /// once the queues have received all their parent blocks.
224    ///
225    /// Used to check for panics when writing blocks.
226    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
227}
228
229impl Drop for StateService {
230    fn drop(&mut self) {
231        // The state service owns the state, tasks, and channels,
232        // so dropping it should shut down everything.
233
234        // Close the channels (non-blocking)
235        // This makes the block write thread exit the next time it checks the channels.
236        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
237        self.invalid_block_write_reset_receiver.close();
238
239        std::mem::drop(self.finalized_block_write_sender.take());
240        std::mem::drop(self.non_finalized_block_write_sender.take());
241
242        self.clear_finalized_block_queue(
243            "dropping the state: dropped unused finalized state queue block",
244        );
245        self.clear_non_finalized_block_queue(
246            "dropping the state: dropped unused non-finalized state queue block",
247        );
248
249        // Log database metrics before shutting down
250        info!("dropping the state: logging database metrics");
251        self.log_db_metrics();
252
253        // Then drop self.read_service, which checks the block write task for panics,
254        // and tries to shut down the database.
255    }
256}
257
258impl Drop for ReadStateService {
259    fn drop(&mut self) {
260        // The read state service shares the state,
261        // so dropping it should check if we can shut down.
262
263        // TODO: move this into a try_shutdown() method
264        if let Some(block_write_task) = self.block_write_task.take() {
265            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
266                // We're the last database user, so we can tell it to shut down (blocking):
267                // - flushes the database to disk, and
268                // - drops the database, which cleans up any database tasks correctly.
269                self.db.shutdown(true);
270
271                // We are the last state with a reference to this thread, so we can
272                // wait until the block write task finishes, then check for panics (blocking).
273                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
274
275                // This log is verbose during tests.
276                #[cfg(not(test))]
277                info!("waiting for the block write task to finish");
278                #[cfg(test)]
279                debug!("waiting for the block write task to finish");
280
281                // TODO: move this into a check_for_panics() method
282                if let Err(thread_panic) = block_write_task_handle.join() {
283                    std::panic::resume_unwind(thread_panic);
284                } else {
285                    debug!("shutting down the state because the block write task has finished");
286                }
287            }
288        } else {
289            // Even if we're not the last database user, try shutting it down.
290            //
291            // TODO: rename this to try_shutdown()?
292            self.db.shutdown(false);
293        }
294    }
295}
296
297impl StateService {
298    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
299
300    /// Creates a new state service for the state `config` and `network`.
301    ///
302    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
303    /// to work out when it is near the final checkpoint.
304    ///
305    /// Returns the read-write and read-only state services,
306    /// and read-only watch channels for its best chain tip.
307    pub fn new(
308        config: Config,
309        network: &Network,
310        max_checkpoint_height: block::Height,
311        checkpoint_verify_concurrency_limit: usize,
312    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
313        let timer = CodeTimer::start();
314        let finalized_state = FinalizedState::new(
315            &config,
316            network,
317            #[cfg(feature = "elasticsearch")]
318            true,
319        );
320        timer.finish(module_path!(), line!(), "opening finalized state database");
321
322        let timer = CodeTimer::start();
323        let initial_tip = finalized_state
324            .db
325            .tip_block()
326            .map(CheckpointVerifiedBlock::from)
327            .map(ChainTipBlock::from);
328
329        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
330            ChainTipSender::new(initial_tip, network);
331
332        let non_finalized_state = NonFinalizedState::new(network);
333
334        let (non_finalized_state_sender, non_finalized_state_receiver) =
335            watch::channel(NonFinalizedState::new(&finalized_state.network()));
336
337        // Security: The number of blocks in these channels is limited by
338        //           the syncer and inbound lookahead limits.
339        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
340            tokio::sync::mpsc::unbounded_channel();
341        let (finalized_block_write_sender, finalized_block_write_receiver) =
342            tokio::sync::mpsc::unbounded_channel();
343        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
344            tokio::sync::mpsc::unbounded_channel();
345
346        let finalized_state_for_writing = finalized_state.clone();
347        let span = Span::current();
348        let block_write_task = std::thread::spawn(move || {
349            span.in_scope(move || {
350                write::write_blocks_from_channels(
351                    finalized_block_write_receiver,
352                    non_finalized_block_write_receiver,
353                    finalized_state_for_writing,
354                    non_finalized_state,
355                    invalid_block_reset_sender,
356                    chain_tip_sender,
357                    non_finalized_state_sender,
358                )
359            })
360        });
361        let block_write_task = Arc::new(block_write_task);
362
363        let read_service = ReadStateService::new(
364            &finalized_state,
365            Some(block_write_task),
366            non_finalized_state_receiver,
367        );
368
369        let full_verifier_utxo_lookahead = max_checkpoint_height
370            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
371                .expect("fits in HeightDiff");
372        let full_verifier_utxo_lookahead =
373            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
374        let non_finalized_state_queued_blocks = QueuedBlocks::default();
375        let pending_utxos = PendingUtxos::default();
376
377        let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash();
378
379        let state = Self {
380            network: network.clone(),
381            full_verifier_utxo_lookahead,
382            non_finalized_state_queued_blocks,
383            finalized_state_queued_blocks: HashMap::new(),
384            non_finalized_block_write_sender: Some(non_finalized_block_write_sender),
385            finalized_block_write_sender: Some(finalized_block_write_sender),
386            finalized_block_write_last_sent_hash,
387            non_finalized_block_write_sent_hashes: SentHashes::default(),
388            invalid_block_write_reset_receiver,
389            pending_utxos,
390            last_prune: Instant::now(),
391            read_service: read_service.clone(),
392            max_finalized_queue_height: f64::NAN,
393        };
394        timer.finish(module_path!(), line!(), "initializing state service");
395
396        tracing::info!("starting legacy chain check");
397        let timer = CodeTimer::start();
398
399        if let (Some(tip), Some(nu5_activation_height)) = (
400            state.best_tip(),
401            NetworkUpgrade::Nu5.activation_height(network),
402        ) {
403            if let Err(error) = check::legacy_chain(
404                nu5_activation_height,
405                any_ancestor_blocks(
406                    &state.read_service.latest_non_finalized_state(),
407                    &state.read_service.db,
408                    tip.1,
409                ),
410                &state.network,
411                MAX_LEGACY_CHAIN_BLOCKS,
412            ) {
413                let legacy_db_path = state.read_service.db.path().to_path_buf();
414                panic!(
415                    "Cached state contains a legacy chain.\n\
416                     An outdated Zebra version did not know about a recent network upgrade,\n\
417                     so it followed a legacy chain using outdated consensus branch rules.\n\
418                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
419                     Database path: {legacy_db_path:?}\n\
420                     Error: {error:?}",
421                );
422            }
423        }
424
425        tracing::info!("cached state consensus branch is valid: no legacy chain found");
426        timer.finish(module_path!(), line!(), "legacy chain check");
427
428        (state, read_service, latest_chain_tip, chain_tip_change)
429    }
430
431    /// Call read only state service to log rocksdb database metrics.
432    pub fn log_db_metrics(&self) {
433        self.read_service.db.print_db_metrics();
434    }
435
436    /// Queue a checkpoint verified block for verification and storage in the finalized state.
437    ///
438    /// Returns a channel receiver that provides the result of the block commit.
439    fn queue_and_commit_to_finalized_state(
440        &mut self,
441        checkpoint_verified: CheckpointVerifiedBlock,
442    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
443        // # Correctness & Performance
444        //
445        // This method must not block, access the database, or perform CPU-intensive tasks,
446        // because it is called directly from the tokio executor's Future threads.
447
448        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
449        let queued_height = checkpoint_verified.height;
450
451        // If we're close to the final checkpoint, make the block's UTXOs available for
452        // semantic block verification, even when it is in the channel.
453        if self.is_close_to_final_checkpoint(queued_height) {
454            self.non_finalized_block_write_sent_hashes
455                .add_finalized(&checkpoint_verified)
456        }
457
458        let (rsp_tx, rsp_rx) = oneshot::channel();
459        let queued = (checkpoint_verified, rsp_tx);
460
461        if self.finalized_block_write_sender.is_some() {
462            // We're still committing checkpoint verified blocks
463            if let Some(duplicate_queued) = self
464                .finalized_state_queued_blocks
465                .insert(queued_prev_hash, queued)
466            {
467                Self::send_checkpoint_verified_block_error(
468                    duplicate_queued,
469                    "dropping older checkpoint verified block: got newer duplicate block",
470                );
471            }
472
473            self.drain_finalized_queue_and_commit();
474        } else {
475            // We've finished committing checkpoint verified blocks to the finalized state,
476            // so drop any repeated queued blocks, and return an error.
477            //
478            // TODO: track the latest sent height, and drop any blocks under that height
479            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
480            Self::send_checkpoint_verified_block_error(
481                queued,
482                "already finished committing checkpoint verified blocks: dropped duplicate block, \
483                 block is already committed to the state",
484            );
485
486            self.clear_finalized_block_queue(
487                "already finished committing checkpoint verified blocks: dropped duplicate block, \
488                 block is already committed to the state",
489            );
490        }
491
492        if self.finalized_state_queued_blocks.is_empty() {
493            self.max_finalized_queue_height = f64::NAN;
494        } else if self.max_finalized_queue_height.is_nan()
495            || self.max_finalized_queue_height < queued_height.0 as f64
496        {
497            // if there are still blocks in the queue, then either:
498            //   - the new block was lower than the old maximum, and there was a gap before it,
499            //     so the maximum is still the same (and we skip this code), or
500            //   - the new block is higher than the old maximum, and there is at least one gap
501            //     between the finalized tip and the new maximum
502            self.max_finalized_queue_height = queued_height.0 as f64;
503        }
504
505        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
506        metrics::gauge!("state.checkpoint.queued.block.count")
507            .set(self.finalized_state_queued_blocks.len() as f64);
508
509        rsp_rx
510    }
511
512    /// Finds finalized state queue blocks to be committed to the state in order,
513    /// removes them from the queue, and sends them to the block commit task.
514    ///
515    /// After queueing a finalized block, this method checks whether the newly
516    /// queued block (and any of its descendants) can be committed to the state.
517    ///
518    /// Returns an error if the block commit channel has been closed.
519    pub fn drain_finalized_queue_and_commit(&mut self) {
520        use tokio::sync::mpsc::error::{SendError, TryRecvError};
521
522        // # Correctness & Performance
523        //
524        // This method must not block, access the database, or perform CPU-intensive tasks,
525        // because it is called directly from the tokio executor's Future threads.
526
527        // If a block failed, we need to start again from a valid tip.
528        match self.invalid_block_write_reset_receiver.try_recv() {
529            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
530            Err(TryRecvError::Disconnected) => {
531                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
532                return;
533            }
534            // There are no errors, so we can just use the last block hash we sent
535            Err(TryRecvError::Empty) => {}
536        }
537
538        while let Some(queued_block) = self
539            .finalized_state_queued_blocks
540            .remove(&self.finalized_block_write_last_sent_hash)
541        {
542            let last_sent_finalized_block_height = queued_block.0.height;
543
544            self.finalized_block_write_last_sent_hash = queued_block.0.hash;
545
546            // If we've finished sending finalized blocks, ignore any repeated blocks.
547            // (Blocks can be repeated after a syncer reset.)
548            if let Some(finalized_block_write_sender) = &self.finalized_block_write_sender {
549                let send_result = finalized_block_write_sender.send(queued_block);
550
551                // If the receiver is closed, we can't send any more blocks.
552                if let Err(SendError(queued)) = send_result {
553                    // If Zebra is shutting down, drop blocks and return an error.
554                    Self::send_checkpoint_verified_block_error(
555                        queued,
556                        "block commit task exited. Is Zebra shutting down?",
557                    );
558
559                    self.clear_finalized_block_queue(
560                        "block commit task exited. Is Zebra shutting down?",
561                    );
562                } else {
563                    metrics::gauge!("state.checkpoint.sent.block.height")
564                        .set(last_sent_finalized_block_height.0 as f64);
565                };
566            }
567        }
568    }
569
570    /// Drops all finalized state queue blocks, and sends an error on their result channels.
571    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
572        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
573            Self::send_checkpoint_verified_block_error(queued, error.clone());
574        }
575    }
576
577    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
578    fn send_checkpoint_verified_block_error(
579        queued: QueuedCheckpointVerified,
580        error: impl Into<BoxError>,
581    ) {
582        let (finalized, rsp_tx) = queued;
583
584        // The block sender might have already given up on this block,
585        // so ignore any channel send errors.
586        let _ = rsp_tx.send(Err(error.into()));
587        std::mem::drop(finalized);
588    }
589
590    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
591    fn clear_non_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
592        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
593            Self::send_semantically_verified_block_error(queued, error.clone());
594        }
595    }
596
597    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
598    fn send_semantically_verified_block_error(
599        queued: QueuedSemanticallyVerified,
600        error: impl Into<BoxError>,
601    ) {
602        let (finalized, rsp_tx) = queued;
603
604        // The block sender might have already given up on this block,
605        // so ignore any channel send errors.
606        let _ = rsp_tx.send(Err(error.into()));
607        std::mem::drop(finalized);
608    }
609
610    /// Queue a semantically verified block for contextual verification and check if any queued
611    /// blocks are ready to be verified and committed to the state.
612    ///
613    /// This function encodes the logic for [committing non-finalized blocks][1]
614    /// in RFC0005.
615    ///
616    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
617    #[instrument(level = "debug", skip(self, semantically_verrified))]
618    fn queue_and_commit_to_non_finalized_state(
619        &mut self,
620        semantically_verrified: SemanticallyVerifiedBlock,
621    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
622        tracing::debug!(block = %semantically_verrified.block, "queueing block for contextual verification");
623        let parent_hash = semantically_verrified.block.header.previous_block_hash;
624
625        if self
626            .non_finalized_block_write_sent_hashes
627            .contains(&semantically_verrified.hash)
628        {
629            let (rsp_tx, rsp_rx) = oneshot::channel();
630            let _ = rsp_tx.send(Err(
631                "block has already been sent to be committed to the state".into(),
632            ));
633            return rsp_rx;
634        }
635
636        if self
637            .read_service
638            .db
639            .contains_height(semantically_verrified.height)
640        {
641            let (rsp_tx, rsp_rx) = oneshot::channel();
642            let _ = rsp_tx.send(Err(
643                "block height is in the finalized state: block is already committed to the state"
644                    .into(),
645            ));
646            return rsp_rx;
647        }
648
649        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
650        // has been queued but not yet committed to the state fails the older request and replaces
651        // it with the newer request.
652        let rsp_rx = if let Some((_, old_rsp_tx)) = self
653            .non_finalized_state_queued_blocks
654            .get_mut(&semantically_verrified.hash)
655        {
656            tracing::debug!("replacing older queued request with new request");
657            let (mut rsp_tx, rsp_rx) = oneshot::channel();
658            std::mem::swap(old_rsp_tx, &mut rsp_tx);
659            let _ = rsp_tx.send(Err("replaced by newer request".into()));
660            rsp_rx
661        } else {
662            let (rsp_tx, rsp_rx) = oneshot::channel();
663            self.non_finalized_state_queued_blocks
664                .queue((semantically_verrified, rsp_tx));
665            rsp_rx
666        };
667
668        // We've finished sending checkpoint verified blocks when:
669        // - we've sent the verified block for the last checkpoint, and
670        // - it has been successfully written to disk.
671        //
672        // We detect the last checkpoint by looking for non-finalized blocks
673        // that are a child of the last block we sent.
674        //
675        // TODO: configure the state with the last checkpoint hash instead?
676        if self.finalized_block_write_sender.is_some()
677            && self
678                .non_finalized_state_queued_blocks
679                .has_queued_children(self.finalized_block_write_last_sent_hash)
680            && self.read_service.db.finalized_tip_hash()
681                == self.finalized_block_write_last_sent_hash
682        {
683            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
684            // and move on to committing semantically verified blocks to the non-finalized state.
685            std::mem::drop(self.finalized_block_write_sender.take());
686            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
687            self.non_finalized_block_write_sent_hashes = SentHashes::default();
688            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
689            self.non_finalized_block_write_sent_hashes
690                .can_fork_chain_at_hashes = true;
691            // Send blocks from non-finalized queue
692            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
693            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
694            self.clear_finalized_block_queue(
695                "already finished committing checkpoint verified blocks: dropped duplicate block, \
696                 block is already committed to the state",
697            );
698        } else if !self.can_fork_chain_at(&parent_hash) {
699            tracing::trace!("unready to verify, returning early");
700        } else if self.finalized_block_write_sender.is_none() {
701            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
702            self.send_ready_non_finalized_queued(parent_hash);
703
704            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
705                "Finalized state must have at least one block before committing non-finalized state",
706            );
707
708            self.non_finalized_state_queued_blocks
709                .prune_by_height(finalized_tip_height);
710
711            self.non_finalized_block_write_sent_hashes
712                .prune_by_height(finalized_tip_height);
713        }
714
715        rsp_rx
716    }
717
718    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
719    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
720        self.non_finalized_block_write_sent_hashes
721            .can_fork_chain_at(hash)
722            || &self.read_service.db.finalized_tip_hash() == hash
723    }
724
725    /// Returns `true` if `queued_height` is near the final checkpoint.
726    ///
727    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
728    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
729    ///
730    /// If it doesn't have the required UTXOs, some blocks will time out,
731    /// but succeed after a syncer restart.
732    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
733        queued_height >= self.full_verifier_utxo_lookahead
734    }
735
736    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
737    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
738    #[tracing::instrument(level = "debug", skip(self, new_parent))]
739    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
740        use tokio::sync::mpsc::error::SendError;
741        if let Some(non_finalized_block_write_sender) = &self.non_finalized_block_write_sender {
742            let mut new_parents: Vec<block::Hash> = vec![new_parent];
743
744            while let Some(parent_hash) = new_parents.pop() {
745                let queued_children = self
746                    .non_finalized_state_queued_blocks
747                    .dequeue_children(parent_hash);
748
749                for queued_child in queued_children {
750                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
751
752                    self.non_finalized_block_write_sent_hashes
753                        .add(&queued_child.0);
754                    let send_result = non_finalized_block_write_sender.send(queued_child);
755
756                    if let Err(SendError(queued)) = send_result {
757                        // If Zebra is shutting down, drop blocks and return an error.
758                        Self::send_semantically_verified_block_error(
759                            queued,
760                            "block commit task exited. Is Zebra shutting down?",
761                        );
762
763                        self.clear_non_finalized_block_queue(
764                            "block commit task exited. Is Zebra shutting down?",
765                        );
766
767                        return;
768                    };
769
770                    new_parents.push(hash);
771                }
772            }
773
774            self.non_finalized_block_write_sent_hashes.finish_batch();
775        };
776    }
777
778    /// Return the tip of the current best chain.
779    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
780        self.read_service.best_tip()
781    }
782
783    /// Assert some assumptions about the semantically verified `block` before it is queued.
784    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
785        // required by `Request::CommitSemanticallyVerifiedBlock` call
786        assert!(
787            block.height > self.network.mandatory_checkpoint_height(),
788            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
789            blocks, and the canopy activation block, must be committed to the state as finalized \
790            blocks"
791        );
792    }
793}
794
795impl ReadStateService {
796    /// Creates a new read-only state service, using the provided finalized state and
797    /// block write task handle.
798    ///
799    /// Returns the newly created service,
800    /// and a watch channel for updating the shared recent non-finalized chain.
801    pub(crate) fn new(
802        finalized_state: &FinalizedState,
803        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
804        non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
805    ) -> Self {
806        let read_service = Self {
807            network: finalized_state.network(),
808            db: finalized_state.db.clone(),
809            non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
810            block_write_task,
811        };
812
813        tracing::debug!("created new read-only state service");
814
815        read_service
816    }
817
818    /// Return the tip of the current best chain.
819    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
820        read::best_tip(&self.latest_non_finalized_state(), &self.db)
821    }
822
823    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
824    fn latest_non_finalized_state(&self) -> NonFinalizedState {
825        self.non_finalized_state_receiver.cloned_watch_data()
826    }
827
828    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
829    #[allow(dead_code)]
830    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
831        self.latest_non_finalized_state().best_chain().cloned()
832    }
833
834    /// Test-only access to the inner database.
835    /// Can be used to modify the database without doing any consensus checks.
836    #[cfg(any(test, feature = "proptest-impl"))]
837    pub fn db(&self) -> &ZebraDb {
838        &self.db
839    }
840
841    /// Logs rocksdb metrics using the read only state service.
842    pub fn log_db_metrics(&self) {
843        self.db.print_db_metrics();
844    }
845}
846
847impl Service<Request> for StateService {
848    type Response = Response;
849    type Error = BoxError;
850    type Future =
851        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
852
853    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
854        // Check for panics in the block write task
855        let poll = self.read_service.poll_ready(cx);
856
857        // Prune outdated UTXO requests
858        let now = Instant::now();
859
860        if self.last_prune + Self::PRUNE_INTERVAL < now {
861            let tip = self.best_tip();
862            let old_len = self.pending_utxos.len();
863
864            self.pending_utxos.prune();
865            self.last_prune = now;
866
867            let new_len = self.pending_utxos.len();
868            let prune_count = old_len
869                .checked_sub(new_len)
870                .expect("prune does not add any utxo requests");
871            if prune_count > 0 {
872                tracing::debug!(
873                    ?old_len,
874                    ?new_len,
875                    ?prune_count,
876                    ?tip,
877                    "pruned utxo requests"
878                );
879            } else {
880                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
881            }
882        }
883
884        poll
885    }
886
887    #[instrument(name = "state", skip(self, req))]
888    fn call(&mut self, req: Request) -> Self::Future {
889        req.count_metric();
890        let timer = CodeTimer::start();
891        let span = Span::current();
892
893        match req {
894            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
895            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
896            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
897                self.assert_block_can_be_validated(&semantically_verified);
898
899                self.pending_utxos
900                    .check_against_ordered(&semantically_verified.new_outputs);
901
902                // # Performance
903                //
904                // Allow other async tasks to make progress while blocks are being verified
905                // and written to disk. But wait for the blocks to finish committing,
906                // so that `StateService` multi-block queries always observe a consistent state.
907                //
908                // Since each block is spawned into its own task,
909                // there shouldn't be any other code running in the same task,
910                // so we don't need to worry about blocking it:
911                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
912
913                let rsp_rx = tokio::task::block_in_place(move || {
914                    span.in_scope(|| {
915                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
916                    })
917                });
918
919                // TODO:
920                //   - check for panics in the block write task here,
921                //     as well as in poll_ready()
922
923                // The work is all done, the future just waits on a channel for the result
924                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
925
926                let span = Span::current();
927                async move {
928                    rsp_rx
929                        .await
930                        .map_err(|_recv_error| {
931                            BoxError::from(
932                                "block was dropped from the queue of non-finalized blocks",
933                            )
934                        })
935                        // TODO: replace with Result::flatten once it stabilises
936                        // https://github.com/rust-lang/rust/issues/70142
937                        .and_then(convert::identity)
938                        .map(Response::Committed)
939                }
940                .instrument(span)
941                .boxed()
942            }
943
944            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
945            // Accesses shared writeable state in the StateService.
946            Request::CommitCheckpointVerifiedBlock(finalized) => {
947                // # Consensus
948                //
949                // A semantic block verification could have called AwaitUtxo
950                // before this checkpoint verified block arrived in the state.
951                // So we need to check for pending UTXO requests sent by running
952                // semantic block verifications.
953                //
954                // This check is redundant for most checkpoint verified blocks,
955                // because semantic verification can only succeed near the final
956                // checkpoint, when all the UTXOs are available for the verifying block.
957                //
958                // (Checkpoint block UTXOs are verified using block hash checkpoints
959                // and transaction merkle tree block header commitments.)
960                self.pending_utxos
961                    .check_against_ordered(&finalized.new_outputs);
962
963                // # Performance
964                //
965                // This method doesn't block, access the database, or perform CPU-intensive tasks,
966                // so we can run it directly in the tokio executor's Future threads.
967                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
968
969                // TODO:
970                //   - check for panics in the block write task here,
971                //     as well as in poll_ready()
972
973                // The work is all done, the future just waits on a channel for the result
974                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
975
976                async move {
977                    rsp_rx
978                        .await
979                        .map_err(|_recv_error| {
980                            BoxError::from("block was dropped from the queue of finalized blocks")
981                        })
982                        // TODO: replace with Result::flatten once it stabilises
983                        // https://github.com/rust-lang/rust/issues/70142
984                        .and_then(convert::identity)
985                        .map(Response::Committed)
986                }
987                .instrument(span)
988                .boxed()
989            }
990
991            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
992            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
993            Request::AwaitUtxo(outpoint) => {
994                // Prepare the AwaitUtxo future from PendingUxtos.
995                let response_fut = self.pending_utxos.queue(outpoint);
996                // Only instrument `response_fut`, the ReadStateService already
997                // instruments its requests with the same span.
998
999                let response_fut = response_fut.instrument(span).boxed();
1000
1001                // Check the non-finalized block queue outside the returned future,
1002                // so we can access mutable state fields.
1003                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1004                    self.pending_utxos.respond(&outpoint, utxo);
1005
1006                    // We're finished, the returned future gets the UTXO from the respond() channel.
1007                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1008
1009                    return response_fut;
1010                }
1011
1012                // Check the sent non-finalized blocks
1013                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1014                    self.pending_utxos.respond(&outpoint, utxo);
1015
1016                    // We're finished, the returned future gets the UTXO from the respond() channel.
1017                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1018
1019                    return response_fut;
1020                }
1021
1022                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1023                // because it is only used during checkpoint verification.
1024                //
1025                // This creates a rare race condition, but it doesn't seem to happen much in practice.
1026                // See #5126 for details.
1027
1028                // Manually send a request to the ReadStateService,
1029                // to get UTXOs from any non-finalized chain or the finalized chain.
1030                let read_service = self.read_service.clone();
1031
1032                // Run the request in an async block, so we can await the response.
1033                async move {
1034                    let req = ReadRequest::AnyChainUtxo(outpoint);
1035
1036                    let rsp = read_service.oneshot(req).await?;
1037
1038                    // Optional TODO:
1039                    //  - make pending_utxos.respond() async using a channel,
1040                    //    so we can respond to all waiting requests here
1041                    //
1042                    // This change is not required for correctness, because:
1043                    // - any waiting requests should have returned when the block was sent to the state
1044                    // - otherwise, the request returns immediately if:
1045                    //   - the block is in the non-finalized queue, or
1046                    //   - the block is in any non-finalized chain or the finalized state
1047                    //
1048                    // And if the block is in the finalized queue,
1049                    // that's rare enough that a retry is ok.
1050                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1051                        // We got a UTXO, so we replace the response future with the result own.
1052                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1053
1054                        return Ok(Response::Utxo(utxo));
1055                    }
1056
1057                    // We're finished, but the returned future is waiting on the respond() channel.
1058                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1059
1060                    response_fut.await
1061                }
1062                .boxed()
1063            }
1064
1065            // Used by sync, inbound, and block verifier to check if a block is already in the state
1066            // before downloading or validating it.
1067            Request::KnownBlock(hash) => {
1068                let timer = CodeTimer::start();
1069
1070                let read_service = self.read_service.clone();
1071
1072                async move {
1073                    let response = read::non_finalized_state_contains_block_hash(
1074                        &read_service.latest_non_finalized_state(),
1075                        hash,
1076                    )
1077                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1078
1079                    // The work is done in the future.
1080                    timer.finish(module_path!(), line!(), "Request::KnownBlock");
1081
1082                    Ok(Response::KnownBlock(response))
1083                }
1084                .boxed()
1085            }
1086
1087            // Runs concurrently using the ReadStateService
1088            Request::Tip
1089            | Request::Depth(_)
1090            | Request::BestChainNextMedianTimePast
1091            | Request::BestChainBlockHash(_)
1092            | Request::BlockLocator
1093            | Request::Transaction(_)
1094            | Request::UnspentBestChainUtxo(_)
1095            | Request::Block(_)
1096            | Request::BlockAndSize(_)
1097            | Request::BlockHeader(_)
1098            | Request::FindBlockHashes { .. }
1099            | Request::FindBlockHeaders { .. }
1100            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1101                // Redirect the request to the concurrent ReadStateService
1102                let read_service = self.read_service.clone();
1103
1104                async move {
1105                    let req = req
1106                        .try_into()
1107                        .expect("ReadRequest conversion should not fail");
1108
1109                    let rsp = read_service.oneshot(req).await?;
1110                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1111
1112                    Ok(rsp)
1113                }
1114                .boxed()
1115            }
1116
1117            Request::CheckBlockProposalValidity(_) => {
1118                // Redirect the request to the concurrent ReadStateService
1119                let read_service = self.read_service.clone();
1120
1121                async move {
1122                    let req = req
1123                        .try_into()
1124                        .expect("ReadRequest conversion should not fail");
1125
1126                    let rsp = read_service.oneshot(req).await?;
1127                    let rsp = rsp.try_into().expect("Response conversion should not fail");
1128
1129                    Ok(rsp)
1130                }
1131                .boxed()
1132            }
1133        }
1134    }
1135}
1136
1137impl Service<ReadRequest> for ReadStateService {
1138    type Response = ReadResponse;
1139    type Error = BoxError;
1140    type Future =
1141        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1142
1143    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1144        // Check for panics in the block write task
1145        //
1146        // TODO: move into a check_for_panics() method
1147        let block_write_task = self.block_write_task.take();
1148
1149        if let Some(block_write_task) = block_write_task {
1150            if block_write_task.is_finished() {
1151                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1152                    // We are the last state with a reference to this task, so we can propagate any panics
1153                    if let Err(thread_panic) = block_write_task.join() {
1154                        std::panic::resume_unwind(thread_panic);
1155                    }
1156                }
1157            } else {
1158                // It hasn't finished, so we need to put it back
1159                self.block_write_task = Some(block_write_task);
1160            }
1161        }
1162
1163        self.db.check_for_panics();
1164
1165        Poll::Ready(Ok(()))
1166    }
1167
1168    #[instrument(name = "read_state", skip(self, req))]
1169    fn call(&mut self, req: ReadRequest) -> Self::Future {
1170        req.count_metric();
1171        let timer = CodeTimer::start();
1172        let span = Span::current();
1173
1174        match req {
1175            // Used by the `getblockchaininfo` RPC.
1176            ReadRequest::UsageInfo => {
1177                let db = self.db.clone();
1178
1179                tokio::task::spawn_blocking(move || {
1180                    span.in_scope(move || {
1181                        // The work is done in the future.
1182
1183                        let db_size = db.size();
1184
1185                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1186
1187                        Ok(ReadResponse::UsageInfo(db_size))
1188                    })
1189                })
1190                .wait_for_panics()
1191            }
1192
1193            // Used by the StateService.
1194            ReadRequest::Tip => {
1195                let state = self.clone();
1196
1197                tokio::task::spawn_blocking(move || {
1198                    span.in_scope(move || {
1199                        let tip = state.non_finalized_state_receiver.with_watch_data(
1200                            |non_finalized_state| {
1201                                read::tip(non_finalized_state.best_chain(), &state.db)
1202                            },
1203                        );
1204
1205                        // The work is done in the future.
1206                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1207
1208                        Ok(ReadResponse::Tip(tip))
1209                    })
1210                })
1211                .wait_for_panics()
1212            }
1213
1214            // Used by `getblockchaininfo` RPC method.
1215            ReadRequest::TipPoolValues => {
1216                let state = self.clone();
1217
1218                tokio::task::spawn_blocking(move || {
1219                    span.in_scope(move || {
1220                        let tip_with_value_balance = state
1221                            .non_finalized_state_receiver
1222                            .with_watch_data(|non_finalized_state| {
1223                                read::tip_with_value_balance(
1224                                    non_finalized_state.best_chain(),
1225                                    &state.db,
1226                                )
1227                            });
1228
1229                        // The work is done in the future.
1230                        // TODO: Do this in the Drop impl with the variant name?
1231                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1232
1233                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1234                            .ok_or(BoxError::from("no chain tip available yet"))?;
1235
1236                        Ok(ReadResponse::TipPoolValues {
1237                            tip_height,
1238                            tip_hash,
1239                            value_balance,
1240                        })
1241                    })
1242                })
1243                .wait_for_panics()
1244            }
1245
1246            // Used by getblock
1247            ReadRequest::BlockInfo(hash_or_height) => {
1248                let state = self.clone();
1249
1250                tokio::task::spawn_blocking(move || {
1251                    span.in_scope(move || {
1252                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
1253                            |non_finalized_state| {
1254                                read::block_info(
1255                                    non_finalized_state.best_chain(),
1256                                    &state.db,
1257                                    hash_or_height,
1258                                )
1259                            },
1260                        );
1261
1262                        // The work is done in the future.
1263                        // TODO: Do this in the Drop impl with the variant name?
1264                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1265
1266                        Ok(ReadResponse::BlockInfo(value_balance))
1267                    })
1268                })
1269                .wait_for_panics()
1270            }
1271
1272            // Used by the StateService.
1273            ReadRequest::Depth(hash) => {
1274                let state = self.clone();
1275
1276                tokio::task::spawn_blocking(move || {
1277                    span.in_scope(move || {
1278                        let depth = state.non_finalized_state_receiver.with_watch_data(
1279                            |non_finalized_state| {
1280                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
1281                            },
1282                        );
1283
1284                        // The work is done in the future.
1285                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1286
1287                        Ok(ReadResponse::Depth(depth))
1288                    })
1289                })
1290                .wait_for_panics()
1291            }
1292
1293            // Used by the StateService.
1294            ReadRequest::BestChainNextMedianTimePast => {
1295                let state = self.clone();
1296
1297                tokio::task::spawn_blocking(move || {
1298                    span.in_scope(move || {
1299                        let non_finalized_state = state.latest_non_finalized_state();
1300                        let median_time_past =
1301                            read::next_median_time_past(&non_finalized_state, &state.db);
1302
1303                        // The work is done in the future.
1304                        timer.finish(
1305                            module_path!(),
1306                            line!(),
1307                            "ReadRequest::BestChainNextMedianTimePast",
1308                        );
1309
1310                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1311                    })
1312                })
1313                .wait_for_panics()
1314            }
1315
1316            // Used by the get_block (raw) RPC and the StateService.
1317            ReadRequest::Block(hash_or_height) => {
1318                let state = self.clone();
1319
1320                tokio::task::spawn_blocking(move || {
1321                    span.in_scope(move || {
1322                        let block = state.non_finalized_state_receiver.with_watch_data(
1323                            |non_finalized_state| {
1324                                read::block(
1325                                    non_finalized_state.best_chain(),
1326                                    &state.db,
1327                                    hash_or_height,
1328                                )
1329                            },
1330                        );
1331
1332                        // The work is done in the future.
1333                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1334
1335                        Ok(ReadResponse::Block(block))
1336                    })
1337                })
1338                .wait_for_panics()
1339            }
1340
1341            // Used by the get_block (raw) RPC and the StateService.
1342            ReadRequest::BlockAndSize(hash_or_height) => {
1343                let state = self.clone();
1344
1345                tokio::task::spawn_blocking(move || {
1346                    span.in_scope(move || {
1347                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1348                            |non_finalized_state| {
1349                                read::block_and_size(
1350                                    non_finalized_state.best_chain(),
1351                                    &state.db,
1352                                    hash_or_height,
1353                                )
1354                            },
1355                        );
1356
1357                        // The work is done in the future.
1358                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1359
1360                        Ok(ReadResponse::BlockAndSize(block_and_size))
1361                    })
1362                })
1363                .wait_for_panics()
1364            }
1365
1366            // Used by the get_block (verbose) RPC and the StateService.
1367            ReadRequest::BlockHeader(hash_or_height) => {
1368                let state = self.clone();
1369
1370                tokio::task::spawn_blocking(move || {
1371                    span.in_scope(move || {
1372                        let best_chain = state.latest_best_chain();
1373
1374                        let height = hash_or_height
1375                            .height_or_else(|hash| {
1376                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1377                            })
1378                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1379
1380                        let hash = hash_or_height
1381                            .hash_or_else(|height| {
1382                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
1383                            })
1384                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1385
1386                        let next_height = height.next()?;
1387                        let next_block_hash =
1388                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1389
1390                        let header = read::block_header(best_chain, &state.db, height.into())
1391                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1392
1393                        // The work is done in the future.
1394                        timer.finish(module_path!(), line!(), "ReadRequest::Block");
1395
1396                        Ok(ReadResponse::BlockHeader {
1397                            header,
1398                            hash,
1399                            height,
1400                            next_block_hash,
1401                        })
1402                    })
1403                })
1404                .wait_for_panics()
1405            }
1406
1407            // For the get_raw_transaction RPC and the StateService.
1408            ReadRequest::Transaction(hash) => {
1409                let state = self.clone();
1410
1411                tokio::task::spawn_blocking(move || {
1412                    span.in_scope(move || {
1413                        let response =
1414                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1415
1416                        // The work is done in the future.
1417                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1418
1419                        Ok(ReadResponse::Transaction(response))
1420                    })
1421                })
1422                .wait_for_panics()
1423            }
1424
1425            // Used by the getblock (verbose) RPC.
1426            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1427                let state = self.clone();
1428
1429                tokio::task::spawn_blocking(move || {
1430                    span.in_scope(move || {
1431                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1432                            |non_finalized_state| {
1433                                read::transaction_hashes_for_block(
1434                                    non_finalized_state.best_chain(),
1435                                    &state.db,
1436                                    hash_or_height,
1437                                )
1438                            },
1439                        );
1440
1441                        // The work is done in the future.
1442                        timer.finish(
1443                            module_path!(),
1444                            line!(),
1445                            "ReadRequest::TransactionIdsForBlock",
1446                        );
1447
1448                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1449                    })
1450                })
1451                .wait_for_panics()
1452            }
1453
1454            #[cfg(feature = "indexer")]
1455            ReadRequest::SpendingTransactionId(spend) => {
1456                let state = self.clone();
1457
1458                tokio::task::spawn_blocking(move || {
1459                    span.in_scope(move || {
1460                        let spending_transaction_id = state
1461                            .non_finalized_state_receiver
1462                            .with_watch_data(|non_finalized_state| {
1463                                read::spending_transaction_hash(
1464                                    non_finalized_state.best_chain(),
1465                                    &state.db,
1466                                    spend,
1467                                )
1468                            });
1469
1470                        // The work is done in the future.
1471                        timer.finish(
1472                            module_path!(),
1473                            line!(),
1474                            "ReadRequest::TransactionIdForSpentOutPoint",
1475                        );
1476
1477                        Ok(ReadResponse::TransactionId(spending_transaction_id))
1478                    })
1479                })
1480                .wait_for_panics()
1481            }
1482
1483            ReadRequest::UnspentBestChainUtxo(outpoint) => {
1484                let state = self.clone();
1485
1486                tokio::task::spawn_blocking(move || {
1487                    span.in_scope(move || {
1488                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1489                            |non_finalized_state| {
1490                                read::unspent_utxo(
1491                                    non_finalized_state.best_chain(),
1492                                    &state.db,
1493                                    outpoint,
1494                                )
1495                            },
1496                        );
1497
1498                        // The work is done in the future.
1499                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1500
1501                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1502                    })
1503                })
1504                .wait_for_panics()
1505            }
1506
1507            // Manually used by the StateService to implement part of AwaitUtxo.
1508            ReadRequest::AnyChainUtxo(outpoint) => {
1509                let state = self.clone();
1510
1511                tokio::task::spawn_blocking(move || {
1512                    span.in_scope(move || {
1513                        let utxo = state.non_finalized_state_receiver.with_watch_data(
1514                            |non_finalized_state| {
1515                                read::any_utxo(non_finalized_state, &state.db, outpoint)
1516                            },
1517                        );
1518
1519                        // The work is done in the future.
1520                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1521
1522                        Ok(ReadResponse::AnyChainUtxo(utxo))
1523                    })
1524                })
1525                .wait_for_panics()
1526            }
1527
1528            // Used by the StateService.
1529            ReadRequest::BlockLocator => {
1530                let state = self.clone();
1531
1532                tokio::task::spawn_blocking(move || {
1533                    span.in_scope(move || {
1534                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
1535                            |non_finalized_state| {
1536                                read::block_locator(non_finalized_state.best_chain(), &state.db)
1537                            },
1538                        );
1539
1540                        // The work is done in the future.
1541                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1542
1543                        Ok(ReadResponse::BlockLocator(
1544                            block_locator.unwrap_or_default(),
1545                        ))
1546                    })
1547                })
1548                .wait_for_panics()
1549            }
1550
1551            // Used by the StateService.
1552            ReadRequest::FindBlockHashes { known_blocks, stop } => {
1553                let state = self.clone();
1554
1555                tokio::task::spawn_blocking(move || {
1556                    span.in_scope(move || {
1557                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1558                            |non_finalized_state| {
1559                                read::find_chain_hashes(
1560                                    non_finalized_state.best_chain(),
1561                                    &state.db,
1562                                    known_blocks,
1563                                    stop,
1564                                    MAX_FIND_BLOCK_HASHES_RESULTS,
1565                                )
1566                            },
1567                        );
1568
1569                        // The work is done in the future.
1570                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1571
1572                        Ok(ReadResponse::BlockHashes(block_hashes))
1573                    })
1574                })
1575                .wait_for_panics()
1576            }
1577
1578            // Used by the StateService.
1579            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1580                let state = self.clone();
1581
1582                tokio::task::spawn_blocking(move || {
1583                    span.in_scope(move || {
1584                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
1585                            |non_finalized_state| {
1586                                read::find_chain_headers(
1587                                    non_finalized_state.best_chain(),
1588                                    &state.db,
1589                                    known_blocks,
1590                                    stop,
1591                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
1592                                )
1593                            },
1594                        );
1595
1596                        let block_headers = block_headers
1597                            .into_iter()
1598                            .map(|header| CountedHeader { header })
1599                            .collect();
1600
1601                        // The work is done in the future.
1602                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1603
1604                        Ok(ReadResponse::BlockHeaders(block_headers))
1605                    })
1606                })
1607                .wait_for_panics()
1608            }
1609
1610            ReadRequest::SaplingTree(hash_or_height) => {
1611                let state = self.clone();
1612
1613                tokio::task::spawn_blocking(move || {
1614                    span.in_scope(move || {
1615                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1616                            |non_finalized_state| {
1617                                read::sapling_tree(
1618                                    non_finalized_state.best_chain(),
1619                                    &state.db,
1620                                    hash_or_height,
1621                                )
1622                            },
1623                        );
1624
1625                        // The work is done in the future.
1626                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1627
1628                        Ok(ReadResponse::SaplingTree(sapling_tree))
1629                    })
1630                })
1631                .wait_for_panics()
1632            }
1633
1634            ReadRequest::OrchardTree(hash_or_height) => {
1635                let state = self.clone();
1636
1637                tokio::task::spawn_blocking(move || {
1638                    span.in_scope(move || {
1639                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1640                            |non_finalized_state| {
1641                                read::orchard_tree(
1642                                    non_finalized_state.best_chain(),
1643                                    &state.db,
1644                                    hash_or_height,
1645                                )
1646                            },
1647                        );
1648
1649                        // The work is done in the future.
1650                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1651
1652                        Ok(ReadResponse::OrchardTree(orchard_tree))
1653                    })
1654                })
1655                .wait_for_panics()
1656            }
1657
1658            ReadRequest::SaplingSubtrees { start_index, limit } => {
1659                let state = self.clone();
1660
1661                tokio::task::spawn_blocking(move || {
1662                    span.in_scope(move || {
1663                        let end_index = limit
1664                            .and_then(|limit| start_index.0.checked_add(limit.0))
1665                            .map(NoteCommitmentSubtreeIndex);
1666
1667                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
1668                            |non_finalized_state| {
1669                                if let Some(end_index) = end_index {
1670                                    read::sapling_subtrees(
1671                                        non_finalized_state.best_chain(),
1672                                        &state.db,
1673                                        start_index..end_index,
1674                                    )
1675                                } else {
1676                                    // If there is no end bound, just return all the trees.
1677                                    // If the end bound would overflow, just returns all the trees, because that's what
1678                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1679                                    // the trees run out.)
1680                                    read::sapling_subtrees(
1681                                        non_finalized_state.best_chain(),
1682                                        &state.db,
1683                                        start_index..,
1684                                    )
1685                                }
1686                            },
1687                        );
1688
1689                        // The work is done in the future.
1690                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");
1691
1692                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
1693                    })
1694                })
1695                .wait_for_panics()
1696            }
1697
1698            ReadRequest::OrchardSubtrees { start_index, limit } => {
1699                let state = self.clone();
1700
1701                tokio::task::spawn_blocking(move || {
1702                    span.in_scope(move || {
1703                        let end_index = limit
1704                            .and_then(|limit| start_index.0.checked_add(limit.0))
1705                            .map(NoteCommitmentSubtreeIndex);
1706
1707                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
1708                            |non_finalized_state| {
1709                                if let Some(end_index) = end_index {
1710                                    read::orchard_subtrees(
1711                                        non_finalized_state.best_chain(),
1712                                        &state.db,
1713                                        start_index..end_index,
1714                                    )
1715                                } else {
1716                                    // If there is no end bound, just return all the trees.
1717                                    // If the end bound would overflow, just returns all the trees, because that's what
1718                                    // `zcashd` does. (It never calculates an end bound, so it just keeps iterating until
1719                                    // the trees run out.)
1720                                    read::orchard_subtrees(
1721                                        non_finalized_state.best_chain(),
1722                                        &state.db,
1723                                        start_index..,
1724                                    )
1725                                }
1726                            },
1727                        );
1728
1729                        // The work is done in the future.
1730                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");
1731
1732                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
1733                    })
1734                })
1735                .wait_for_panics()
1736            }
1737
1738            // For the get_address_balance RPC.
1739            ReadRequest::AddressBalance(addresses) => {
1740                let state = self.clone();
1741
1742                tokio::task::spawn_blocking(move || {
1743                    span.in_scope(move || {
1744                        let (balance, received) = state
1745                            .non_finalized_state_receiver
1746                            .with_watch_data(|non_finalized_state| {
1747                                read::transparent_balance(
1748                                    non_finalized_state.best_chain().cloned(),
1749                                    &state.db,
1750                                    addresses,
1751                                )
1752                            })?;
1753
1754                        // The work is done in the future.
1755                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");
1756
1757                        Ok(ReadResponse::AddressBalance { balance, received })
1758                    })
1759                })
1760                .wait_for_panics()
1761            }
1762
1763            // For the get_address_tx_ids RPC.
1764            ReadRequest::TransactionIdsByAddresses {
1765                addresses,
1766                height_range,
1767            } => {
1768                let state = self.clone();
1769
1770                tokio::task::spawn_blocking(move || {
1771                    span.in_scope(move || {
1772                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
1773                            |non_finalized_state| {
1774                                read::transparent_tx_ids(
1775                                    non_finalized_state.best_chain(),
1776                                    &state.db,
1777                                    addresses,
1778                                    height_range,
1779                                )
1780                            },
1781                        );
1782
1783                        // The work is done in the future.
1784                        timer.finish(
1785                            module_path!(),
1786                            line!(),
1787                            "ReadRequest::TransactionIdsByAddresses",
1788                        );
1789
1790                        tx_ids.map(ReadResponse::AddressesTransactionIds)
1791                    })
1792                })
1793                .wait_for_panics()
1794            }
1795
1796            // For the get_address_utxos RPC.
1797            ReadRequest::UtxosByAddresses(addresses) => {
1798                let state = self.clone();
1799
1800                tokio::task::spawn_blocking(move || {
1801                    span.in_scope(move || {
1802                        let utxos = state.non_finalized_state_receiver.with_watch_data(
1803                            |non_finalized_state| {
1804                                read::address_utxos(
1805                                    &state.network,
1806                                    non_finalized_state.best_chain(),
1807                                    &state.db,
1808                                    addresses,
1809                                )
1810                            },
1811                        );
1812
1813                        // The work is done in the future.
1814                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");
1815
1816                        utxos.map(ReadResponse::AddressUtxos)
1817                    })
1818                })
1819                .wait_for_panics()
1820            }
1821
1822            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
1823                let state = self.clone();
1824
1825                tokio::task::spawn_blocking(move || {
1826                    span.in_scope(move || {
1827                        let latest_non_finalized_best_chain =
1828                            state.latest_non_finalized_state().best_chain().cloned();
1829
1830                        check::nullifier::tx_no_duplicates_in_chain(
1831                            &state.db,
1832                            latest_non_finalized_best_chain.as_ref(),
1833                            &unmined_tx.transaction,
1834                        )?;
1835
1836                        check::anchors::tx_anchors_refer_to_final_treestates(
1837                            &state.db,
1838                            latest_non_finalized_best_chain.as_ref(),
1839                            &unmined_tx,
1840                        )?;
1841
1842                        // The work is done in the future.
1843                        timer.finish(
1844                            module_path!(),
1845                            line!(),
1846                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
1847                        );
1848
1849                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
1850                    })
1851                })
1852                .wait_for_panics()
1853            }
1854
1855            // Used by the get_block and get_block_hash RPCs.
1856            ReadRequest::BestChainBlockHash(height) => {
1857                let state = self.clone();
1858
1859                // # Performance
1860                //
1861                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1862
1863                tokio::task::spawn_blocking(move || {
1864                    span.in_scope(move || {
1865                        let hash = state.non_finalized_state_receiver.with_watch_data(
1866                            |non_finalized_state| {
1867                                read::hash_by_height(
1868                                    non_finalized_state.best_chain(),
1869                                    &state.db,
1870                                    height,
1871                                )
1872                            },
1873                        );
1874
1875                        // The work is done in the future.
1876                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");
1877
1878                        Ok(ReadResponse::BlockHash(hash))
1879                    })
1880                })
1881                .wait_for_panics()
1882            }
1883
1884            // Used by get_block_template and getblockchaininfo RPCs.
1885            ReadRequest::ChainInfo => {
1886                let state = self.clone();
1887                let latest_non_finalized_state = self.latest_non_finalized_state();
1888
1889                // # Performance
1890                //
1891                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1892
1893                tokio::task::spawn_blocking(move || {
1894                    span.in_scope(move || {
1895                        // # Correctness
1896                        //
1897                        // It is ok to do these lookups using multiple database calls. Finalized state updates
1898                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
1899                        //
1900                        // If there is a large overlap between the non-finalized and finalized states,
1901                        // where the finalized tip is above the non-finalized tip,
1902                        // Zebra is receiving a lot of blocks, or this request has been delayed for a long time.
1903                        //
1904                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
1905                        // is not synced to the tip. That check happens before the RPC makes this request.
1906                        let get_block_template_info =
1907                            read::difficulty::get_block_template_chain_info(
1908                                &latest_non_finalized_state,
1909                                &state.db,
1910                                &state.network,
1911                            );
1912
1913                        // The work is done in the future.
1914                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");
1915
1916                        get_block_template_info.map(ReadResponse::ChainInfo)
1917                    })
1918                })
1919                .wait_for_panics()
1920            }
1921
1922            // Used by getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
1923            ReadRequest::SolutionRate { num_blocks, height } => {
1924                let state = self.clone();
1925
1926                // # Performance
1927                //
1928                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1929
1930                tokio::task::spawn_blocking(move || {
1931                    span.in_scope(move || {
1932                        let latest_non_finalized_state = state.latest_non_finalized_state();
1933                        // # Correctness
1934                        //
1935                        // It is ok to do these lookups using multiple database calls. Finalized state updates
1936                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
1937                        //
1938                        // The worst that can happen here is that the default `start_hash` will be below
1939                        // the chain tip.
1940                        let (tip_height, tip_hash) =
1941                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
1942                                Some(tip_hash) => tip_hash,
1943                                None => return Ok(ReadResponse::SolutionRate(None)),
1944                            };
1945
1946                        let start_hash = match height {
1947                            Some(height) if height < tip_height => read::hash_by_height(
1948                                latest_non_finalized_state.best_chain(),
1949                                &state.db,
1950                                height,
1951                            ),
1952                            // use the chain tip hash if height is above it or not provided.
1953                            _ => Some(tip_hash),
1954                        };
1955
1956                        let solution_rate = start_hash.and_then(|start_hash| {
1957                            read::difficulty::solution_rate(
1958                                &latest_non_finalized_state,
1959                                &state.db,
1960                                num_blocks,
1961                                start_hash,
1962                            )
1963                        });
1964
1965                        // The work is done in the future.
1966                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");
1967
1968                        Ok(ReadResponse::SolutionRate(solution_rate))
1969                    })
1970                })
1971                .wait_for_panics()
1972            }
1973
1974            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
1975                let state = self.clone();
1976
1977                // # Performance
1978                //
1979                // Allow other async tasks to make progress while concurrently reading blocks from disk.
1980
1981                tokio::task::spawn_blocking(move || {
1982                    span.in_scope(move || {
1983                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
1984                        let mut latest_non_finalized_state = state.latest_non_finalized_state();
1985
1986                        // The previous block of a valid proposal must be on the best chain tip.
1987                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
1988                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
1989                        };
1990
1991                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
1992                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
1993                        }
1994
1995                        // This clone of the non-finalized state is dropped when this closure returns.
1996                        // The non-finalized state that's used in the rest of the state (including finalizing
1997                        // blocks into the db) is not mutated here.
1998                        //
1999                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
2000                        latest_non_finalized_state.disable_metrics();
2001
2002                        write::validate_and_commit_non_finalized(
2003                            &state.db,
2004                            &mut latest_non_finalized_state,
2005                            semantically_verified,
2006                        )?;
2007
2008                        // The work is done in the future.
2009                        timer.finish(
2010                            module_path!(),
2011                            line!(),
2012                            "ReadRequest::CheckBlockProposalValidity",
2013                        );
2014
2015                        Ok(ReadResponse::ValidBlockProposal)
2016                    })
2017                })
2018                .wait_for_panics()
2019            }
2020
2021            ReadRequest::TipBlockSize => {
2022                let state = self.clone();
2023
2024                tokio::task::spawn_blocking(move || {
2025                    span.in_scope(move || {
2026                        // Get the best chain tip height.
2027                        let tip_height = state
2028                            .non_finalized_state_receiver
2029                            .with_watch_data(|non_finalized_state| {
2030                                read::tip_height(non_finalized_state.best_chain(), &state.db)
2031                            })
2032                            .unwrap_or(Height(0));
2033
2034                        // Get the block at the best chain tip height.
2035                        let block = state.non_finalized_state_receiver.with_watch_data(
2036                            |non_finalized_state| {
2037                                read::block(
2038                                    non_finalized_state.best_chain(),
2039                                    &state.db,
2040                                    tip_height.into(),
2041                                )
2042                            },
2043                        );
2044
2045                        // The work is done in the future.
2046                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");
2047
2048                        // Respond with the length of the obtained block if any.
2049                        match block {
2050                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
2051                                b.zcash_serialize_to_vec()?.len(),
2052                            ))),
2053                            None => Ok(ReadResponse::TipBlockSize(None)),
2054                        }
2055                    })
2056                })
2057                .wait_for_panics()
2058            }
2059        }
2060    }
2061}
2062
2063/// Initialize a state service from the provided [`Config`].
2064/// Returns a boxed state service, a read-only state service,
2065/// and receivers for state chain tip updates.
2066///
2067/// Each `network` has its own separate on-disk database.
2068///
2069/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
2070/// to work out when it is near the final checkpoint.
2071///
2072/// To share access to the state, wrap the returned service in a `Buffer`,
2073/// or clone the returned [`ReadStateService`].
2074///
2075/// It's possible to construct multiple state services in the same application (as
2076/// long as they, e.g., use different storage locations), but doing so is
2077/// probably not what you want.
2078pub fn init(
2079    config: Config,
2080    network: &Network,
2081    max_checkpoint_height: block::Height,
2082    checkpoint_verify_concurrency_limit: usize,
2083) -> (
2084    BoxService<Request, Response, BoxError>,
2085    ReadStateService,
2086    LatestChainTip,
2087    ChainTipChange,
2088) {
2089    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
2090        StateService::new(
2091            config,
2092            network,
2093            max_checkpoint_height,
2094            checkpoint_verify_concurrency_limit,
2095        );
2096
2097    (
2098        BoxService::new(state_service),
2099        read_only_state_service,
2100        latest_chain_tip,
2101        chain_tip_change,
2102    )
2103}
2104
2105/// Initialize a read state service from the provided [`Config`].
2106/// Returns a read-only state service,
2107///
2108/// Each `network` has its own separate on-disk database.
2109///
2110/// To share access to the state, clone the returned [`ReadStateService`].
2111pub fn init_read_only(
2112    config: Config,
2113    network: &Network,
2114) -> (
2115    ReadStateService,
2116    ZebraDb,
2117    tokio::sync::watch::Sender<NonFinalizedState>,
2118) {
2119    let finalized_state = FinalizedState::new_with_debug(
2120        &config,
2121        network,
2122        true,
2123        #[cfg(feature = "elasticsearch")]
2124        false,
2125        true,
2126    );
2127    let (non_finalized_state_sender, non_finalized_state_receiver) =
2128        tokio::sync::watch::channel(NonFinalizedState::new(network));
2129
2130    (
2131        ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
2132        finalized_state.db.clone(),
2133        non_finalized_state_sender,
2134    )
2135}
2136
2137/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
2138/// Returns a [`tokio::task::JoinHandle`] with a read state service and chain tip sender.
2139pub fn spawn_init_read_only(
2140    config: Config,
2141    network: &Network,
2142) -> tokio::task::JoinHandle<(
2143    ReadStateService,
2144    ZebraDb,
2145    tokio::sync::watch::Sender<NonFinalizedState>,
2146)> {
2147    let network = network.clone();
2148    tokio::task::spawn_blocking(move || init_read_only(config, &network))
2149}
2150
2151/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
2152/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
2153/// a read state service, and receivers for state chain tip updates.
2154pub fn spawn_init(
2155    config: Config,
2156    network: &Network,
2157    max_checkpoint_height: block::Height,
2158    checkpoint_verify_concurrency_limit: usize,
2159) -> tokio::task::JoinHandle<(
2160    BoxService<Request, Response, BoxError>,
2161    ReadStateService,
2162    LatestChainTip,
2163    ChainTipChange,
2164)> {
2165    let network = network.clone();
2166    tokio::task::spawn_blocking(move || {
2167        init(
2168            config,
2169            &network,
2170            max_checkpoint_height,
2171            checkpoint_verify_concurrency_limit,
2172        )
2173    })
2174}
2175
2176/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
2177///
2178/// This can be used to create a state service for testing. See also [`init`].
2179#[cfg(any(test, feature = "proptest-impl"))]
2180pub fn init_test(network: &Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
2181    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2182    //       if we ever need to test final checkpoint sent UTXO queries
2183    let (state_service, _, _, _) =
2184        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);
2185
2186    Buffer::new(BoxService::new(state_service), 1)
2187}
2188
2189/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
2190/// then returns the read-write service, read-only service, and tip watch channels.
2191///
2192/// This can be used to create a state service for testing. See also [`init`].
2193#[cfg(any(test, feature = "proptest-impl"))]
2194pub fn init_test_services(
2195    network: &Network,
2196) -> (
2197    Buffer<BoxService<Request, Response, BoxError>, Request>,
2198    ReadStateService,
2199    LatestChainTip,
2200    ChainTipChange,
2201) {
2202    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
2203    //       if we ever need to test final checkpoint sent UTXO queries
2204    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
2205        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);
2206
2207    let state_service = Buffer::new(BoxService::new(state_service), 1);
2208
2209    (
2210        state_service,
2211        read_state_service,
2212        latest_chain_tip,
2213        chain_tip_change,
2214    )
2215}