zebra_state/service.rs
1//! [`tower::Service`]s for Zebra's cached chain state.
2//!
3//! Zebra provides cached state access via two main services:
4//! - [`StateService`]: a read-write service that writes blocks to the state,
5//! and redirects most read requests to the [`ReadStateService`].
6//! - [`ReadStateService`]: a read-only service that answers from the most
7//! recent committed block.
8//!
9//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
10//!
11//! Zebra also provides access to the best chain tip via:
12//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
13//! tip.
14//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
15//! chain tip changes.
16
17use std::{
18 collections::HashMap,
19 convert,
20 future::Future,
21 pin::Pin,
22 sync::Arc,
23 task::{Context, Poll},
24 time::{Duration, Instant},
25};
26
27use futures::future::FutureExt;
28use tokio::sync::{oneshot, watch};
29use tower::{util::BoxService, Service, ServiceExt};
30use tracing::{instrument, Instrument, Span};
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use tower::buffer::Buffer;
34
35use zebra_chain::{
36 block::{self, CountedHeader, HeightDiff},
37 diagnostic::{task::WaitForPanics, CodeTimer},
38 parameters::{Network, NetworkUpgrade},
39 subtree::NoteCommitmentSubtreeIndex,
40};
41
42use zebra_chain::{block::Height, serialization::ZcashSerialize};
43
44use crate::{
45 constants::{
46 MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
47 },
48 response::NonFinalizedBlocksListener,
49 service::{
50 block_iter::any_ancestor_blocks,
51 chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
52 finalized_state::{FinalizedState, ZebraDb},
53 non_finalized_state::{Chain, NonFinalizedState},
54 pending_utxos::PendingUtxos,
55 queued_blocks::QueuedBlocks,
56 watch_receiver::WatchReceiver,
57 },
58 BoxError, CheckpointVerifiedBlock, CloneError, Config, ReadRequest, ReadResponse, Request,
59 Response, SemanticallyVerifiedBlock,
60};
61
62pub mod block_iter;
63pub mod chain_tip;
64pub mod watch_receiver;
65
66pub mod check;
67
68pub(crate) mod finalized_state;
69pub(crate) mod non_finalized_state;
70mod pending_utxos;
71mod queued_blocks;
72pub(crate) mod read;
73mod write;
74
75#[cfg(any(test, feature = "proptest-impl"))]
76pub mod arbitrary;
77
78#[cfg(test)]
79mod tests;
80
81pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
82use write::NonFinalizedWriteMessage;
83
84use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};
85
86/// A read-write service for Zebra's cached blockchain state.
87///
88/// This service modifies and provides access to:
89/// - the non-finalized state: the ~100 most recent blocks.
90/// Zebra allows chain forks in the non-finalized state,
91/// stores it in memory, and re-downloads it when restarted.
92/// - the finalized state: older blocks that have many confirmations.
93/// Zebra stores the single best chain in the finalized state,
94/// and re-loads it from disk when restarted.
95///
96/// Read requests to this service are buffered, then processed concurrently.
97/// Block write requests are buffered, then queued, then processed in order by a separate task.
98///
99/// Most state users can get faster read responses using the [`ReadStateService`],
100/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
101///
102/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
103/// They can read the latest block directly, without queueing any requests.
104#[derive(Debug)]
105pub(crate) struct StateService {
106 // Configuration
107 //
108 /// The configured Zcash network.
109 network: Network,
110
111 /// The height that we start storing UTXOs from finalized blocks.
112 ///
113 /// This height should be lower than the last few checkpoints,
114 /// so the full verifier can verify UTXO spends from those blocks,
115 /// even if they haven't been committed to the finalized state yet.
116 full_verifier_utxo_lookahead: block::Height,
117
118 // Queued Blocks
119 //
120 /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
121 /// These blocks are awaiting their parent blocks before they can do contextual verification.
122 non_finalized_state_queued_blocks: QueuedBlocks,
123
124 /// Queued blocks for the [`FinalizedState`] that arrived out of order.
125 /// These blocks are awaiting their parent blocks before they can do contextual verification.
126 ///
127 /// Indexed by their parent block hash.
128 finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,
129
130 /// Channels to send blocks to the block write task.
131 block_write_sender: write::BlockWriteSender,
132
133 /// The [`block::Hash`] of the most recent block sent on
134 /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
135 ///
136 /// On startup, this is:
137 /// - the finalized tip, if there are stored blocks, or
138 /// - the genesis block's parent hash, if the database is empty.
139 ///
140 /// If `invalid_block_write_reset_receiver` gets a reset, this is:
141 /// - the hash of the last valid committed block (the parent of the invalid block).
142 finalized_block_write_last_sent_hash: block::Hash,
143
144 /// A set of block hashes that have been sent to the block write task.
145 /// Hashes of blocks below the finalized tip height are periodically pruned.
146 non_finalized_block_write_sent_hashes: SentHashes,
147
148 /// If an invalid block is sent on `finalized_block_write_sender`
149 /// or `non_finalized_block_write_sender`,
150 /// this channel gets the [`block::Hash`] of the valid tip.
151 //
152 // TODO: add tests for finalized and non-finalized resets (#2654)
153 invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
154
155 // Pending UTXO Request Tracking
156 //
157 /// The set of outpoints with pending requests for their associated transparent::Output.
158 pending_utxos: PendingUtxos,
159
160 /// Instant tracking the last time `pending_utxos` was pruned.
161 last_prune: Instant,
162
163 // Updating Concurrently Readable State
164 //
165 /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
166 ///
167 /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
168 read_service: ReadStateService,
169
170 // Metrics
171 //
172 /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`
173 ///
174 /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
175 /// as a break in the graph.
176 max_finalized_queue_height: f64,
177}
178
179/// A read-only service for accessing Zebra's cached blockchain state.
180///
181/// This service provides read-only access to:
182/// - the non-finalized state: the ~100 most recent blocks.
183/// - the finalized state: older blocks that have many confirmations.
184///
185/// Requests to this service are processed in parallel,
186/// ignoring any blocks queued by the read-write [`StateService`].
187///
188/// This quick response behavior is better for most state users.
189/// It allows other async tasks to make progress while concurrently reading data from disk.
190#[derive(Clone, Debug)]
191pub struct ReadStateService {
192 // Configuration
193 //
194 /// The configured Zcash network.
195 network: Network,
196
197 // Shared Concurrently Readable State
198 //
199 /// A watch channel with a cached copy of the [`NonFinalizedState`].
200 ///
201 /// This state is only updated between requests,
202 /// so it might include some block data that is also on `disk`.
203 non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
204
205 /// The shared inner on-disk database for the finalized state.
206 ///
207 /// RocksDB allows reads and writes via a shared reference,
208 /// but [`ZebraDb`] doesn't expose any write methods or types.
209 ///
210 /// This chain is updated concurrently with requests,
211 /// so it might include some block data that is also in `best_mem`.
212 db: ZebraDb,
213
214 /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
215 /// once the queues have received all their parent blocks.
216 ///
217 /// Used to check for panics when writing blocks.
218 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
219}
220
221impl Drop for StateService {
222 fn drop(&mut self) {
223 // The state service owns the state, tasks, and channels,
224 // so dropping it should shut down everything.
225
226 // Close the channels (non-blocking)
227 // This makes the block write thread exit the next time it checks the channels.
228 // We want to do this here so we get any errors or panics from the block write task before it shuts down.
229 self.invalid_block_write_reset_receiver.close();
230
231 std::mem::drop(self.block_write_sender.finalized.take());
232 std::mem::drop(self.block_write_sender.non_finalized.take());
233
234 self.clear_finalized_block_queue(
235 "dropping the state: dropped unused finalized state queue block",
236 );
237 self.clear_non_finalized_block_queue(
238 "dropping the state: dropped unused non-finalized state queue block",
239 );
240
241 // Log database metrics before shutting down
242 info!("dropping the state: logging database metrics");
243 self.log_db_metrics();
244
245 // Then drop self.read_service, which checks the block write task for panics,
246 // and tries to shut down the database.
247 }
248}
249
250impl Drop for ReadStateService {
251 fn drop(&mut self) {
252 // The read state service shares the state,
253 // so dropping it should check if we can shut down.
254
255 // TODO: move this into a try_shutdown() method
256 if let Some(block_write_task) = self.block_write_task.take() {
257 if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
258 // We're the last database user, so we can tell it to shut down (blocking):
259 // - flushes the database to disk, and
260 // - drops the database, which cleans up any database tasks correctly.
261 self.db.shutdown(true);
262
263 // We are the last state with a reference to this thread, so we can
264 // wait until the block write task finishes, then check for panics (blocking).
265 // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)
266
267 // This log is verbose during tests.
268 #[cfg(not(test))]
269 info!("waiting for the block write task to finish");
270 #[cfg(test)]
271 debug!("waiting for the block write task to finish");
272
273 // TODO: move this into a check_for_panics() method
274 if let Err(thread_panic) = block_write_task_handle.join() {
275 std::panic::resume_unwind(thread_panic);
276 } else {
277 debug!("shutting down the state because the block write task has finished");
278 }
279 }
280 } else {
281 // Even if we're not the last database user, try shutting it down.
282 //
283 // TODO: rename this to try_shutdown()?
284 self.db.shutdown(false);
285 }
286 }
287}
288
289impl StateService {
290 const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
291
292 /// Creates a new state service for the state `config` and `network`.
293 ///
294 /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
295 /// to work out when it is near the final checkpoint.
296 ///
297 /// Returns the read-write and read-only state services,
298 /// and read-only watch channels for its best chain tip.
299 pub fn new(
300 config: Config,
301 network: &Network,
302 max_checkpoint_height: block::Height,
303 checkpoint_verify_concurrency_limit: usize,
304 ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
305 let timer = CodeTimer::start();
306 let finalized_state = FinalizedState::new(
307 &config,
308 network,
309 #[cfg(feature = "elasticsearch")]
310 true,
311 );
312 timer.finish(module_path!(), line!(), "opening finalized state database");
313
314 let timer = CodeTimer::start();
315 let initial_tip = finalized_state
316 .db
317 .tip_block()
318 .map(CheckpointVerifiedBlock::from)
319 .map(ChainTipBlock::from);
320
321 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
322 ChainTipSender::new(initial_tip, network);
323
324 let non_finalized_state = NonFinalizedState::new(network);
325
326 let (non_finalized_state_sender, non_finalized_state_receiver) =
327 watch::channel(NonFinalizedState::new(&finalized_state.network()));
328
329 let finalized_state_for_writing = finalized_state.clone();
330 let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
331 write::BlockWriteSender::spawn(
332 finalized_state_for_writing,
333 non_finalized_state,
334 chain_tip_sender,
335 non_finalized_state_sender,
336 );
337
338 let read_service = ReadStateService::new(
339 &finalized_state,
340 block_write_task,
341 non_finalized_state_receiver,
342 );
343
344 let full_verifier_utxo_lookahead = max_checkpoint_height
345 - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
346 .expect("fits in HeightDiff");
347 let full_verifier_utxo_lookahead =
348 full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
349 let non_finalized_state_queued_blocks = QueuedBlocks::default();
350 let pending_utxos = PendingUtxos::default();
351
352 let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash();
353
354 let state = Self {
355 network: network.clone(),
356 full_verifier_utxo_lookahead,
357 non_finalized_state_queued_blocks,
358 finalized_state_queued_blocks: HashMap::new(),
359 block_write_sender,
360 finalized_block_write_last_sent_hash,
361 non_finalized_block_write_sent_hashes: SentHashes::default(),
362 invalid_block_write_reset_receiver,
363 pending_utxos,
364 last_prune: Instant::now(),
365 read_service: read_service.clone(),
366 max_finalized_queue_height: f64::NAN,
367 };
368 timer.finish(module_path!(), line!(), "initializing state service");
369
370 tracing::info!("starting legacy chain check");
371 let timer = CodeTimer::start();
372
373 if let (Some(tip), Some(nu5_activation_height)) = (
374 state.best_tip(),
375 NetworkUpgrade::Nu5.activation_height(network),
376 ) {
377 if let Err(error) = check::legacy_chain(
378 nu5_activation_height,
379 any_ancestor_blocks(
380 &state.read_service.latest_non_finalized_state(),
381 &state.read_service.db,
382 tip.1,
383 ),
384 &state.network,
385 MAX_LEGACY_CHAIN_BLOCKS,
386 ) {
387 let legacy_db_path = state.read_service.db.path().to_path_buf();
388 panic!(
389 "Cached state contains a legacy chain.\n\
390 An outdated Zebra version did not know about a recent network upgrade,\n\
391 so it followed a legacy chain using outdated consensus branch rules.\n\
392 Hint: Delete your database, and restart Zebra to do a full sync.\n\
393 Database path: {legacy_db_path:?}\n\
394 Error: {error:?}",
395 );
396 }
397 }
398
399 tracing::info!("cached state consensus branch is valid: no legacy chain found");
400 timer.finish(module_path!(), line!(), "legacy chain check");
401
402 (state, read_service, latest_chain_tip, chain_tip_change)
403 }
404
405 /// Call read only state service to log rocksdb database metrics.
406 pub fn log_db_metrics(&self) {
407 self.read_service.db.print_db_metrics();
408 }
409
410 /// Queue a checkpoint verified block for verification and storage in the finalized state.
411 ///
412 /// Returns a channel receiver that provides the result of the block commit.
413 fn queue_and_commit_to_finalized_state(
414 &mut self,
415 checkpoint_verified: CheckpointVerifiedBlock,
416 ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
417 // # Correctness & Performance
418 //
419 // This method must not block, access the database, or perform CPU-intensive tasks,
420 // because it is called directly from the tokio executor's Future threads.
421
422 let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
423 let queued_height = checkpoint_verified.height;
424
425 // If we're close to the final checkpoint, make the block's UTXOs available for
426 // semantic block verification, even when it is in the channel.
427 if self.is_close_to_final_checkpoint(queued_height) {
428 self.non_finalized_block_write_sent_hashes
429 .add_finalized(&checkpoint_verified)
430 }
431
432 let (rsp_tx, rsp_rx) = oneshot::channel();
433 let queued = (checkpoint_verified, rsp_tx);
434
435 if self.block_write_sender.finalized.is_some() {
436 // We're still committing checkpoint verified blocks
437 if let Some(duplicate_queued) = self
438 .finalized_state_queued_blocks
439 .insert(queued_prev_hash, queued)
440 {
441 Self::send_checkpoint_verified_block_error(
442 duplicate_queued,
443 "dropping older checkpoint verified block: got newer duplicate block",
444 );
445 }
446
447 self.drain_finalized_queue_and_commit();
448 } else {
449 // We've finished committing checkpoint verified blocks to the finalized state,
450 // so drop any repeated queued blocks, and return an error.
451 //
452 // TODO: track the latest sent height, and drop any blocks under that height
453 // every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
454 Self::send_checkpoint_verified_block_error(
455 queued,
456 "already finished committing checkpoint verified blocks: dropped duplicate block, \
457 block is already committed to the state",
458 );
459
460 self.clear_finalized_block_queue(
461 "already finished committing checkpoint verified blocks: dropped duplicate block, \
462 block is already committed to the state",
463 );
464 }
465
466 if self.finalized_state_queued_blocks.is_empty() {
467 self.max_finalized_queue_height = f64::NAN;
468 } else if self.max_finalized_queue_height.is_nan()
469 || self.max_finalized_queue_height < queued_height.0 as f64
470 {
471 // if there are still blocks in the queue, then either:
472 // - the new block was lower than the old maximum, and there was a gap before it,
473 // so the maximum is still the same (and we skip this code), or
474 // - the new block is higher than the old maximum, and there is at least one gap
475 // between the finalized tip and the new maximum
476 self.max_finalized_queue_height = queued_height.0 as f64;
477 }
478
479 metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
480 metrics::gauge!("state.checkpoint.queued.block.count")
481 .set(self.finalized_state_queued_blocks.len() as f64);
482
483 rsp_rx
484 }
485
486 /// Finds finalized state queue blocks to be committed to the state in order,
487 /// removes them from the queue, and sends them to the block commit task.
488 ///
489 /// After queueing a finalized block, this method checks whether the newly
490 /// queued block (and any of its descendants) can be committed to the state.
491 ///
492 /// Returns an error if the block commit channel has been closed.
493 pub fn drain_finalized_queue_and_commit(&mut self) {
494 use tokio::sync::mpsc::error::{SendError, TryRecvError};
495
496 // # Correctness & Performance
497 //
498 // This method must not block, access the database, or perform CPU-intensive tasks,
499 // because it is called directly from the tokio executor's Future threads.
500
501 // If a block failed, we need to start again from a valid tip.
502 match self.invalid_block_write_reset_receiver.try_recv() {
503 Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
504 Err(TryRecvError::Disconnected) => {
505 info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
506 return;
507 }
508 // There are no errors, so we can just use the last block hash we sent
509 Err(TryRecvError::Empty) => {}
510 }
511
512 while let Some(queued_block) = self
513 .finalized_state_queued_blocks
514 .remove(&self.finalized_block_write_last_sent_hash)
515 {
516 let last_sent_finalized_block_height = queued_block.0.height;
517
518 self.finalized_block_write_last_sent_hash = queued_block.0.hash;
519
520 // If we've finished sending finalized blocks, ignore any repeated blocks.
521 // (Blocks can be repeated after a syncer reset.)
522 if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
523 let send_result = finalized_block_write_sender.send(queued_block);
524
525 // If the receiver is closed, we can't send any more blocks.
526 if let Err(SendError(queued)) = send_result {
527 // If Zebra is shutting down, drop blocks and return an error.
528 Self::send_checkpoint_verified_block_error(
529 queued,
530 "block commit task exited. Is Zebra shutting down?",
531 );
532
533 self.clear_finalized_block_queue(
534 "block commit task exited. Is Zebra shutting down?",
535 );
536 } else {
537 metrics::gauge!("state.checkpoint.sent.block.height")
538 .set(last_sent_finalized_block_height.0 as f64);
539 };
540 }
541 }
542 }
543
544 /// Drops all finalized state queue blocks, and sends an error on their result channels.
545 fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
546 for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
547 Self::send_checkpoint_verified_block_error(queued, error.clone());
548 }
549 }
550
551 /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
552 fn send_checkpoint_verified_block_error(
553 queued: QueuedCheckpointVerified,
554 error: impl Into<BoxError>,
555 ) {
556 let (finalized, rsp_tx) = queued;
557
558 // The block sender might have already given up on this block,
559 // so ignore any channel send errors.
560 let _ = rsp_tx.send(Err(error.into()));
561 std::mem::drop(finalized);
562 }
563
564 /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
565 fn clear_non_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
566 for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
567 Self::send_semantically_verified_block_error(queued, error.clone());
568 }
569 }
570
571 /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
572 fn send_semantically_verified_block_error(
573 queued: QueuedSemanticallyVerified,
574 error: impl Into<BoxError>,
575 ) {
576 let (finalized, rsp_tx) = queued;
577
578 // The block sender might have already given up on this block,
579 // so ignore any channel send errors.
580 let _ = rsp_tx.send(Err(error.into()));
581 std::mem::drop(finalized);
582 }
583
584 /// Queue a semantically verified block for contextual verification and check if any queued
585 /// blocks are ready to be verified and committed to the state.
586 ///
587 /// This function encodes the logic for [committing non-finalized blocks][1]
588 /// in RFC0005.
589 ///
590 /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
591 #[instrument(level = "debug", skip(self, semantically_verrified))]
592 fn queue_and_commit_to_non_finalized_state(
593 &mut self,
594 semantically_verrified: SemanticallyVerifiedBlock,
595 ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
596 tracing::debug!(block = %semantically_verrified.block, "queueing block for contextual verification");
597 let parent_hash = semantically_verrified.block.header.previous_block_hash;
598
599 if self
600 .non_finalized_block_write_sent_hashes
601 .contains(&semantically_verrified.hash)
602 {
603 let (rsp_tx, rsp_rx) = oneshot::channel();
604 let _ = rsp_tx.send(Err(
605 "block has already been sent to be committed to the state".into(),
606 ));
607 return rsp_rx;
608 }
609
610 if self
611 .read_service
612 .db
613 .contains_height(semantically_verrified.height)
614 {
615 let (rsp_tx, rsp_rx) = oneshot::channel();
616 let _ = rsp_tx.send(Err(
617 "block height is in the finalized state: block is already committed to the state"
618 .into(),
619 ));
620 return rsp_rx;
621 }
622
623 // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
624 // has been queued but not yet committed to the state fails the older request and replaces
625 // it with the newer request.
626 let rsp_rx = if let Some((_, old_rsp_tx)) = self
627 .non_finalized_state_queued_blocks
628 .get_mut(&semantically_verrified.hash)
629 {
630 tracing::debug!("replacing older queued request with new request");
631 let (mut rsp_tx, rsp_rx) = oneshot::channel();
632 std::mem::swap(old_rsp_tx, &mut rsp_tx);
633 let _ = rsp_tx.send(Err("replaced by newer request".into()));
634 rsp_rx
635 } else {
636 let (rsp_tx, rsp_rx) = oneshot::channel();
637 self.non_finalized_state_queued_blocks
638 .queue((semantically_verrified, rsp_tx));
639 rsp_rx
640 };
641
642 // We've finished sending checkpoint verified blocks when:
643 // - we've sent the verified block for the last checkpoint, and
644 // - it has been successfully written to disk.
645 //
646 // We detect the last checkpoint by looking for non-finalized blocks
647 // that are a child of the last block we sent.
648 //
649 // TODO: configure the state with the last checkpoint hash instead?
650 if self.block_write_sender.finalized.is_some()
651 && self
652 .non_finalized_state_queued_blocks
653 .has_queued_children(self.finalized_block_write_last_sent_hash)
654 && self.read_service.db.finalized_tip_hash()
655 == self.finalized_block_write_last_sent_hash
656 {
657 // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
658 // and move on to committing semantically verified blocks to the non-finalized state.
659 std::mem::drop(self.block_write_sender.finalized.take());
660 // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
661 self.non_finalized_block_write_sent_hashes = SentHashes::default();
662 // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
663 self.non_finalized_block_write_sent_hashes
664 .can_fork_chain_at_hashes = true;
665 // Send blocks from non-finalized queue
666 self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
667 // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
668 self.clear_finalized_block_queue(
669 "already finished committing checkpoint verified blocks: dropped duplicate block, \
670 block is already committed to the state",
671 );
672 } else if !self.can_fork_chain_at(&parent_hash) {
673 tracing::trace!("unready to verify, returning early");
674 } else if self.block_write_sender.finalized.is_none() {
675 // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
676 self.send_ready_non_finalized_queued(parent_hash);
677
678 let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
679 "Finalized state must have at least one block before committing non-finalized state",
680 );
681
682 self.non_finalized_state_queued_blocks
683 .prune_by_height(finalized_tip_height);
684
685 self.non_finalized_block_write_sent_hashes
686 .prune_by_height(finalized_tip_height);
687 }
688
689 rsp_rx
690 }
691
692 /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
693 fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
694 self.non_finalized_block_write_sent_hashes
695 .can_fork_chain_at(hash)
696 || &self.read_service.db.finalized_tip_hash() == hash
697 }
698
699 /// Returns `true` if `queued_height` is near the final checkpoint.
700 ///
701 /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
702 /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
703 ///
704 /// If it doesn't have the required UTXOs, some blocks will time out,
705 /// but succeed after a syncer restart.
706 fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
707 queued_height >= self.full_verifier_utxo_lookahead
708 }
709
710 /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
711 /// in breadth-first ordering to the block write task which will attempt to validate and commit them
712 #[tracing::instrument(level = "debug", skip(self, new_parent))]
713 fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
714 use tokio::sync::mpsc::error::SendError;
715 if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
716 let mut new_parents: Vec<block::Hash> = vec![new_parent];
717
718 while let Some(parent_hash) = new_parents.pop() {
719 let queued_children = self
720 .non_finalized_state_queued_blocks
721 .dequeue_children(parent_hash);
722
723 for queued_child in queued_children {
724 let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;
725
726 self.non_finalized_block_write_sent_hashes
727 .add(&queued_child.0);
728 let send_result = non_finalized_block_write_sender.send(queued_child.into());
729
730 if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
731 // If Zebra is shutting down, drop blocks and return an error.
732 Self::send_semantically_verified_block_error(
733 queued,
734 "block commit task exited. Is Zebra shutting down?",
735 );
736
737 self.clear_non_finalized_block_queue(
738 "block commit task exited. Is Zebra shutting down?",
739 );
740
741 return;
742 };
743
744 new_parents.push(hash);
745 }
746 }
747
748 self.non_finalized_block_write_sent_hashes.finish_batch();
749 };
750 }
751
752 /// Return the tip of the current best chain.
753 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
754 self.read_service.best_tip()
755 }
756
757 fn send_invalidate_block(
758 &self,
759 hash: block::Hash,
760 ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
761 let (rsp_tx, rsp_rx) = oneshot::channel();
762
763 let Some(sender) = &self.block_write_sender.non_finalized else {
764 let _ = rsp_tx.send(Err(
765 "cannot invalidate blocks while still committing checkpointed blocks".into(),
766 ));
767 return rsp_rx;
768 };
769
770 if let Err(tokio::sync::mpsc::error::SendError(error)) =
771 sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
772 {
773 let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
774 unreachable!("should return the same Invalidate message could not be sent");
775 };
776
777 let _ = rsp_tx.send(Err(
778 "failed to send invalidate block request to block write task".into(),
779 ));
780 }
781
782 rsp_rx
783 }
784
785 fn send_reconsider_block(
786 &self,
787 hash: block::Hash,
788 ) -> oneshot::Receiver<Result<Vec<block::Hash>, BoxError>> {
789 let (rsp_tx, rsp_rx) = oneshot::channel();
790
791 let Some(sender) = &self.block_write_sender.non_finalized else {
792 let _ = rsp_tx.send(Err(
793 "cannot reconsider blocks while still committing checkpointed blocks".into(),
794 ));
795 return rsp_rx;
796 };
797
798 if let Err(tokio::sync::mpsc::error::SendError(error)) =
799 sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
800 {
801 let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
802 unreachable!("should return the same Reconsider message could not be sent");
803 };
804
805 let _ = rsp_tx.send(Err(
806 "failed to send reconsider block request to block write task".into(),
807 ));
808 }
809
810 rsp_rx
811 }
812
813 /// Assert some assumptions about the semantically verified `block` before it is queued.
814 fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
815 // required by `Request::CommitSemanticallyVerifiedBlock` call
816 assert!(
817 block.height > self.network.mandatory_checkpoint_height(),
818 "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
819 blocks, and the canopy activation block, must be committed to the state as finalized \
820 blocks"
821 );
822 }
823}
824
825impl ReadStateService {
826 /// Creates a new read-only state service, using the provided finalized state and
827 /// block write task handle.
828 ///
829 /// Returns the newly created service,
830 /// and a watch channel for updating the shared recent non-finalized chain.
831 pub(crate) fn new(
832 finalized_state: &FinalizedState,
833 block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
834 non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
835 ) -> Self {
836 let read_service = Self {
837 network: finalized_state.network(),
838 db: finalized_state.db.clone(),
839 non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
840 block_write_task,
841 };
842
843 tracing::debug!("created new read-only state service");
844
845 read_service
846 }
847
848 /// Return the tip of the current best chain.
849 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
850 read::best_tip(&self.latest_non_finalized_state(), &self.db)
851 }
852
853 /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
854 fn latest_non_finalized_state(&self) -> NonFinalizedState {
855 self.non_finalized_state_receiver.cloned_watch_data()
856 }
857
858 /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
859 #[allow(dead_code)]
860 fn latest_best_chain(&self) -> Option<Arc<Chain>> {
861 self.latest_non_finalized_state().best_chain().cloned()
862 }
863
864 /// Test-only access to the inner database.
865 /// Can be used to modify the database without doing any consensus checks.
866 #[cfg(any(test, feature = "proptest-impl"))]
867 pub fn db(&self) -> &ZebraDb {
868 &self.db
869 }
870
871 /// Logs rocksdb metrics using the read only state service.
872 pub fn log_db_metrics(&self) {
873 self.db.print_db_metrics();
874 }
875}
876
877impl Service<Request> for StateService {
878 type Response = Response;
879 type Error = BoxError;
880 type Future =
881 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
882
883 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
884 // Check for panics in the block write task
885 let poll = self.read_service.poll_ready(cx);
886
887 // Prune outdated UTXO requests
888 let now = Instant::now();
889
890 if self.last_prune + Self::PRUNE_INTERVAL < now {
891 let tip = self.best_tip();
892 let old_len = self.pending_utxos.len();
893
894 self.pending_utxos.prune();
895 self.last_prune = now;
896
897 let new_len = self.pending_utxos.len();
898 let prune_count = old_len
899 .checked_sub(new_len)
900 .expect("prune does not add any utxo requests");
901 if prune_count > 0 {
902 tracing::debug!(
903 ?old_len,
904 ?new_len,
905 ?prune_count,
906 ?tip,
907 "pruned utxo requests"
908 );
909 } else {
910 tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
911 }
912 }
913
914 poll
915 }
916
917 #[instrument(name = "state", skip(self, req))]
918 fn call(&mut self, req: Request) -> Self::Future {
919 req.count_metric();
920 let timer = CodeTimer::start();
921 let span = Span::current();
922
923 match req {
924 // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService
925 // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
926 Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
927 self.assert_block_can_be_validated(&semantically_verified);
928
929 self.pending_utxos
930 .check_against_ordered(&semantically_verified.new_outputs);
931
932 // # Performance
933 //
934 // Allow other async tasks to make progress while blocks are being verified
935 // and written to disk. But wait for the blocks to finish committing,
936 // so that `StateService` multi-block queries always observe a consistent state.
937 //
938 // Since each block is spawned into its own task,
939 // there shouldn't be any other code running in the same task,
940 // so we don't need to worry about blocking it:
941 // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
942
943 let rsp_rx = tokio::task::block_in_place(move || {
944 span.in_scope(|| {
945 self.queue_and_commit_to_non_finalized_state(semantically_verified)
946 })
947 });
948
949 // TODO:
950 // - check for panics in the block write task here,
951 // as well as in poll_ready()
952
953 // The work is all done, the future just waits on a channel for the result
954 timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");
955
956 let span = Span::current();
957 async move {
958 rsp_rx
959 .await
960 .map_err(|_recv_error| {
961 BoxError::from(
962 "block was dropped from the queue of non-finalized blocks",
963 )
964 })
965 // TODO: replace with Result::flatten once it stabilises
966 // https://github.com/rust-lang/rust/issues/70142
967 .and_then(convert::identity)
968 .map(Response::Committed)
969 }
970 .instrument(span)
971 .boxed()
972 }
973
974 // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
975 // Accesses shared writeable state in the StateService.
976 Request::CommitCheckpointVerifiedBlock(finalized) => {
977 // # Consensus
978 //
979 // A semantic block verification could have called AwaitUtxo
980 // before this checkpoint verified block arrived in the state.
981 // So we need to check for pending UTXO requests sent by running
982 // semantic block verifications.
983 //
984 // This check is redundant for most checkpoint verified blocks,
985 // because semantic verification can only succeed near the final
986 // checkpoint, when all the UTXOs are available for the verifying block.
987 //
988 // (Checkpoint block UTXOs are verified using block hash checkpoints
989 // and transaction merkle tree block header commitments.)
990 self.pending_utxos
991 .check_against_ordered(&finalized.new_outputs);
992
993 // # Performance
994 //
995 // This method doesn't block, access the database, or perform CPU-intensive tasks,
996 // so we can run it directly in the tokio executor's Future threads.
997 let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);
998
999 // TODO:
1000 // - check for panics in the block write task here,
1001 // as well as in poll_ready()
1002
1003 // The work is all done, the future just waits on a channel for the result
1004 timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");
1005
1006 async move {
1007 rsp_rx
1008 .await
1009 .map_err(|_recv_error| {
1010 BoxError::from("block was dropped from the queue of finalized blocks")
1011 })
1012 // TODO: replace with Result::flatten once it stabilises
1013 // https://github.com/rust-lang/rust/issues/70142
1014 .and_then(convert::identity)
1015 .map(Response::Committed)
1016 }
1017 .instrument(span)
1018 .boxed()
1019 }
1020
1021 // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
1022 // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
1023 Request::AwaitUtxo(outpoint) => {
1024 // Prepare the AwaitUtxo future from PendingUxtos.
1025 let response_fut = self.pending_utxos.queue(outpoint);
1026 // Only instrument `response_fut`, the ReadStateService already
1027 // instruments its requests with the same span.
1028
1029 let response_fut = response_fut.instrument(span).boxed();
1030
1031 // Check the non-finalized block queue outside the returned future,
1032 // so we can access mutable state fields.
1033 if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
1034 self.pending_utxos.respond(&outpoint, utxo);
1035
1036 // We're finished, the returned future gets the UTXO from the respond() channel.
1037 timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");
1038
1039 return response_fut;
1040 }
1041
1042 // Check the sent non-finalized blocks
1043 if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
1044 self.pending_utxos.respond(&outpoint, utxo);
1045
1046 // We're finished, the returned future gets the UTXO from the respond() channel.
1047 timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");
1048
1049 return response_fut;
1050 }
1051
1052 // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
1053 // because it is only used during checkpoint verification.
1054 //
1055 // This creates a rare race condition, but it doesn't seem to happen much in practice.
1056 // See #5126 for details.
1057
1058 // Manually send a request to the ReadStateService,
1059 // to get UTXOs from any non-finalized chain or the finalized chain.
1060 let read_service = self.read_service.clone();
1061
1062 // Run the request in an async block, so we can await the response.
1063 async move {
1064 let req = ReadRequest::AnyChainUtxo(outpoint);
1065
1066 let rsp = read_service.oneshot(req).await?;
1067
1068 // Optional TODO:
1069 // - make pending_utxos.respond() async using a channel,
1070 // so we can respond to all waiting requests here
1071 //
1072 // This change is not required for correctness, because:
1073 // - any waiting requests should have returned when the block was sent to the state
1074 // - otherwise, the request returns immediately if:
1075 // - the block is in the non-finalized queue, or
1076 // - the block is in any non-finalized chain or the finalized state
1077 //
1078 // And if the block is in the finalized queue,
1079 // that's rare enough that a retry is ok.
1080 if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
1081 // We got a UTXO, so we replace the response future with the result own.
1082 timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");
1083
1084 return Ok(Response::Utxo(utxo));
1085 }
1086
1087 // We're finished, but the returned future is waiting on the respond() channel.
1088 timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");
1089
1090 response_fut.await
1091 }
1092 .boxed()
1093 }
1094
1095 // Used by sync, inbound, and block verifier to check if a block is already in the state
1096 // before downloading or validating it.
1097 Request::KnownBlock(hash) => {
1098 let timer = CodeTimer::start();
1099
1100 let read_service = self.read_service.clone();
1101
1102 async move {
1103 let response = read::non_finalized_state_contains_block_hash(
1104 &read_service.latest_non_finalized_state(),
1105 hash,
1106 )
1107 .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));
1108
1109 // The work is done in the future.
1110 timer.finish(module_path!(), line!(), "Request::KnownBlock");
1111
1112 Ok(Response::KnownBlock(response))
1113 }
1114 .boxed()
1115 }
1116
1117 Request::InvalidateBlock(block_hash) => {
1118 let rsp_rx = tokio::task::block_in_place(move || {
1119 span.in_scope(|| self.send_invalidate_block(block_hash))
1120 });
1121
1122 let span = Span::current();
1123 async move {
1124 rsp_rx
1125 .await
1126 .map_err(|_recv_error| {
1127 BoxError::from("invalidate block request was unexpectedly dropped")
1128 })
1129 // TODO: replace with Result::flatten once it stabilises
1130 // https://github.com/rust-lang/rust/issues/70142
1131 .and_then(convert::identity)
1132 .map(Response::Invalidated)
1133 }
1134 .instrument(span)
1135 .boxed()
1136 }
1137
1138 Request::ReconsiderBlock(block_hash) => {
1139 let rsp_rx = tokio::task::block_in_place(move || {
1140 span.in_scope(|| self.send_reconsider_block(block_hash))
1141 });
1142
1143 let span = Span::current();
1144 async move {
1145 rsp_rx
1146 .await
1147 .map_err(|_recv_error| {
1148 BoxError::from("reconsider block request was unexpectedly dropped")
1149 })
1150 // TODO: replace with Result::flatten once it stabilises
1151 // https://github.com/rust-lang/rust/issues/70142
1152 .and_then(convert::identity)
1153 .map(Response::Reconsidered)
1154 }
1155 .instrument(span)
1156 .boxed()
1157 }
1158
1159 // Runs concurrently using the ReadStateService
1160 Request::Tip
1161 | Request::Depth(_)
1162 | Request::BestChainNextMedianTimePast
1163 | Request::BestChainBlockHash(_)
1164 | Request::BlockLocator
1165 | Request::Transaction(_)
1166 | Request::UnspentBestChainUtxo(_)
1167 | Request::Block(_)
1168 | Request::BlockAndSize(_)
1169 | Request::BlockHeader(_)
1170 | Request::FindBlockHashes { .. }
1171 | Request::FindBlockHeaders { .. }
1172 | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
1173 // Redirect the request to the concurrent ReadStateService
1174 let read_service = self.read_service.clone();
1175
1176 async move {
1177 let req = req
1178 .try_into()
1179 .expect("ReadRequest conversion should not fail");
1180
1181 let rsp = read_service.oneshot(req).await?;
1182 let rsp = rsp.try_into().expect("Response conversion should not fail");
1183
1184 Ok(rsp)
1185 }
1186 .boxed()
1187 }
1188
1189 Request::CheckBlockProposalValidity(_) => {
1190 // Redirect the request to the concurrent ReadStateService
1191 let read_service = self.read_service.clone();
1192
1193 async move {
1194 let req = req
1195 .try_into()
1196 .expect("ReadRequest conversion should not fail");
1197
1198 let rsp = read_service.oneshot(req).await?;
1199 let rsp = rsp.try_into().expect("Response conversion should not fail");
1200
1201 Ok(rsp)
1202 }
1203 .boxed()
1204 }
1205 }
1206 }
1207}
1208
1209impl Service<ReadRequest> for ReadStateService {
1210 type Response = ReadResponse;
1211 type Error = BoxError;
1212 type Future =
1213 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1214
1215 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1216 // Check for panics in the block write task
1217 //
1218 // TODO: move into a check_for_panics() method
1219 let block_write_task = self.block_write_task.take();
1220
1221 if let Some(block_write_task) = block_write_task {
1222 if block_write_task.is_finished() {
1223 if let Some(block_write_task) = Arc::into_inner(block_write_task) {
1224 // We are the last state with a reference to this task, so we can propagate any panics
1225 if let Err(thread_panic) = block_write_task.join() {
1226 std::panic::resume_unwind(thread_panic);
1227 }
1228 }
1229 } else {
1230 // It hasn't finished, so we need to put it back
1231 self.block_write_task = Some(block_write_task);
1232 }
1233 }
1234
1235 self.db.check_for_panics();
1236
1237 Poll::Ready(Ok(()))
1238 }
1239
1240 #[instrument(name = "read_state", skip(self, req))]
1241 fn call(&mut self, req: ReadRequest) -> Self::Future {
1242 req.count_metric();
1243 let timer = CodeTimer::start();
1244 let span = Span::current();
1245
1246 match req {
1247 // Used by the `getblockchaininfo` RPC.
1248 ReadRequest::UsageInfo => {
1249 let db = self.db.clone();
1250
1251 tokio::task::spawn_blocking(move || {
1252 span.in_scope(move || {
1253 // The work is done in the future.
1254
1255 let db_size = db.size();
1256
1257 timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");
1258
1259 Ok(ReadResponse::UsageInfo(db_size))
1260 })
1261 })
1262 .wait_for_panics()
1263 }
1264
1265 // Used by the StateService.
1266 ReadRequest::Tip => {
1267 let state = self.clone();
1268
1269 tokio::task::spawn_blocking(move || {
1270 span.in_scope(move || {
1271 let tip = state.non_finalized_state_receiver.with_watch_data(
1272 |non_finalized_state| {
1273 read::tip(non_finalized_state.best_chain(), &state.db)
1274 },
1275 );
1276
1277 // The work is done in the future.
1278 timer.finish(module_path!(), line!(), "ReadRequest::Tip");
1279
1280 Ok(ReadResponse::Tip(tip))
1281 })
1282 })
1283 .wait_for_panics()
1284 }
1285
1286 // Used by `getblockchaininfo` RPC method.
1287 ReadRequest::TipPoolValues => {
1288 let state = self.clone();
1289
1290 tokio::task::spawn_blocking(move || {
1291 span.in_scope(move || {
1292 let tip_with_value_balance = state
1293 .non_finalized_state_receiver
1294 .with_watch_data(|non_finalized_state| {
1295 read::tip_with_value_balance(
1296 non_finalized_state.best_chain(),
1297 &state.db,
1298 )
1299 });
1300
1301 // The work is done in the future.
1302 // TODO: Do this in the Drop impl with the variant name?
1303 timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");
1304
1305 let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
1306 .ok_or(BoxError::from("no chain tip available yet"))?;
1307
1308 Ok(ReadResponse::TipPoolValues {
1309 tip_height,
1310 tip_hash,
1311 value_balance,
1312 })
1313 })
1314 })
1315 .wait_for_panics()
1316 }
1317
1318 // Used by getblock
1319 ReadRequest::BlockInfo(hash_or_height) => {
1320 let state = self.clone();
1321
1322 tokio::task::spawn_blocking(move || {
1323 span.in_scope(move || {
1324 let value_balance = state.non_finalized_state_receiver.with_watch_data(
1325 |non_finalized_state| {
1326 read::block_info(
1327 non_finalized_state.best_chain(),
1328 &state.db,
1329 hash_or_height,
1330 )
1331 },
1332 );
1333
1334 // The work is done in the future.
1335 // TODO: Do this in the Drop impl with the variant name?
1336 timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");
1337
1338 Ok(ReadResponse::BlockInfo(value_balance))
1339 })
1340 })
1341 .wait_for_panics()
1342 }
1343
1344 // Used by the StateService.
1345 ReadRequest::Depth(hash) => {
1346 let state = self.clone();
1347
1348 tokio::task::spawn_blocking(move || {
1349 span.in_scope(move || {
1350 let depth = state.non_finalized_state_receiver.with_watch_data(
1351 |non_finalized_state| {
1352 read::depth(non_finalized_state.best_chain(), &state.db, hash)
1353 },
1354 );
1355
1356 // The work is done in the future.
1357 timer.finish(module_path!(), line!(), "ReadRequest::Depth");
1358
1359 Ok(ReadResponse::Depth(depth))
1360 })
1361 })
1362 .wait_for_panics()
1363 }
1364
1365 // Used by the StateService.
1366 ReadRequest::BestChainNextMedianTimePast => {
1367 let state = self.clone();
1368
1369 tokio::task::spawn_blocking(move || {
1370 span.in_scope(move || {
1371 let non_finalized_state = state.latest_non_finalized_state();
1372 let median_time_past =
1373 read::next_median_time_past(&non_finalized_state, &state.db);
1374
1375 // The work is done in the future.
1376 timer.finish(
1377 module_path!(),
1378 line!(),
1379 "ReadRequest::BestChainNextMedianTimePast",
1380 );
1381
1382 Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
1383 })
1384 })
1385 .wait_for_panics()
1386 }
1387
1388 // Used by the get_block (raw) RPC and the StateService.
1389 ReadRequest::Block(hash_or_height) => {
1390 let state = self.clone();
1391
1392 tokio::task::spawn_blocking(move || {
1393 span.in_scope(move || {
1394 let block = state.non_finalized_state_receiver.with_watch_data(
1395 |non_finalized_state| {
1396 read::block(
1397 non_finalized_state.best_chain(),
1398 &state.db,
1399 hash_or_height,
1400 )
1401 },
1402 );
1403
1404 // The work is done in the future.
1405 timer.finish(module_path!(), line!(), "ReadRequest::Block");
1406
1407 Ok(ReadResponse::Block(block))
1408 })
1409 })
1410 .wait_for_panics()
1411 }
1412
1413 // Used by the get_block (raw) RPC and the StateService.
1414 ReadRequest::BlockAndSize(hash_or_height) => {
1415 let state = self.clone();
1416
1417 tokio::task::spawn_blocking(move || {
1418 span.in_scope(move || {
1419 let block_and_size = state.non_finalized_state_receiver.with_watch_data(
1420 |non_finalized_state| {
1421 read::block_and_size(
1422 non_finalized_state.best_chain(),
1423 &state.db,
1424 hash_or_height,
1425 )
1426 },
1427 );
1428
1429 // The work is done in the future.
1430 timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");
1431
1432 Ok(ReadResponse::BlockAndSize(block_and_size))
1433 })
1434 })
1435 .wait_for_panics()
1436 }
1437
1438 // Used by the get_block (verbose) RPC and the StateService.
1439 ReadRequest::BlockHeader(hash_or_height) => {
1440 let state = self.clone();
1441
1442 tokio::task::spawn_blocking(move || {
1443 span.in_scope(move || {
1444 let best_chain = state.latest_best_chain();
1445
1446 let height = hash_or_height
1447 .height_or_else(|hash| {
1448 read::find::height_by_hash(best_chain.clone(), &state.db, hash)
1449 })
1450 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1451
1452 let hash = hash_or_height
1453 .hash_or_else(|height| {
1454 read::find::hash_by_height(best_chain.clone(), &state.db, height)
1455 })
1456 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1457
1458 let next_height = height.next()?;
1459 let next_block_hash =
1460 read::find::hash_by_height(best_chain.clone(), &state.db, next_height);
1461
1462 let header = read::block_header(best_chain, &state.db, height.into())
1463 .ok_or_else(|| BoxError::from("block hash or height not found"))?;
1464
1465 // The work is done in the future.
1466 timer.finish(module_path!(), line!(), "ReadRequest::Block");
1467
1468 Ok(ReadResponse::BlockHeader {
1469 header,
1470 hash,
1471 height,
1472 next_block_hash,
1473 })
1474 })
1475 })
1476 .wait_for_panics()
1477 }
1478
1479 // For the get_raw_transaction RPC and the StateService.
1480 ReadRequest::Transaction(hash) => {
1481 let state = self.clone();
1482
1483 tokio::task::spawn_blocking(move || {
1484 span.in_scope(move || {
1485 let response =
1486 read::mined_transaction(state.latest_best_chain(), &state.db, hash);
1487
1488 // The work is done in the future.
1489 timer.finish(module_path!(), line!(), "ReadRequest::Transaction");
1490
1491 Ok(ReadResponse::Transaction(response))
1492 })
1493 })
1494 .wait_for_panics()
1495 }
1496
1497 // Used by the getblock (verbose) RPC.
1498 ReadRequest::TransactionIdsForBlock(hash_or_height) => {
1499 let state = self.clone();
1500
1501 tokio::task::spawn_blocking(move || {
1502 span.in_scope(move || {
1503 let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
1504 |non_finalized_state| {
1505 read::transaction_hashes_for_block(
1506 non_finalized_state.best_chain(),
1507 &state.db,
1508 hash_or_height,
1509 )
1510 },
1511 );
1512
1513 // The work is done in the future.
1514 timer.finish(
1515 module_path!(),
1516 line!(),
1517 "ReadRequest::TransactionIdsForBlock",
1518 );
1519
1520 Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
1521 })
1522 })
1523 .wait_for_panics()
1524 }
1525
1526 #[cfg(feature = "indexer")]
1527 ReadRequest::SpendingTransactionId(spend) => {
1528 let state = self.clone();
1529
1530 tokio::task::spawn_blocking(move || {
1531 span.in_scope(move || {
1532 let spending_transaction_id = state
1533 .non_finalized_state_receiver
1534 .with_watch_data(|non_finalized_state| {
1535 read::spending_transaction_hash(
1536 non_finalized_state.best_chain(),
1537 &state.db,
1538 spend,
1539 )
1540 });
1541
1542 // The work is done in the future.
1543 timer.finish(
1544 module_path!(),
1545 line!(),
1546 "ReadRequest::TransactionIdForSpentOutPoint",
1547 );
1548
1549 Ok(ReadResponse::TransactionId(spending_transaction_id))
1550 })
1551 })
1552 .wait_for_panics()
1553 }
1554
1555 ReadRequest::UnspentBestChainUtxo(outpoint) => {
1556 let state = self.clone();
1557
1558 tokio::task::spawn_blocking(move || {
1559 span.in_scope(move || {
1560 let utxo = state.non_finalized_state_receiver.with_watch_data(
1561 |non_finalized_state| {
1562 read::unspent_utxo(
1563 non_finalized_state.best_chain(),
1564 &state.db,
1565 outpoint,
1566 )
1567 },
1568 );
1569
1570 // The work is done in the future.
1571 timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1572
1573 Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1574 })
1575 })
1576 .wait_for_panics()
1577 }
1578
1579 // Manually used by the StateService to implement part of AwaitUtxo.
1580 ReadRequest::AnyChainUtxo(outpoint) => {
1581 let state = self.clone();
1582
1583 tokio::task::spawn_blocking(move || {
1584 span.in_scope(move || {
1585 let utxo = state.non_finalized_state_receiver.with_watch_data(
1586 |non_finalized_state| {
1587 read::any_utxo(non_finalized_state, &state.db, outpoint)
1588 },
1589 );
1590
1591 // The work is done in the future.
1592 timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1593
1594 Ok(ReadResponse::AnyChainUtxo(utxo))
1595 })
1596 })
1597 .wait_for_panics()
1598 }
1599
1600 // Used by the StateService.
1601 ReadRequest::BlockLocator => {
1602 let state = self.clone();
1603
1604 tokio::task::spawn_blocking(move || {
1605 span.in_scope(move || {
1606 let block_locator = state.non_finalized_state_receiver.with_watch_data(
1607 |non_finalized_state| {
1608 read::block_locator(non_finalized_state.best_chain(), &state.db)
1609 },
1610 );
1611
1612 // The work is done in the future.
1613 timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1614
1615 Ok(ReadResponse::BlockLocator(
1616 block_locator.unwrap_or_default(),
1617 ))
1618 })
1619 })
1620 .wait_for_panics()
1621 }
1622
            // Used by the StateService.
            ReadRequest::FindBlockHashes { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_hashes(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HASHES_RESULTS,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");

                        Ok(ReadResponse::BlockHashes(block_hashes))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_headers(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
                                )
                            },
                        );

                        let block_headers = block_headers
                            .into_iter()
                            .map(|header| CountedHeader { header })
                            .collect();

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");

                        Ok(ReadResponse::BlockHeaders(block_headers))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::sapling_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");

                        Ok(ReadResponse::SaplingTree(sapling_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::orchard_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");

                        Ok(ReadResponse::OrchardTree(orchard_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);
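                        // Illustrative example (hypothetical values): a `start_index` of
                        // 65_535 with any non-zero `limit` overflows the `u16` subtree
                        // index, so `checked_add` returns `None`, `end_index` above is
                        // `None`, and the request falls through to the unbounded branch.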

                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or the end bound would overflow,
                                    // just return all the trees, because that's what `zcashd` does.
                                    // (It never calculates an end bound, so it just keeps iterating
                                    // until the trees run out.)
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");

                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or the end bound would overflow,
                                    // just return all the trees, because that's what `zcashd` does.
                                    // (It never calculates an end bound, so it just keeps iterating
                                    // until the trees run out.)
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");

                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_balance RPC.
            ReadRequest::AddressBalance(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let (balance, received) = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::transparent_balance(
                                    non_finalized_state.best_chain().cloned(),
                                    &state.db,
                                    addresses,
                                )
                            })?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");

                        Ok(ReadResponse::AddressBalance { balance, received })
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_tx_ids RPC.
            ReadRequest::TransactionIdsByAddresses {
                addresses,
                height_range,
            } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transparent_tx_ids(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                    height_range,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsByAddresses",
                        );

                        tx_ids.map(ReadResponse::AddressesTransactionIds)
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_utxos RPC.
            ReadRequest::UtxosByAddresses(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxos = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::address_utxos(
                                    &state.network,
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");

                        utxos.map(ReadResponse::AddressUtxos)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_best_chain =
                            state.latest_non_finalized_state().best_chain().cloned();

                        check::nullifier::tx_no_duplicates_in_chain(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx.transaction,
                        )?;

                        check::anchors::tx_anchors_refer_to_final_treestates(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
                        );

                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block and get_block_hash RPCs.
            ReadRequest::BestChainBlockHash(height) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let hash = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::hash_by_height(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");

                        Ok(ReadResponse::BlockHash(hash))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block_template and getblockchaininfo RPCs.
            ReadRequest::ChainInfo => {
                let state = self.clone();
                let latest_non_finalized_state = self.latest_non_finalized_state();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // If there is a large overlap between the non-finalized and finalized states,
                        // where the finalized tip is above the non-finalized tip, then Zebra is either
                        // receiving a lot of blocks, or this request has been delayed for a long time.
                        //
                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
                        // is not synced to the tip. That check happens before the RPC makes this request.
                        let get_block_template_info =
                            read::difficulty::get_block_template_chain_info(
                                &latest_non_finalized_state,
                                &state.db,
                                &state.network,
                            );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");

                        get_block_template_info.map(ReadResponse::ChainInfo)
                    })
                })
                .wait_for_panics()
            }

            // Used by the getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
            ReadRequest::SolutionRate { num_blocks, height } => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_state = state.latest_non_finalized_state();

                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // The worst that can happen here is that the default `start_hash` will be below
                        // the chain tip.
                        let (tip_height, tip_hash) =
                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
                                Some(tip) => tip,
                                None => return Ok(ReadResponse::SolutionRate(None)),
                            };

                        let start_hash = match height {
                            Some(height) if height < tip_height => read::hash_by_height(
                                latest_non_finalized_state.best_chain(),
                                &state.db,
                                height,
                            ),
                            // Use the chain tip hash if the height is above the tip or not provided.
                            _ => Some(tip_hash),
                        };

                        let solution_rate = start_hash.and_then(|start_hash| {
                            read::difficulty::solution_rate(
                                &latest_non_finalized_state,
                                &state.db,
                                num_blocks,
                                start_hash,
                            )
                        });

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");

                        Ok(ReadResponse::SolutionRate(solution_rate))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
                        let mut latest_non_finalized_state = state.latest_non_finalized_state();

                        // The parent of a valid proposal must be the current best chain tip.
                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
                        };

                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
                        }

                        // This clone of the non-finalized state is dropped when this closure returns.
                        // The non-finalized state that's used in the rest of the state (including finalizing
                        // blocks into the db) is not mutated here.
                        //
                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
                        latest_non_finalized_state.disable_metrics();

                        write::validate_and_commit_non_finalized(
                            &state.db,
                            &mut latest_non_finalized_state,
                            semantically_verified,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBlockProposalValidity",
                        );

                        Ok(ReadResponse::ValidBlockProposal)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::TipBlockSize => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // Get the best chain tip height.
                        let tip_height = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_height(non_finalized_state.best_chain(), &state.db)
                            })
                            .unwrap_or(Height(0));

                        // Get the block at the best chain tip height.
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    tip_height.into(),
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");

                        // Respond with the serialized size of the block, if one was found.
                        match block {
                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
                                b.zcash_serialize_to_vec()?.len(),
                            ))),
                            None => Ok(ReadResponse::TipBlockSize(None)),
                        }
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::NonFinalizedBlocksListener => {
                // The non-finalized blocks listener notifies its subscribers about
                // new blocks that are committed to the non-finalized state.
                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
                    self.network.clone(),
                    self.non_finalized_state_receiver.clone(),
                );

                async move {
                    timer.finish(
                        module_path!(),
                        line!(),
                        "ReadRequest::NonFinalizedBlocksListener",
                    );

                    Ok(ReadResponse::NonFinalizedBlocksListener(
                        non_finalized_blocks_listener,
                    ))
                }
                .boxed()
            }
        }
    }
}

/// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, a read-only state service,
/// and receivers for state chain tip updates.
///
/// Each `network` has its own separate on-disk database.
///
/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
/// to work out when it is near the final checkpoint.
///
/// To share access to the state, wrap the returned service in a `Buffer`,
/// or clone the returned [`ReadStateService`].
///
/// It's possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
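///
/// A minimal usage sketch; the network and checkpoint arguments below are
/// illustrative, not recommended values:
///
/// ```ignore
/// use tower::buffer::Buffer;
/// use zebra_chain::{block, parameters::Network};
///
/// let (state, read_state, latest_chain_tip, chain_tip_change) = zebra_state::init(
///     zebra_state::Config::default(),
///     &Network::Mainnet,
///     block::Height::MAX, // illustrative: no final checkpoint
///     0,                  // illustrative concurrency limit
/// );
///
/// // Share the read-write service between tasks by wrapping it in a `Buffer`;
/// // cloning `read_state` is always cheap.
/// let shared_state = Buffer::new(state, 10);
/// ```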
pub fn init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> (
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(
            config,
            network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        );

    (
        BoxService::new(state_service),
        read_only_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service, a handle to the finalized state database,
/// and a sender for non-finalized state updates.
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
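///
/// A minimal sketch; opening the database does blocking I/O, so callers in an
/// async context should prefer [`spawn_init_read_only`]:
///
/// ```ignore
/// use zebra_chain::parameters::Network;
///
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::init_read_only(zebra_state::Config::default(), &Network::Mainnet);
///
/// // Each task can cheaply clone its own read-only service.
/// let read_state_for_task = read_state.clone();
/// ```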
pub fn init_read_only(
    config: Config,
    network: &Network,
) -> (
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
) {
    let finalized_state = FinalizedState::new_with_debug(
        &config,
        network,
        true,
        #[cfg(feature = "elasticsearch")]
        false,
        true,
    );
    let (non_finalized_state_sender, non_finalized_state_receiver) =
        tokio::sync::watch::channel(NonFinalizedState::new(network));

    (
        ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
        finalized_state.db.clone(),
        non_finalized_state_sender,
    )
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service, a database handle,
/// and a non-finalized state sender.
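///
/// A usage sketch, assuming a Tokio runtime and existing `config` and `network`
/// bindings:
///
/// ```ignore
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::spawn_init_read_only(config, &network)
///         .await
///         .expect("initialization task should not panic");
/// ```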
pub fn spawn_init_read_only(
    config: Config,
    network: &Network,
) -> tokio::task::JoinHandle<(
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
/// a read state service, and receivers for state chain tip updates.
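///
/// A usage sketch, assuming a Tokio runtime and existing `config` and `network`
/// bindings; the checkpoint arguments are illustrative:
///
/// ```ignore
/// use zebra_chain::block;
///
/// let (state, read_state, latest_chain_tip, chain_tip_change) =
///     zebra_state::spawn_init(config, &network, block::Height::MAX, 0)
///         .await
///         .expect("initialization task should not panic");
/// ```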
pub fn spawn_init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> tokio::task::JoinHandle<(
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || {
        init(
            config,
            &network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        )
    })
}

/// Returns a [`StateService`] with an ephemeral [`Config`], wrapped in a buffer
/// with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
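///
/// A hypothetical test sketch (the test name and request are illustrative):
///
/// ```ignore
/// use tower::{Service, ServiceExt};
/// use zebra_chain::parameters::Network;
///
/// #[tokio::test]
/// async fn state_answers_tip_requests() {
///     let mut state = zebra_state::init_test(&Network::Mainnet);
///
///     // An empty test state has no chain tip yet.
///     let _response = state
///         .ready()
///         .await
///         .expect("buffered service is ready")
///         .call(zebra_state::Request::Tip)
///         .await;
/// }
/// ```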
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test(network: &Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, _, _, _) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    Buffer::new(BoxService::new(state_service), 1)
}

/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency limit
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    let state_service = Buffer::new(BoxService::new(state_service), 1);

    (
        state_service,
        read_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}