zebra_state/service.rs
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that writes blocks to the state,
//!   and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//!   recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
//!   tip.
//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
//!   chain tip changes.
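//!
//! # Example
//!
//! A minimal sketch of querying the best chain tip through a [`ReadStateService`]
//! handle (the `read_state` variable and the surrounding error handling are
//! illustrative, not a fixed API):
//!
//! ```ignore
//! use tower::ServiceExt;
//!
//! let rsp = read_state.clone().oneshot(ReadRequest::Tip).await?;
//! if let ReadResponse::Tip(Some((height, hash))) = rsp {
//!     tracing::info!(?height, ?hash, "current best chain tip");
//! }
//! ```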

use std::{
    collections::HashMap,
    convert,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::future::FutureExt;
use tokio::sync::oneshot;
use tower::{util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;

use zebra_chain::{
    block::{self, CountedHeader, HeightDiff},
    diagnostic::{task::WaitForPanics, CodeTimer},
    parameters::{Network, NetworkUpgrade},
    subtree::NoteCommitmentSubtreeIndex,
};

use zebra_chain::{block::Height, serialization::ZcashSerialize};

use crate::{
    constants::{
        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
    },
    response::NonFinalizedBlocksListener,
    service::{
        block_iter::any_ancestor_blocks,
        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
        finalized_state::{FinalizedState, ZebraDb},
        non_finalized_state::{Chain, NonFinalizedState},
        pending_utxos::PendingUtxos,
        queued_blocks::QueuedBlocks,
        watch_receiver::WatchReceiver,
    },
    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
    ReadResponse, Request, Response, SemanticallyVerifiedBlock, ValidateContextError,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub mod check;

pub(crate) mod finalized_state;
pub(crate) mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod write;

#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;

#[cfg(test)]
mod tests;

pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
use write::NonFinalizedWriteMessage;

use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
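///
/// For example, a task holding a [`LatestChainTip`] handle can check the tip
/// height without sending a state request (a sketch, assuming a
/// `latest_chain_tip` handle returned by [`StateService::new`]):
///
/// ```ignore
/// use zebra_chain::chain_tip::ChainTip;
///
/// if let Some(height) = latest_chain_tip.best_tip_height() {
///     tracing::info!(?height, "best chain tip height");
/// }
/// ```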
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height at which we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated `transparent::Output`.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`.
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because Grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
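///
/// Cloning this service is cheap: clones share the same database handle and
/// non-finalized state receiver, so each task can hold its own copy.
/// A sketch (the `read_state` handle and the `hash` value are illustrative):
///
/// ```ignore
/// use tower::ServiceExt;
///
/// let read_state_clone = read_state.clone();
/// tokio::spawn(async move {
///     let _depth = read_state_clone.oneshot(ReadRequest::Depth(hash)).await;
/// });
/// ```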
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also in `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in the non-finalized state.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}

impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking).
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        self.clear_finalized_block_queue(
            "dropping the state: dropped unused finalized state queue block",
        );
        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::from(
            ValidateContextError::DroppedUnusedBlock,
        ));

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}

impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}
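
// The shutdown above relies on `Arc::into_inner`, which only yields the inner
// value for the last remaining reference. A minimal sketch of that pattern
// (illustrative only, not part of this module's API):
//
// ```ignore
// let task = std::sync::Arc::new(std::thread::spawn(|| ()));
// let clone = task.clone();
// assert!(std::sync::Arc::into_inner(clone).is_none()); // another handle exists
// let handle = std::sync::Arc::into_inner(task).expect("last reference");
// handle.join().expect("block write thread panicked");
// ```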

impl StateService {
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    pub async fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        let (finalized_state, finalized_tip, timer) = {
            let config = config.clone();
            let network = network.clone();
            tokio::task::spawn_blocking(move || {
                let timer = CodeTimer::start();
                let finalized_state = FinalizedState::new(
                    &config,
                    &network,
                    #[cfg(feature = "elasticsearch")]
                    true,
                );
                timer.finish(module_path!(), line!(), "opening finalized state database");

                let timer = CodeTimer::start();
                let finalized_tip = finalized_state.db.tip_block();

                (finalized_state, finalized_tip, timer)
            })
            .await
            .expect("failed to join blocking task")
        };

        // # Correctness
        //
        // If there are blocks in the restored non-finalized state above the max
        // checkpoint height, the state service must set the finalized block write
        // sender to `None`, so that non-finalized blocks can be written. Otherwise,
        // Zebra will be unable to commit semantically verified blocks,
        // and its chain sync will stall.
        //
        // If there are no blocks in the restored non-finalized state above the max
        // checkpoint height, the state service must not set the finalized block write
        // sender to `None`. Otherwise, unless checkpoint sync is disabled in the
        // zebra-consensus configuration, Zebra will be unable to commit checkpoint
        // verified blocks, and its chain sync will stall.
        let is_finalized_tip_past_max_checkpoint = if let Some(tip) = &finalized_tip {
            tip.coinbase_height().expect("valid block must have height") >= max_checkpoint_height
        } else {
            false
        };
        let (non_finalized_state, non_finalized_state_sender, non_finalized_state_receiver) =
            NonFinalizedState::new(network)
                .with_backup(
                    config.non_finalized_state_backup_dir(network),
                    &finalized_state.db,
                    is_finalized_tip_past_max_checkpoint,
                )
                .await;

        let non_finalized_block_write_sent_hashes = SentHashes::new(&non_finalized_state);
        let initial_tip = non_finalized_state
            .best_tip_block()
            .map(|cv_block| cv_block.block.clone())
            .or(finalized_tip)
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        tracing::info!(chain_tip = ?initial_tip.as_ref().map(|tip| (tip.hash, tip.height)), "loaded Zebra state cache");

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let finalized_state_for_writing = finalized_state.clone();
        let should_use_finalized_block_write_sender = non_finalized_state.is_chain_set_empty();
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
                should_use_finalized_block_write_sender,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        let finalized_block_write_last_sent_hash =
            tokio::task::spawn_blocking(move || finalized_state.db.finalized_tip_hash())
                .await
                .expect("failed to join blocking task");

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes,
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish(module_path!(), line!(), "initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        if let (Some(tip), Some(nu5_activation_height)) = (
            {
                let read_state = state.read_service.clone();
                tokio::task::spawn_blocking(move || read_state.best_tip())
                    .await
                    .expect("task should not panic")
            },
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish(module_path!(), line!(), "legacy chain check");

        (state, read_service, latest_chain_tip, chain_tip_change)
    }

    /// Calls the read-only state service to log RocksDB database metrics.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }

    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    "dropping older checkpoint verified block: got newer duplicate block",
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );

            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        }

        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            // - the new block was lower than the old maximum, and there was a gap before it,
            //   so the maximum is still the same (and we skip this code), or
            // - the new block is higher than the old maximum, and there is at least one gap
            //   between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }
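
    // A caller typically awaits the returned receiver to learn the commit result.
    // A sketch (the `call()` implementation below does this inside a boxed future):
    //
    // ```ignore
    // let rsp_rx = state.queue_and_commit_to_finalized_state(checkpoint_verified);
    // let committed_hash = rsp_rx.await??;
    // ```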

    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block commit channel has been closed, sends an error on the
    /// queued blocks' result channels.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        "block commit task exited. Is Zebra shutting down?",
                    );

                    self.clear_finalized_block_queue(
                        "block commit task exited. Is Zebra shutting down?",
                    );
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }
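
    // For example (illustrative): if `finalized_block_write_last_sent_hash` is `h0`,
    // and the queue maps `h0 -> b1` and `b1.hash -> b2`, the drain loop above sends
    // `b1`, then `b2`, and stops when no queued block's parent matches the last
    // sent hash.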

    /// Drops all finalized state queue blocks, and sends an error on their result channels.
    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
            Self::send_checkpoint_verified_block_error(queued, error.clone());
        }
    }

    /// Sends an error on a `QueuedCheckpointVerified` block's result channel, and drops the block.
    fn send_checkpoint_verified_block_error(
        queued: QueuedCheckpointVerified,
        error: impl Into<BoxError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
            Self::send_semantically_verified_block_error(queued, error.clone());
        }
    }

    /// Sends an error on a `QueuedSemanticallyVerified` block's result channel, and drops the block.
    fn send_semantically_verified_block_error(
        queued: QueuedSemanticallyVerified,
        error: CommitSemanticallyVerifiedError,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error));
        std::mem::drop(finalized);
    }

    /// Queue a semantically verified block for contextual verification and check if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
    #[instrument(level = "debug", skip(self, semantically_verified))]
    fn queue_and_commit_to_non_finalized_state(
        &mut self,
        semantically_verified: SemanticallyVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
        let parent_hash = semantically_verified.block.header.previous_block_hash;

        if self
            .non_finalized_block_write_sent_hashes
            .contains(&semantically_verified.hash)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
                ValidateContextError::DuplicateCommitRequest {
                    block_hash: semantically_verified.hash,
                },
            )));
            return rsp_rx;
        }

        if self
            .read_service
            .db
            .contains_height(semantically_verified.height)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
                ValidateContextError::AlreadyFinalized {
                    block_height: semantically_verified.height,
                },
            )));
            return rsp_rx;
        }

        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
        // has been queued but not yet committed to the state fails the older request and replaces
        // it with the newer request.
        let rsp_rx = if let Some((_, old_rsp_tx)) = self
            .non_finalized_state_queued_blocks
            .get_mut(&semantically_verified.hash)
        {
            tracing::debug!("replacing older queued request with new request");
            let (mut rsp_tx, rsp_rx) = oneshot::channel();
            std::mem::swap(old_rsp_tx, &mut rsp_tx);
            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
                ValidateContextError::ReplacedByNewerRequest {
                    block_hash: semantically_verified.hash,
                },
            )));
            rsp_rx
        } else {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            self.non_finalized_state_queued_blocks
                .queue((semantically_verified, rsp_tx));
            rsp_rx
        };

        // We've finished sending checkpoint verified blocks when:
        // - we've sent the verified block for the last checkpoint, and
        // - it has been successfully written to disk.
        //
        // We detect the last checkpoint by looking for non-finalized blocks
        // that are a child of the last block we sent.
        //
        // TODO: configure the state with the last checkpoint hash instead?
        if self.block_write_sender.finalized.is_some()
            && self
                .non_finalized_state_queued_blocks
                .has_queued_children(self.finalized_block_write_last_sent_hash)
            && self.read_service.db.finalized_tip_hash()
                == self.finalized_block_write_last_sent_hash
        {
            // Tell the block write task to stop committing checkpoint verified blocks to the
            // finalized state, and move on to committing semantically verified blocks to the
            // non-finalized state.
            std::mem::drop(self.block_write_sender.finalized.take());
            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
            self.non_finalized_block_write_sent_hashes = SentHashes::default();
            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
            self.non_finalized_block_write_sent_hashes
                .can_fork_chain_at_hashes = true;
            // Send blocks from the non-finalized queue.
            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks.
            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        } else if !self.can_fork_chain_at(&parent_hash) {
            tracing::trace!("unready to verify, returning early");
        } else if self.block_write_sender.finalized.is_none() {
            // Wait until the block commit task is ready to write non-finalized blocks
            // before dequeuing them.
            self.send_ready_non_finalized_queued(parent_hash);

            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
                "Finalized state must have at least one block before committing non-finalized state",
            );

            self.non_finalized_state_queued_blocks
                .prune_by_height(finalized_tip_height);

            self.non_finalized_block_write_sent_hashes
                .prune_by_height(finalized_tip_height);
        }

        rsp_rx
    }

    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
        self.non_finalized_block_write_sent_hashes
            .can_fork_chain_at(hash)
            || &self.read_service.db.finalized_tip_hash() == hash
    }

    /// Returns `true` if `queued_height` is near the final checkpoint.
    ///
    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
    ///
    /// If it doesn't have the required UTXOs, some blocks will time out,
    /// but succeed after a syncer restart.
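    ///
    /// For example (illustrative numbers): with `max_checkpoint_height = 2_000_000`
    /// and `checkpoint_verify_concurrency_limit = 1_000`, the lookahead computed in
    /// [`StateService::new`] is `1_999_000`, so blocks at or above that height are
    /// considered close to the final checkpoint.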
    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
        queued_height >= self.full_verifier_utxo_lookahead
    }

    /// Sends all queued blocks whose parents have recently arrived, starting from `new_parent`,
    /// in breadth-first ordering to the block write task, which will attempt to validate and
    /// commit them.
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child.into());

                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            CommitSemanticallyVerifiedError::from(
                                ValidateContextError::CommitTaskExited,
                            ),
                        );

                        self.clear_non_finalized_block_queue(
                            CommitSemanticallyVerifiedError::from(
                                ValidateContextError::CommitTaskExited,
                            ),
                        );

                        return;
                    };

                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }

    fn send_invalidate_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(
                "cannot invalidate blocks while still committing checkpointed blocks".into(),
            ));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
                unreachable!(
                    "send errors should return the same Invalidate message that could not be sent"
                );
            };

            let _ = rsp_tx.send(Err(
                "failed to send invalidate block request to block write task".into(),
            ));
        }

        rsp_rx
    }

    fn send_reconsider_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<Vec<block::Hash>, BoxError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(
                "cannot reconsider blocks while still committing checkpointed blocks".into(),
            ));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
                unreachable!(
                    "send errors should return the same Reconsider message that could not be sent"
                );
            };

            let _ = rsp_tx.send(Err(
                "failed to send reconsider block request to block write task".into(),
            ));
        }

        rsp_rx
    }

    /// Assert some assumptions about the semantically verified `block` before it is queued.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by `Request::CommitSemanticallyVerifiedBlock` call
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the Canopy checkpoint is mandatory, so \
             pre-Canopy blocks and the Canopy activation block must be committed to the state as \
             finalized blocks"
        );
    }
}

impl ReadStateService {
    /// Creates a new read-only state service, using the provided finalized state and
    /// block write task handle.
    ///
    /// Returns the newly created service.
    pub(crate) fn new(
        finalized_state: &FinalizedState,
        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
        non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
    ) -> Self {
        let read_service = Self {
            network: finalized_state.network(),
            db: finalized_state.db.clone(),
            non_finalized_state_receiver,
            block_write_task,
        };

        tracing::debug!("created new read-only state service");

        read_service
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        read::best_tip(&self.latest_non_finalized_state(), &self.db)
    }

    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`.
    fn latest_non_finalized_state(&self) -> NonFinalizedState {
        self.non_finalized_state_receiver.cloned_watch_data()
    }

    /// Gets a clone of the latest, best non-finalized chain from the
    /// `non_finalized_state_receiver`.
    #[allow(dead_code)]
    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
        self.latest_non_finalized_state().best_chain().cloned()
    }

    /// Test-only access to the inner database.
    /// Can be used to modify the database without doing any consensus checks.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn db(&self) -> &ZebraDb {
        &self.db
    }

    /// Logs RocksDB metrics using the read-only state service.
    pub fn log_db_metrics(&self) {
        self.db.print_db_metrics();
    }
}

impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }
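
    // Note: as with any `tower::Service`, callers must wait for `poll_ready()` to
    // return `Ready` before calling `call()`. Combinators like `ServiceExt::oneshot`
    // (used elsewhere in this module) handle that ordering automatically.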

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result.
                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");

                // Await the channel response, mapping any receive error into a BoxError.
                // Then flatten the nested Result by converting the inner
                // CommitSemanticallyVerifiedError into a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from(CommitSemanticallyVerifiedError::from(
                                ValidateContextError::NotReadyToBeCommitted,
                            ))
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(|res| res.map_err(BoxError::from))
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result.
                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");

                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("block was dropped from the queue of finalized blocks")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in `finalized_state_queued_blocks`,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    // - make pending_utxos.respond() async using a channel,
                    //   so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we replace the response future with the result.
                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and block verifier to check if a block is already in the state
            // before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();

                let read_service = self.read_service.clone();

                async move {
                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    // The work is done in the future.
                    timer.finish(module_path!(), line!(), "Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            Request::InvalidateBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_invalidate_block(block_hash))
                });

                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("invalidate block request was unexpectedly dropped")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("reconsider block request was unexpectedly dropped")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }

            Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}

impl Service<ReadRequest> for ReadStateService {
    type Response = ReadResponse;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }

    #[instrument(name = "read_state", skip(self, req))]
    fn call(&mut self, req: ReadRequest) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

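        // Each read request below follows the same pattern: clone a cheap handle to
        // the shared state, move the read onto a blocking thread with
        // `tokio::task::spawn_blocking`, then use `wait_for_panics()` to turn the
        // task handle into a future that propagates any panics from the task.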
        match req {
            // Used by the `getblockchaininfo` RPC.
            ReadRequest::UsageInfo => {
                let db = self.db.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // The work is done in the future.

                        let db_size = db.size();

                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");

                        Ok(ReadResponse::UsageInfo(db_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Tip => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::tip(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");

                        Ok(ReadResponse::Tip(tip))
                    })
                })
                .wait_for_panics()
            }

            // Used by the `getblockchaininfo` RPC method.
            ReadRequest::TipPoolValues => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip_with_value_balance = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_with_value_balance(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                )
                            });

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");

                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
                            .ok_or(BoxError::from("no chain tip available yet"))?;

                        Ok(ReadResponse::TipPoolValues {
                            tip_height,
                            tip_hash,
                            value_balance,
                        })
                    })
                })
                .wait_for_panics()
            }

            // Used by the getblock RPC.
            ReadRequest::BlockInfo(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_info = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_info(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");

                        Ok(ReadResponse::BlockInfo(block_info))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Depth(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let depth = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");

                        Ok(ReadResponse::Depth(depth))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BestChainNextMedianTimePast => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let non_finalized_state = state.latest_non_finalized_state();
                        let median_time_past =
                            read::next_median_time_past(&non_finalized_state, &state.db);

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::BestChainNextMedianTimePast",
                        );

                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::Block(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Block");

                        Ok(ReadResponse::Block(block))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::BlockAndSize(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_and_size(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");

                        Ok(ReadResponse::BlockAndSize(block_and_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (verbose) RPC and the StateService.
            ReadRequest::BlockHeader(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let best_chain = state.latest_best_chain();

                        let height = hash_or_height
                            .height_or_else(|hash| {
                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let hash = hash_or_height
                            .hash_or_else(|height| {
                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let next_height = height.next()?;
                        let next_block_hash =
                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);

                        let header = read::block_header(best_chain, &state.db, height.into())
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockHeader");

                        Ok(ReadResponse::BlockHeader {
                            header,
                            hash,
                            height,
                            next_block_hash,
                        })
                    })
                })
                .wait_for_panics()
            }

            // For the get_raw_transaction RPC and the StateService.
            ReadRequest::Transaction(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let response =
                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");

                        Ok(ReadResponse::Transaction(response))
                    })
                })
                .wait_for_panics()
            }

            // Used by the getblock (verbose) RPC.
            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsForBlock",
                        );

                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
                    })
                })
                .wait_for_panics()
            }

            #[cfg(feature = "indexer")]
            ReadRequest::SpendingTransactionId(spend) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let spending_transaction_id = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::spending_transaction_hash(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    spend,
                                )
                            });

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdForSpentOutPoint",
                        );

                        Ok(ReadResponse::TransactionId(spending_transaction_id))
                    })
                })
                .wait_for_panics()
            }
1613
            ReadRequest::UnspentBestChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::unspent_utxo(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    outpoint,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");

                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Manually used by the StateService to implement part of AwaitUtxo.
            ReadRequest::AnyChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::any_utxo(non_finalized_state, &state.db, outpoint)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");

                        Ok(ReadResponse::AnyChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BlockLocator => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_locator(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");

                        Ok(ReadResponse::BlockLocator(
                            block_locator.unwrap_or_default(),
                        ))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHashes { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_hashes = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_hashes(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HASHES_RESULTS,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");

                        Ok(ReadResponse::BlockHashes(block_hashes))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::FindBlockHeaders { known_blocks, stop } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_headers = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::find_chain_headers(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    known_blocks,
                                    stop,
                                    MAX_FIND_BLOCK_HEADERS_RESULTS,
                                )
                            },
                        );

                        let block_headers = block_headers
                            .into_iter()
                            .map(|header| CountedHeader { header })
                            .collect();

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");

                        Ok(ReadResponse::BlockHeaders(block_headers))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::sapling_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");

                        Ok(ReadResponse::SaplingTree(sapling_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::orchard_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");

                        Ok(ReadResponse::OrchardTree(orchard_tree))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::SaplingSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);
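                        // For example, `start_index = 5` with `limit = 3` gives
                        // `end_index = Some(8)`, so the bounded range below returns
                        // subtrees 5, 6, and 7.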

                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or the end bound would overflow,
                                    // just return all the trees, because that's what `zcashd`
                                    // does. (It never calculates an end bound, so it just keeps
                                    // iterating until the trees run out.)
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");

                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, or the end bound would overflow,
                                    // just return all the trees, because that's what `zcashd`
                                    // does. (It never calculates an end bound, so it just keeps
                                    // iterating until the trees run out.)
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");

                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_balance RPC.
            ReadRequest::AddressBalance(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let (balance, received) = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::transparent_balance(
                                    non_finalized_state.best_chain().cloned(),
                                    &state.db,
                                    addresses,
                                )
                            })?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");

                        Ok(ReadResponse::AddressBalance { balance, received })
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_tx_ids RPC.
            ReadRequest::TransactionIdsByAddresses {
                addresses,
                height_range,
            } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transparent_tx_ids(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                    height_range,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsByAddresses",
                        );

                        tx_ids.map(ReadResponse::AddressesTransactionIds)
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_utxos RPC.
            ReadRequest::UtxosByAddresses(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxos = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::address_utxos(
                                    &state.network,
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");

                        utxos.map(ReadResponse::AddressUtxos)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_best_chain =
                            state.latest_non_finalized_state().best_chain().cloned();

                        check::nullifier::tx_no_duplicates_in_chain(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx.transaction,
                        )?;

                        check::anchors::tx_anchors_refer_to_final_treestates(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
                        );

                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block and get_block_hash RPCs.
            ReadRequest::BestChainBlockHash(height) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let hash = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::hash_by_height(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");

                        Ok(ReadResponse::BlockHash(hash))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block_template and getblockchaininfo RPCs.
            ReadRequest::ChainInfo => {
                let state = self.clone();
                let latest_non_finalized_state = self.latest_non_finalized_state();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state
                        // updates can only add overlapping blocks, and block hashes are unique across
                        // all chain forks.
                        //
                        // If there is a large overlap between the non-finalized and finalized states,
                        // with the finalized tip above the non-finalized tip, then either Zebra is
                        // receiving a lot of blocks, or this request has been delayed for a long time.
                        //
                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
                        // is not synced to the tip. That check happens before the RPC makes this request.
                        let get_block_template_info =
                            read::difficulty::get_block_template_chain_info(
                                &latest_non_finalized_state,
                                &state.db,
                                &state.network,
                            );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");

                        get_block_template_info.map(ReadResponse::ChainInfo)
                    })
                })
                .wait_for_panics()
            }

            // Used by the getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
            ReadRequest::SolutionRate { num_blocks, height } => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_state = state.latest_non_finalized_state();

                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state
                        // updates can only add overlapping blocks, and block hashes are unique across
                        // all chain forks.
                        //
                        // The worst that can happen here is that the default `start_hash` will be
                        // below the chain tip.
                        let (tip_height, tip_hash) =
                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
                                Some(tip_hash) => tip_hash,
                                None => return Ok(ReadResponse::SolutionRate(None)),
                            };

                        let start_hash = match height {
                            Some(height) if height < tip_height => read::hash_by_height(
                                latest_non_finalized_state.best_chain(),
                                &state.db,
                                height,
                            ),
                            // Use the chain tip hash if the height is above it or not provided.
                            _ => Some(tip_hash),
                        };

                        let solution_rate = start_hash.and_then(|start_hash| {
                            read::difficulty::solution_rate(
                                &latest_non_finalized_state,
                                &state.db,
                                num_blocks,
                                start_hash,
                            )
                        });

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");

                        Ok(ReadResponse::SolutionRate(solution_rate))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
                        let mut latest_non_finalized_state = state.latest_non_finalized_state();

                        // The previous block of a valid proposal must be on the best chain tip.
                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
                        };

                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
                        }

                        // This clone of the non-finalized state is dropped when this closure returns.
                        // The non-finalized state that's used in the rest of the state (including
                        // finalizing blocks into the db) is not mutated here.
                        //
                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
                        latest_non_finalized_state.disable_metrics();

                        write::validate_and_commit_non_finalized(
                            &state.db,
                            &mut latest_non_finalized_state,
                            semantically_verified,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBlockProposalValidity",
                        );

                        Ok(ReadResponse::ValidBlockProposal)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::TipBlockSize => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // Get the best chain tip height.
                        let tip_height = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_height(non_finalized_state.best_chain(), &state.db)
                            })
                            .unwrap_or(Height(0));
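                        // Note: this height and the block read below come from separate
                        // non-finalized state snapshots, so a concurrent tip change between
                        // the reads can yield the size of a block that is no longer the tip.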

                        // Get the block at the best chain tip height.
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    tip_height.into(),
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");

                        // Respond with the length of the obtained block, if any.
                        match block {
                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
                                b.zcash_serialize_to_vec()?.len(),
                            ))),
                            None => Ok(ReadResponse::TipBlockSize(None)),
                        }
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::NonFinalizedBlocksListener => {
                // The non-finalized blocks listener is used to notify the state service
                // about new blocks that have been added to the non-finalized state.
                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
                    self.network.clone(),
                    self.non_finalized_state_receiver.clone(),
                );

                async move {
                    timer.finish(
                        module_path!(),
                        line!(),
                        "ReadRequest::NonFinalizedBlocksListener",
                    );

                    Ok(ReadResponse::NonFinalizedBlocksListener(
                        non_finalized_blocks_listener,
                    ))
                }
                .boxed()
            }
        }
    }
}

/// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, a read-only state service,
/// and receivers for state chain tip updates.
///
/// Each `network` has its own separate on-disk database.
///
/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
/// to work out when it is near the final checkpoint.
///
/// To share access to the state, wrap the returned service in a `Buffer`,
/// or clone the returned [`ReadStateService`].
///
/// It's possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
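///
/// # Example
///
/// A minimal usage sketch, assuming a Tokio runtime and that `init` and
/// [`Config`] are re-exported at the crate root; the checkpoint height and
/// concurrency limit below are illustrative placeholders, not recommended
/// values:
///
/// ```no_run
/// # use zebra_chain::{block::Height, parameters::Network};
/// # async fn example() {
/// let (state, read_state, latest_chain_tip, chain_tip_change) = zebra_state::init(
///     zebra_state::Config::ephemeral(),
///     &Network::Mainnet,
///     // Illustrative: real callers pass their final checkpoint height.
///     Height(2_000_000),
///     // Illustrative: the checkpoint verifier's concurrency limit.
///     100,
/// )
/// .await;
/// # drop((state, read_state, latest_chain_tip, chain_tip_change));
/// # }
/// ```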
pub async fn init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> (
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(
            config,
            network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        )
        .await;

    (
        BoxService::new(state_service),
        read_only_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service, a handle to the finalized state
/// database, and a sender for updating the non-finalized state.
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
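///
/// # Example
///
/// A sketch of opening the state read-only, assuming this function and
/// [`Config`] are re-exported at the crate root:
///
/// ```no_run
/// # use zebra_chain::parameters::Network;
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::init_read_only(zebra_state::Config::ephemeral(), &Network::Mainnet);
///
/// // `read_state` can be cloned and shared between tasks; non-finalized state
/// // updates sent on `non_finalized_state_sender` become visible to every clone.
/// # drop((read_state, db, non_finalized_state_sender));
/// ```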
pub fn init_read_only(
    config: Config,
    network: &Network,
) -> (
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
) {
    let finalized_state = FinalizedState::new_with_debug(
        &config,
        network,
        true,
        #[cfg(feature = "elasticsearch")]
        false,
        true,
    );
    let (non_finalized_state_sender, non_finalized_state_receiver) =
        tokio::sync::watch::channel(NonFinalizedState::new(network));

    (
        ReadStateService::new(
            &finalized_state,
            None,
            WatchReceiver::new(non_finalized_state_receiver),
        ),
        finalized_state.db.clone(),
        non_finalized_state_sender,
    )
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service, a handle to the
/// finalized state database, and a sender for updating the non-finalized state.
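///
/// # Example
///
/// A sketch that avoids blocking an async context while the database opens,
/// assuming this function and [`Config`] are re-exported at the crate root:
///
/// ```no_run
/// # use zebra_chain::parameters::Network;
/// # async fn example() {
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::spawn_init_read_only(zebra_state::Config::ephemeral(), &Network::Mainnet)
///         .await
///         .expect("unexpected panic opening the database");
/// # drop((read_state, db, non_finalized_state_sender));
/// # }
/// ```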
pub fn spawn_init_read_only(
    config: Config,
    network: &Network,
) -> tokio::task::JoinHandle<(
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test(
    network: &Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, _, _, _) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    Buffer::new(BoxService::new(state_service), 1)
}

/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub async fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    // if we ever need to test final checkpoint sent UTXO queries
    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0).await;

    let state_service = Buffer::new(BoxService::new(state_service), 1);

    (
        state_service,
        read_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}