zebra_state/service.rs
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that writes blocks to the state,
//!   and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//!   recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
//!   tip.
//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
//!   chain tip changes.
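//!
//! # Example
//!
//! A minimal sketch of constructing the services and making a read request, assuming the
//! caller already has a [`Config`], a network, and checkpoint parameters (construction
//! details are elided, so this example is not compiled):
//!
//! ```ignore
//! use tower::ServiceExt;
//!
//! let (state, read_state, latest_chain_tip, chain_tip_change) = StateService::new(
//!     config,
//!     &network,
//!     max_checkpoint_height,
//!     checkpoint_verify_concurrency_limit,
//! );
//!
//! // Read requests run concurrently against the latest committed state.
//! let tip = read_state.clone().oneshot(ReadRequest::Tip).await?;
//! ```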

use std::{
    collections::HashMap,
    convert,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::future::FutureExt;
use tokio::sync::{oneshot, watch};
use tower::{util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;

use zebra_chain::{
    block::{self, CountedHeader, HeightDiff},
    diagnostic::{task::WaitForPanics, CodeTimer},
    parameters::{Network, NetworkUpgrade},
    subtree::NoteCommitmentSubtreeIndex,
};

use zebra_chain::{block::Height, serialization::ZcashSerialize};

use crate::{
    constants::{
        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
    },
    response::NonFinalizedBlocksListener,
    service::{
        block_iter::any_ancestor_blocks,
        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
        finalized_state::{FinalizedState, ZebraDb},
        non_finalized_state::{Chain, NonFinalizedState},
        pending_utxos::PendingUtxos,
        queued_blocks::QueuedBlocks,
        watch_receiver::WatchReceiver,
    },
    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, Config, ReadRequest,
    ReadResponse, Request, Response, SemanticallyVerifiedBlock, ValidateContextError,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub mod check;

pub(crate) mod finalized_state;
pub(crate) mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod write;

#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;

#[cfg(test)]
mod tests;

pub use finalized_state::{OutputLocation, TransactionIndex, TransactionLocation};
use write::NonFinalizedWriteMessage;

use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height that we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// Channels to send blocks to the block write task.
    block_write_sender: write::BlockWriteSender,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`.
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because Grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
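///
/// # Example
///
/// A minimal sketch of a read request, assuming `read_state` is the [`ReadStateService`]
/// returned by [`StateService::new`] (construction is elided, so this is not compiled):
///
/// ```ignore
/// use tower::ServiceExt;
///
/// let depth = read_state.clone().oneshot(ReadRequest::Depth(block_hash)).await?;
/// ```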
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also on `disk`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in `best_mem`.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}

impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking).
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        std::mem::drop(self.block_write_sender.finalized.take());
        std::mem::drop(self.block_write_sender.non_finalized.take());

        self.clear_finalized_block_queue(
            "dropping the state: dropped unused finalized state queue block",
        );
        self.clear_non_finalized_block_queue(CommitSemanticallyVerifiedError::from(
            ValidateContextError::DroppedUnusedBlock,
        ));

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}

impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}

impl StateService {
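    /// The interval between prunes of expired `pending_utxos` requests,
    /// measured against `last_prune` in `poll_ready()`.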
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    pub fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        let timer = CodeTimer::start();
        let finalized_state = FinalizedState::new(
            &config,
            network,
            #[cfg(feature = "elasticsearch")]
            true,
        );
        timer.finish(module_path!(), line!(), "opening finalized state database");

        let timer = CodeTimer::start();
        let initial_tip = finalized_state
            .db
            .tip_block()
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let non_finalized_state = NonFinalizedState::new(network);

        let (non_finalized_state_sender, non_finalized_state_receiver) =
            watch::channel(NonFinalizedState::new(&finalized_state.network()));

        let finalized_state_for_writing = finalized_state.clone();
        let (block_write_sender, invalid_block_write_reset_receiver, block_write_task) =
            write::BlockWriteSender::spawn(
                finalized_state_for_writing,
                non_finalized_state,
                chain_tip_sender,
                non_finalized_state_sender,
            );

        let read_service = ReadStateService::new(
            &finalized_state,
            block_write_task,
            non_finalized_state_receiver,
        );

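        // If the last checkpoint is near genesis, the lookahead subtraction could
        // go below the genesis height, so clamp it to `Height::MIN`.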
        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash();

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            block_write_sender,
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes: SentHashes::default(),
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish(module_path!(), line!(), "initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        if let (Some(tip), Some(nu5_activation_height)) = (
            state.best_tip(),
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish(module_path!(), line!(), "legacy chain check");

        (state, read_service, latest_chain_tip, chain_tip_change)
    }

    /// Calls the read-only state service to log RocksDB database metrics.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }

    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

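        // Create a channel that returns the commit result to the caller,
        // once the block write task has processed this block.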
        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.block_write_sender.finalized.is_some() {
            // We're still committing checkpoint verified blocks
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    "dropping older checkpoint verified block: got newer duplicate block",
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            //       every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );

            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        }

        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            // - the new block was lower than the old maximum, and there was a gap before it,
            //   so the maximum is still the same (and we skip this code), or
            // - the new block is higher than the old maximum, and there is at least one gap
            //   between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }

    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block commit channel has been closed, sends an error on the queued
    /// blocks' result channels and drops them.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

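        // `finalized_state_queued_blocks` is keyed by parent hash, so repeatedly removing
        // the entry for the last sent hash walks the queued chain in height order.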
        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.block_write_sender.finalized {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        "block commit task exited. Is Zebra shutting down?",
                    );

                    self.clear_finalized_block_queue(
                        "block commit task exited. Is Zebra shutting down?",
                    );
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }

    /// Drops all finalized state queue blocks, and sends an error on their result channels.
    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
            Self::send_checkpoint_verified_block_error(queued, error.clone());
        }
    }

    /// Sends an error on a `QueuedCheckpointVerified` block's result channel, and drops the block.
    fn send_checkpoint_verified_block_error(
        queued: QueuedCheckpointVerified,
        error: impl Into<BoxError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
    fn clear_non_finalized_block_queue(&mut self, error: CommitSemanticallyVerifiedError) {
        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
            Self::send_semantically_verified_block_error(queued, error.clone());
        }
    }

    /// Sends an error on a `QueuedSemanticallyVerified` block's result channel, and drops the block.
    fn send_semantically_verified_block_error(
        queued: QueuedSemanticallyVerified,
        error: CommitSemanticallyVerifiedError,
    ) {
        let (block, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error));
        std::mem::drop(block);
    }

    /// Queue a semantically verified block for contextual verification and check if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
    #[instrument(level = "debug", skip(self, semantically_verified))]
    fn queue_and_commit_to_non_finalized_state(
        &mut self,
        semantically_verified: SemanticallyVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, CommitSemanticallyVerifiedError>> {
        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
        let parent_hash = semantically_verified.block.header.previous_block_hash;

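        // Reject blocks that have already been sent to the block write task,
        // returning a duplicate commit request error.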
        if self
            .non_finalized_block_write_sent_hashes
            .contains(&semantically_verified.hash)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
                ValidateContextError::DuplicateCommitRequest {
                    block_hash: semantically_verified.hash,
                },
            )));
            return rsp_rx;
        }

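        // Reject blocks at heights that have already been finalized.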
        if self
            .read_service
            .db
            .contains_height(semantically_verified.height)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
                ValidateContextError::AlreadyFinalized {
                    block_height: semantically_verified.height,
                },
            )));
            return rsp_rx;
        }

        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
        // has been queued but not yet committed to the state fails the older request and replaces
        // it with the newer request.
        let rsp_rx = if let Some((_, old_rsp_tx)) = self
            .non_finalized_state_queued_blocks
            .get_mut(&semantically_verified.hash)
        {
            tracing::debug!("replacing older queued request with new request");
            let (mut rsp_tx, rsp_rx) = oneshot::channel();
            std::mem::swap(old_rsp_tx, &mut rsp_tx);
            let _ = rsp_tx.send(Err(CommitSemanticallyVerifiedError::from(
                ValidateContextError::ReplacedByNewerRequest {
                    block_hash: semantically_verified.hash,
                },
            )));
            rsp_rx
        } else {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            self.non_finalized_state_queued_blocks
                .queue((semantically_verified, rsp_tx));
            rsp_rx
        };

        // We've finished sending checkpoint verified blocks when:
        // - we've sent the verified block for the last checkpoint, and
        // - it has been successfully written to disk.
        //
        // We detect the last checkpoint by looking for non-finalized blocks
        // that are a child of the last block we sent.
        //
        // TODO: configure the state with the last checkpoint hash instead?
        if self.block_write_sender.finalized.is_some()
            && self
                .non_finalized_state_queued_blocks
                .has_queued_children(self.finalized_block_write_last_sent_hash)
            && self.read_service.db.finalized_tip_hash()
                == self.finalized_block_write_last_sent_hash
        {
            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
            // and move on to committing semantically verified blocks to the non-finalized state.
            std::mem::drop(self.block_write_sender.finalized.take());
            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
            self.non_finalized_block_write_sent_hashes = SentHashes::default();
            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
            self.non_finalized_block_write_sent_hashes
                .can_fork_chain_at_hashes = true;
            // Send blocks from the non-finalized queue.
            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks.
            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        } else if !self.can_fork_chain_at(&parent_hash) {
            tracing::trace!("unready to verify, returning early");
        } else if self.block_write_sender.finalized.is_none() {
            // Wait until the block commit task is ready to write non-finalized blocks before dequeuing them.
            self.send_ready_non_finalized_queued(parent_hash);

            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
                "Finalized state must have at least one block before committing non-finalized state",
            );

            self.non_finalized_state_queued_blocks
                .prune_by_height(finalized_tip_height);

            self.non_finalized_block_write_sent_hashes
                .prune_by_height(finalized_tip_height);
        }

        rsp_rx
    }

    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
        self.non_finalized_block_write_sent_hashes
            .can_fork_chain_at(hash)
            || &self.read_service.db.finalized_tip_hash() == hash
    }

    /// Returns `true` if `queued_height` is near the final checkpoint.
    ///
    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
    ///
    /// If it doesn't have the required UTXOs, some blocks will time out,
    /// but succeed after a syncer restart.
    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
        queued_height >= self.full_verifier_utxo_lookahead
    }

    /// Sends all queued blocks whose parents have recently arrived, starting from `new_parent`,
    /// in breadth-first order to the block write task, which will attempt to validate and commit them.
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        if let Some(non_finalized_block_write_sender) = &self.block_write_sender.non_finalized {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child;

                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child.into());

                    if let Err(SendError(NonFinalizedWriteMessage::Commit(queued))) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            CommitSemanticallyVerifiedError::from(
                                ValidateContextError::CommitTaskExited,
                            ),
                        );

                        self.clear_non_finalized_block_queue(
                            CommitSemanticallyVerifiedError::from(
                                ValidateContextError::CommitTaskExited,
                            ),
                        );

                        return;
                    };

                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }

    /// Returns the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }

    fn send_invalidate_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(
                "cannot invalidate blocks while still committing checkpointed blocks".into(),
            ));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Invalidate { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Invalidate { rsp_tx, .. } = error else {
                unreachable!("the SendError must contain the Invalidate message that could not be sent");
            };

            let _ = rsp_tx.send(Err(
                "failed to send invalidate block request to block write task".into(),
            ));
        }

        rsp_rx
    }

    fn send_reconsider_block(
        &self,
        hash: block::Hash,
    ) -> oneshot::Receiver<Result<Vec<block::Hash>, BoxError>> {
        let (rsp_tx, rsp_rx) = oneshot::channel();

        let Some(sender) = &self.block_write_sender.non_finalized else {
            let _ = rsp_tx.send(Err(
                "cannot reconsider blocks while still committing checkpointed blocks".into(),
            ));
            return rsp_rx;
        };

        if let Err(tokio::sync::mpsc::error::SendError(error)) =
            sender.send(NonFinalizedWriteMessage::Reconsider { hash, rsp_tx })
        {
            let NonFinalizedWriteMessage::Reconsider { rsp_tx, .. } = error else {
                unreachable!("the SendError must contain the Reconsider message that could not be sent");
            };

            let _ = rsp_tx.send(Err(
                "failed to send reconsider block request to block write task".into(),
            ));
        }

        rsp_rx
    }

    /// Asserts some assumptions about the semantically verified `block` before it is queued.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by `Request::CommitSemanticallyVerifiedBlock` call
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the Canopy checkpoint is mandatory, so \
             pre-Canopy blocks, and the Canopy activation block, must be committed to the state \
             as finalized blocks"
        );
    }
}

impl ReadStateService {
    /// Creates a new read-only state service, using the provided finalized state and
    /// block write task handle.
    ///
    /// Returns the newly created service.
    pub(crate) fn new(
        finalized_state: &FinalizedState,
        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
        non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
    ) -> Self {
        let read_service = Self {
            network: finalized_state.network(),
            db: finalized_state.db.clone(),
            non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
            block_write_task,
        };

        tracing::debug!("created new read-only state service");

        read_service
    }

    /// Returns the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        read::best_tip(&self.latest_non_finalized_state(), &self.db)
    }

    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`.
    fn latest_non_finalized_state(&self) -> NonFinalizedState {
        self.non_finalized_state_receiver.cloned_watch_data()
    }

    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`.
    #[allow(dead_code)]
    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
        self.latest_non_finalized_state().best_chain().cloned()
    }

    /// Test-only access to the inner database.
    /// Can be used to modify the database without doing any consensus checks.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn db(&self) -> &ZebraDb {
        &self.db
    }

    /// Logs RocksDB metrics using the read-only state service.
    pub fn log_db_metrics(&self) {
        self.db.print_db_metrics();
    }
}

impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result.
                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");

                // Await the channel response, mapping any receive error into a BoxError.
                // Then flatten the nested Result by converting the inner
                // CommitSemanticallyVerifiedError into a BoxError.
                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from(CommitSemanticallyVerifiedError::from(
                                ValidateContextError::NotReadyToBeCommitted,
                            ))
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(|res| res.map_err(BoxError::from))
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result.
                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");

                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("block was dropped from the queue of finalized blocks")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    // - make pending_utxos.respond() async using a channel,
                    //   so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we return it directly,
                        // instead of waiting on the response future.
                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and block verifier to check if a block is already in the state
            // before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();

                let read_service = self.read_service.clone();

                async move {
                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    // The work is done in the future.
                    timer.finish(module_path!(), line!(), "Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            Request::InvalidateBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_invalidate_block(block_hash))
                });

                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("invalidate block request was unexpectedly dropped")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Invalidated)
                }
                .instrument(span)
                .boxed()
            }

            Request::ReconsiderBlock(block_hash) => {
                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| self.send_reconsider_block(block_hash))
                });

                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("reconsider block request was unexpectedly dropped")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Reconsidered)
                }
                .instrument(span)
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }

            Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}

impl Service<ReadRequest> for ReadStateService {
    type Response = ReadResponse;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }

    #[instrument(name = "read_state", skip(self, req))]
    fn call(&mut self, req: ReadRequest) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Used by the `getblockchaininfo` RPC.
            ReadRequest::UsageInfo => {
                let db = self.db.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // The work is done in the future.

                        let db_size = db.size();

                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");

                        Ok(ReadResponse::UsageInfo(db_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Tip => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::tip(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");

                        Ok(ReadResponse::Tip(tip))
                    })
                })
                .wait_for_panics()
            }

            // Used by the `getblockchaininfo` RPC method.
            ReadRequest::TipPoolValues => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip_with_value_balance = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_with_value_balance(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                )
                            });

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");

                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
                            .ok_or(BoxError::from("no chain tip available yet"))?;

                        Ok(ReadResponse::TipPoolValues {
                            tip_height,
                            tip_hash,
                            value_balance,
                        })
                    })
                })
                .wait_for_panics()
            }

            // Used by the getblock RPC.
            ReadRequest::BlockInfo(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_info = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_info(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");

                        Ok(ReadResponse::BlockInfo(block_info))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Depth(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let depth = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");

                        Ok(ReadResponse::Depth(depth))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BestChainNextMedianTimePast => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let non_finalized_state = state.latest_non_finalized_state();
                        let median_time_past =
                            read::next_median_time_past(&non_finalized_state, &state.db);

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::BestChainNextMedianTimePast",
                        );

                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::Block(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Block");

                        Ok(ReadResponse::Block(block))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::BlockAndSize(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_and_size(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");

                        Ok(ReadResponse::BlockAndSize(block_and_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (verbose) RPC and the StateService.
            ReadRequest::BlockHeader(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let best_chain = state.latest_best_chain();

                        let height = hash_or_height
                            .height_or_else(|hash| {
                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let hash = hash_or_height
                            .hash_or_else(|height| {
                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let next_height = height.next()?;
                        let next_block_hash =
                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);

                        let header = read::block_header(best_chain, &state.db, height.into())
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockHeader");

                        Ok(ReadResponse::BlockHeader {
                            header,
                            hash,
                            height,
                            next_block_hash,
                        })
                    })
                })
                .wait_for_panics()
            }

            // For the get_raw_transaction RPC and the StateService.
            ReadRequest::Transaction(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let response =
                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");

                        Ok(ReadResponse::Transaction(response))
                    })
                })
                .wait_for_panics()
            }

            // Used by the getblock (verbose) RPC.
            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsForBlock",
                        );

                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
                    })
                })
                .wait_for_panics()
            }

            #[cfg(feature = "indexer")]
            ReadRequest::SpendingTransactionId(spend) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let spending_transaction_id = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::spending_transaction_hash(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    spend,
                                )
                            });

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdForSpentOutPoint",
                        );

                        Ok(ReadResponse::TransactionId(spending_transaction_id))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::UnspentBestChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::unspent_utxo(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    outpoint,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");

                        Ok(ReadResponse::UnspentBestChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Manually used by the StateService to implement part of AwaitUtxo.
            ReadRequest::AnyChainUtxo(outpoint) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxo = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::any_utxo(non_finalized_state, &state.db, outpoint)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");

                        Ok(ReadResponse::AnyChainUtxo(utxo))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BlockLocator => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_locator = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_locator(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");

                        Ok(ReadResponse::BlockLocator(
                            block_locator.unwrap_or_default(),
                        ))
                    })
                })
                .wait_for_panics()
            }
1635
1636 // Used by the StateService.
1637 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1638 let state = self.clone();
1639
1640 tokio::task::spawn_blocking(move || {
1641 span.in_scope(move || {
1642 let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1643 |non_finalized_state| {
1644 read::find_chain_hashes(
1645 non_finalized_state.best_chain(),
1646 &state.db,
1647 known_blocks,
1648 stop,
1649 MAX_FIND_BLOCK_HASHES_RESULTS,
1650 )
1651 },
1652 );
1653
1654 // The work is done in the future.
1655 timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1656
1657 Ok(ReadResponse::BlockHashes(block_hashes))
1658 })
1659 })
1660 .wait_for_panics()
1661 }
1662
1663 // Used by the StateService.
1664 ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1665 let state = self.clone();
1666
1667 tokio::task::spawn_blocking(move || {
1668 span.in_scope(move || {
1669 let block_headers = state.non_finalized_state_receiver.with_watch_data(
1670 |non_finalized_state| {
1671 read::find_chain_headers(
1672 non_finalized_state.best_chain(),
1673 &state.db,
1674 known_blocks,
1675 stop,
1676 MAX_FIND_BLOCK_HEADERS_RESULTS,
1677 )
1678 },
1679 );
1680
1681 let block_headers = block_headers
1682 .into_iter()
1683 .map(|header| CountedHeader { header })
1684 .collect();
1685
1686 // The work is done in the future.
1687 timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1688
1689 Ok(ReadResponse::BlockHeaders(block_headers))
1690 })
1691 })
1692 .wait_for_panics()
1693 }
1694
1695 ReadRequest::SaplingTree(hash_or_height) => {
1696 let state = self.clone();
1697
1698 tokio::task::spawn_blocking(move || {
1699 span.in_scope(move || {
1700 let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
1701 |non_finalized_state| {
1702 read::sapling_tree(
1703 non_finalized_state.best_chain(),
1704 &state.db,
1705 hash_or_height,
1706 )
1707 },
1708 );
1709
1710 // The work is done in the future.
1711 timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");
1712
1713 Ok(ReadResponse::SaplingTree(sapling_tree))
1714 })
1715 })
1716 .wait_for_panics()
1717 }
1718
1719 ReadRequest::OrchardTree(hash_or_height) => {
1720 let state = self.clone();
1721
1722 tokio::task::spawn_blocking(move || {
1723 span.in_scope(move || {
1724 let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
1725 |non_finalized_state| {
1726 read::orchard_tree(
1727 non_finalized_state.best_chain(),
1728 &state.db,
1729 hash_or_height,
1730 )
1731 },
1732 );
1733
1734 // The work is done in the future.
1735 timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");
1736
1737 Ok(ReadResponse::OrchardTree(orchard_tree))
1738 })
1739 })
1740 .wait_for_panics()
1741 }
1742
            ReadRequest::SaplingSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
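                        // The exclusive end bound is `start_index + limit`, computed with
                        // checked arithmetic: without a limit, or on overflow, `end_index`
                        // is `None` and the unbounded branch below is used instead.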
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, just return all the trees.
                                    // If the end bound would overflow, also return all the trees,
                                    // because that's what `zcashd` does. (It never calculates an
                                    // end bound, so it just keeps iterating until the trees run
                                    // out.)
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");

                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::OrchardSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, just return all the trees.
                                    // If the end bound would overflow, also return all the trees,
                                    // because that's what `zcashd` does. (It never calculates an
                                    // end bound, so it just keeps iterating until the trees run
                                    // out.)
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");

                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_balance RPC.
            ReadRequest::AddressBalance(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let (balance, received) = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::transparent_balance(
                                    non_finalized_state.best_chain().cloned(),
                                    &state.db,
                                    addresses,
                                )
                            })?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");

                        Ok(ReadResponse::AddressBalance { balance, received })
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_tx_ids RPC.
            ReadRequest::TransactionIdsByAddresses {
                addresses,
                height_range,
            } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transparent_tx_ids(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                    height_range,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsByAddresses",
                        );

                        tx_ids.map(ReadResponse::AddressesTransactionIds)
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_utxos RPC.
            ReadRequest::UtxosByAddresses(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxos = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::address_utxos(
                                    &state.network,
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");

                        utxos.map(ReadResponse::AddressUtxos)
                    })
                })
                .wait_for_panics()
            }

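            // Used to check unmined transactions against the best chain tip,
            // for example before they are accepted into the mempool.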
            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_best_chain =
                            state.latest_non_finalized_state().best_chain().cloned();

                        check::nullifier::tx_no_duplicates_in_chain(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx.transaction,
                        )?;

                        check::anchors::tx_anchors_refer_to_final_treestates(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
                        );

                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block and get_block_hash RPCs.
            ReadRequest::BestChainBlockHash(height) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let hash = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::hash_by_height(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");

                        Ok(ReadResponse::BlockHash(hash))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block_template and getblockchaininfo RPCs.
            ReadRequest::ChainInfo => {
                let state = self.clone();
                let latest_non_finalized_state = self.latest_non_finalized_state();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // If there is a large overlap between the non-finalized and finalized states,
                        // where the finalized tip is above the non-finalized tip, then Zebra is
                        // receiving a lot of blocks, or this request has been delayed for a long time.
                        //
                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
                        // is not synced to the tip. That check happens before the RPC makes this request.
                        let get_block_template_info =
                            read::difficulty::get_block_template_chain_info(
                                &latest_non_finalized_state,
                                &state.db,
                                &state.network,
                            );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");

                        get_block_template_info.map(ReadResponse::ChainInfo)
                    })
                })
                .wait_for_panics()
            }

            // Used by the getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
            ReadRequest::SolutionRate { num_blocks, height } => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_state = state.latest_non_finalized_state();
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // The worst that can happen here is that the default `start_hash` will be below
                        // the chain tip.
                        let (tip_height, tip_hash) =
                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
                                Some(tip_hash) => tip_hash,
                                None => return Ok(ReadResponse::SolutionRate(None)),
                            };

                        let start_hash = match height {
                            Some(height) if height < tip_height => read::hash_by_height(
                                latest_non_finalized_state.best_chain(),
                                &state.db,
                                height,
                            ),
                            // Use the chain tip hash if the height is above the tip or not provided.
                            _ => Some(tip_hash),
                        };

                        let solution_rate = start_hash.and_then(|start_hash| {
                            read::difficulty::solution_rate(
                                &latest_non_finalized_state,
                                &state.db,
                                num_blocks,
                                start_hash,
                            )
                        });

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");

                        Ok(ReadResponse::SolutionRate(solution_rate))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        tracing::debug!(
                            "attempting to validate and commit block proposal onto a cloned non-finalized state"
                        );
                        let mut latest_non_finalized_state = state.latest_non_finalized_state();

                        // The previous block of a valid proposal must be on the best chain tip.
                        let Some((_best_tip_height, best_tip_hash)) =
                            read::best_tip(&latest_non_finalized_state, &state.db)
                        else {
                            return Err(
                                "state is empty: wait for Zebra to sync before submitting a proposal"
                                    .into(),
                            );
                        };

                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
                        }

                        // This clone of the non-finalized state is dropped when this closure returns.
                        // The non-finalized state that's used in the rest of the state (including
                        // finalizing blocks into the db) is not mutated here.
                        //
                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
                        latest_non_finalized_state.disable_metrics();

                        write::validate_and_commit_non_finalized(
                            &state.db,
                            &mut latest_non_finalized_state,
                            semantically_verified,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBlockProposalValidity",
                        );

                        Ok(ReadResponse::ValidBlockProposal)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::TipBlockSize => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // Get the best chain tip height.
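                        // An empty state has no tip, so fall back to the genesis height;
                        // the block read below then returns `None`.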
                        let tip_height = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_height(non_finalized_state.best_chain(), &state.db)
                            })
                            .unwrap_or(Height(0));

                        // Get the block at the best chain tip height.
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    tip_height.into(),
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");

                        // Respond with the serialized length of the block, if there is one.
                        match block {
                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
                                b.zcash_serialize_to_vec()?.len(),
                            ))),
                            None => Ok(ReadResponse::TipBlockSize(None)),
                        }
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::NonFinalizedBlocksListener => {
                // The listener notifies its subscribers about new blocks
                // that are added to the non-finalized state.
                let non_finalized_blocks_listener = NonFinalizedBlocksListener::spawn(
                    self.network.clone(),
                    self.non_finalized_state_receiver.clone(),
                );

                async move {
                    timer.finish(
                        module_path!(),
                        line!(),
                        "ReadRequest::NonFinalizedBlocksListener",
                    );

                    Ok(ReadResponse::NonFinalizedBlocksListener(
                        non_finalized_blocks_listener,
                    ))
                }
                .boxed()
            }
        }
    }
}

/// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, a read-only state service,
/// and receivers for state chain tip updates.
///
/// Each `network` has its own separate on-disk database.
///
/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
/// to work out when it is near the final checkpoint.
///
/// To share access to the state, wrap the returned service in a `Buffer`,
/// or clone the returned [`ReadStateService`].
///
/// It's possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
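///
/// # Example
///
/// A minimal usage sketch, marked `ignore` because it needs a Tokio runtime and
/// writes a state cache to disk. The `Mainnet` network and the checkpoint
/// arguments are placeholder assumptions for illustration:
///
/// ```ignore
/// use tower::ServiceExt;
/// use zebra_chain::{block, parameters::Network};
/// use zebra_state::{init, Config, ReadRequest};
///
/// let (state, read_state, latest_chain_tip, chain_tip_change) = init(
///     Config::default(),
///     &Network::Mainnet,
///     // Placeholder checkpoint settings; real callers take these from the
///     // checkpoint list and their verification config.
///     block::Height::MAX,
///     0,
/// );
///
/// // Clone the read-only service for each caller, then await a read request.
/// let tip = read_state.clone().oneshot(ReadRequest::Tip).await?;
/// ```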
pub fn init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> (
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(
            config,
            network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        );

    (
        BoxService::new(state_service),
        read_only_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service, the finalized state's database,
/// and a sender for the non-finalized state.
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
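///
/// # Example
///
/// A sketch of opening an existing state cache read-only, marked `ignore`
/// because it assumes a cache directory already exists for the chosen network:
///
/// ```ignore
/// use zebra_chain::parameters::Network;
/// use zebra_state::{init_read_only, Config};
///
/// let (read_state, db, non_finalized_state_sender) =
///     init_read_only(Config::default(), &Network::Mainnet);
///
/// // `read_state` answers requests from the finalized database, plus any
/// // non-finalized state the caller sends on `non_finalized_state_sender`.
/// ```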
pub fn init_read_only(
    config: Config,
    network: &Network,
) -> (
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
) {
    let finalized_state = FinalizedState::new_with_debug(
        &config,
        network,
        true,
        #[cfg(feature = "elasticsearch")]
        false,
        true,
    );
    let (non_finalized_state_sender, non_finalized_state_receiver) =
        tokio::sync::watch::channel(NonFinalizedState::new(network));

    (
        ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
        finalized_state.db.clone(),
        non_finalized_state_sender,
    )
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service, the database,
/// and a sender for the non-finalized state.
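///
/// # Example
///
/// A sketch of awaiting the spawned initialization from async code, marked
/// `ignore` because it assumes a Tokio runtime and an existing state cache:
///
/// ```ignore
/// let (read_state, db, non_finalized_state_sender) =
///     spawn_init_read_only(Config::default(), &Network::Mainnet).await?;
/// ```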
pub fn spawn_init_read_only(
    config: Config,
    network: &Network,
) -> tokio::task::JoinHandle<(
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
/// a read state service, and receivers for state chain tip updates.
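///
/// # Example
///
/// A sketch of initializing the state without blocking the async caller,
/// marked `ignore`; the checkpoint arguments are placeholders:
///
/// ```ignore
/// let (state, read_state, latest_chain_tip, chain_tip_change) =
///     spawn_init(Config::default(), &Network::Mainnet, block::Height::MAX, 0).await?;
/// ```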
pub fn spawn_init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> tokio::task::JoinHandle<(
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || {
        init(
            config,
            &network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        )
    })
}

/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test(network: &Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency_limit
    //       if we ever need to test UTXO queries sent near the final checkpoint
    let (state_service, _, _, _) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    Buffer::new(BoxService::new(state_service), 1)
}

/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency_limit
    //       if we ever need to test UTXO queries sent near the final checkpoint
    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    let state_service = Buffer::new(BoxService::new(state_service), 1);

    (
        state_service,
        read_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}