zebra_state/service.rs
//! [`tower::Service`]s for Zebra's cached chain state.
//!
//! Zebra provides cached state access via two main services:
//! - [`StateService`]: a read-write service that writes blocks to the state,
//!   and redirects most read requests to the [`ReadStateService`].
//! - [`ReadStateService`]: a read-only service that answers from the most
//!   recent committed block.
//!
//! Most users should prefer [`ReadStateService`], unless they need to write blocks to the state.
//!
//! Zebra also provides access to the best chain tip via:
//! - [`LatestChainTip`]: a read-only channel that contains the latest committed
//!   tip.
//! - [`ChainTipChange`]: a read-only channel that can asynchronously await
//!   chain tip changes.
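//!
//! The sketch below shows how these pieces typically fit together. It is
//! illustrative only: the crate-level `init()` constructor, the placeholder
//! checkpoint arguments, and the `ChainTip` trait import are assumptions made
//! for the example, and it is not compiled as a doctest.
//!
//! ```ignore
//! use zebra_chain::{block, chain_tip::ChainTip, parameters::Network};
//!
//! // Returns the buffered read-write service, the read-only service,
//! // and the chain tip channels.
//! let (state, read_state, latest_chain_tip, chain_tip_change) = zebra_state::init(
//!     zebra_state::Config::default(),
//!     &Network::Mainnet,
//!     block::Height::MAX, // placeholder max checkpoint height
//!     0,                  // placeholder checkpoint verification concurrency
//! );
//!
//! // Most readers should use `read_state` or the chain tip channels:
//! let tip_height = latest_chain_tip.best_tip_height();
//! ```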

use std::{
    collections::HashMap,
    convert,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

use futures::future::FutureExt;
use tokio::sync::{oneshot, watch};
use tower::{util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};

#[cfg(any(test, feature = "proptest-impl"))]
use tower::buffer::Buffer;

use zebra_chain::{
    block::{self, CountedHeader, HeightDiff},
    diagnostic::{task::WaitForPanics, CodeTimer},
    parameters::{Network, NetworkUpgrade},
    subtree::NoteCommitmentSubtreeIndex,
};

use zebra_chain::{block::Height, serialization::ZcashSerialize};

use crate::{
    constants::{
        MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS, MAX_LEGACY_CHAIN_BLOCKS,
    },
    service::{
        block_iter::any_ancestor_blocks,
        chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
        finalized_state::{FinalizedState, ZebraDb},
        non_finalized_state::{Chain, NonFinalizedState},
        pending_utxos::PendingUtxos,
        queued_blocks::QueuedBlocks,
        watch_receiver::WatchReceiver,
    },
    BoxError, CheckpointVerifiedBlock, CloneError, Config, ReadRequest, ReadResponse, Request,
    Response, SemanticallyVerifiedBlock,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub mod check;

pub(crate) mod finalized_state;
pub(crate) mod non_finalized_state;
mod pending_utxos;
mod queued_blocks;
pub(crate) mod read;
mod write;

#[cfg(any(test, feature = "proptest-impl"))]
pub mod arbitrary;

#[cfg(test)]
mod tests;

pub use finalized_state::{OutputIndex, OutputLocation, TransactionIndex, TransactionLocation};

use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes};

/// A read-write service for Zebra's cached blockchain state.
///
/// This service modifies and provides access to:
/// - the non-finalized state: the ~100 most recent blocks.
///   Zebra allows chain forks in the non-finalized state,
///   stores it in memory, and re-downloads it when restarted.
/// - the finalized state: older blocks that have many confirmations.
///   Zebra stores the single best chain in the finalized state,
///   and re-loads it from disk when restarted.
///
/// Read requests to this service are buffered, then processed concurrently.
/// Block write requests are buffered, then queued, then processed in order by a separate task.
///
/// Most state users can get faster read responses using the [`ReadStateService`],
/// because its requests do not share a [`tower::buffer::Buffer`] with block write requests.
///
/// To quickly get the latest block, use [`LatestChainTip`] or [`ChainTipChange`].
/// They can read the latest block directly, without queueing any requests.
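///
/// A hedged sketch of sending a request to this service (illustrative only:
/// `state` stands for a handle to the buffered service returned by the crate's
/// `init()`, `hash` is an arbitrary block hash, and the sketch is not compiled
/// as a doctest):
///
/// ```ignore
/// use tower::{Service, ServiceExt};
/// use zebra_state::{Request, Response};
///
/// // Wait for the buffered service to have capacity, then send a request.
/// let response = state.ready().await?.call(Request::Depth(hash)).await?;
///
/// if let Response::Depth(depth) = response {
///     tracing::info!(?depth, "block depth in the best chain");
/// }
/// ```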
#[derive(Debug)]
pub(crate) struct StateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    /// The height from which we start storing UTXOs from finalized blocks.
    ///
    /// This height should be lower than the last few checkpoints,
    /// so the full verifier can verify UTXO spends from those blocks,
    /// even if they haven't been committed to the finalized state yet.
    full_verifier_utxo_lookahead: block::Height,

    // Queued Blocks
    //
    /// Queued blocks for the [`NonFinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    non_finalized_state_queued_blocks: QueuedBlocks,

    /// Queued blocks for the [`FinalizedState`] that arrived out of order.
    /// These blocks are awaiting their parent blocks before they can do contextual verification.
    ///
    /// Indexed by their parent block hash.
    finalized_state_queued_blocks: HashMap<block::Hash, QueuedCheckpointVerified>,

    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`NonFinalizedState`].
    non_finalized_block_write_sender:
        Option<tokio::sync::mpsc::UnboundedSender<QueuedSemanticallyVerified>>,

    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`FinalizedState`].
    ///
    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
    /// and the lowest semantically verified block arrives.
    finalized_block_write_sender:
        Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,

    /// The [`block::Hash`] of the most recent block sent on
    /// `finalized_block_write_sender` or `non_finalized_block_write_sender`.
    ///
    /// On startup, this is:
    /// - the finalized tip, if there are stored blocks, or
    /// - the genesis block's parent hash, if the database is empty.
    ///
    /// If `invalid_block_write_reset_receiver` gets a reset, this is:
    /// - the hash of the last valid committed block (the parent of the invalid block).
    finalized_block_write_last_sent_hash: block::Hash,

    /// A set of block hashes that have been sent to the block write task.
    /// Hashes of blocks below the finalized tip height are periodically pruned.
    non_finalized_block_write_sent_hashes: SentHashes,

    /// If an invalid block is sent on `finalized_block_write_sender`
    /// or `non_finalized_block_write_sender`,
    /// this channel gets the [`block::Hash`] of the valid tip.
    //
    // TODO: add tests for finalized and non-finalized resets (#2654)
    invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>,

    // Pending UTXO Request Tracking
    //
    /// The set of outpoints with pending requests for their associated transparent::Output.
    pending_utxos: PendingUtxos,

    /// Instant tracking the last time `pending_utxos` was pruned.
    last_prune: Instant,

    // Updating Concurrently Readable State
    //
    /// A cloneable [`ReadStateService`], used to answer concurrent read requests.
    ///
    /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`.
    read_service: ReadStateService,

    // Metrics
    //
    /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks`.
    ///
    /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs
    /// as a break in the graph.
    max_finalized_queue_height: f64,
}

/// A read-only service for accessing Zebra's cached blockchain state.
///
/// This service provides read-only access to:
/// - the non-finalized state: the ~100 most recent blocks.
/// - the finalized state: older blocks that have many confirmations.
///
/// Requests to this service are processed in parallel,
/// ignoring any blocks queued by the read-write [`StateService`].
///
/// This quick response behavior is better for most state users.
/// It allows other async tasks to make progress while concurrently reading data from disk.
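///
/// A hedged sketch of a typical read call (illustrative only: `read_state` is
/// an existing [`ReadStateService`] handle, and the sketch is not compiled as
/// a doctest):
///
/// ```ignore
/// use tower::ServiceExt;
/// use zebra_state::{ReadRequest, ReadResponse};
///
/// // Clones are cheap, so each task can own a handle.
/// let read_state = read_state.clone();
///
/// if let ReadResponse::Tip(tip) = read_state.oneshot(ReadRequest::Tip).await? {
///     tracing::info!(?tip, "current best chain tip");
/// }
/// ```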
#[derive(Clone, Debug)]
pub struct ReadStateService {
    // Configuration
    //
    /// The configured Zcash network.
    network: Network,

    // Shared Concurrently Readable State
    //
    /// A watch channel with a cached copy of the [`NonFinalizedState`].
    ///
    /// This state is only updated between requests,
    /// so it might include some block data that is also in the on-disk `db`.
    non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,

    /// The shared inner on-disk database for the finalized state.
    ///
    /// RocksDB allows reads and writes via a shared reference,
    /// but [`ZebraDb`] doesn't expose any write methods or types.
    ///
    /// This chain is updated concurrently with requests,
    /// so it might include some block data that is also in `non_finalized_state_receiver`.
    db: ZebraDb,

    /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`],
    /// once the queues have received all their parent blocks.
    ///
    /// Used to check for panics when writing blocks.
    block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
}

impl Drop for StateService {
    fn drop(&mut self) {
        // The state service owns the state, tasks, and channels,
        // so dropping it should shut down everything.

        // Close the channels (non-blocking)
        // This makes the block write thread exit the next time it checks the channels.
        // We want to do this here so we get any errors or panics from the block write task before it shuts down.
        self.invalid_block_write_reset_receiver.close();

        std::mem::drop(self.finalized_block_write_sender.take());
        std::mem::drop(self.non_finalized_block_write_sender.take());

        self.clear_finalized_block_queue(
            "dropping the state: dropped unused finalized state queue block",
        );
        self.clear_non_finalized_block_queue(
            "dropping the state: dropped unused non-finalized state queue block",
        );

        // Log database metrics before shutting down
        info!("dropping the state: logging database metrics");
        self.log_db_metrics();

        // Then drop self.read_service, which checks the block write task for panics,
        // and tries to shut down the database.
    }
}

impl Drop for ReadStateService {
    fn drop(&mut self) {
        // The read state service shares the state,
        // so dropping it should check if we can shut down.

        // TODO: move this into a try_shutdown() method
        if let Some(block_write_task) = self.block_write_task.take() {
            if let Some(block_write_task_handle) = Arc::into_inner(block_write_task) {
                // We're the last database user, so we can tell it to shut down (blocking):
                // - flushes the database to disk, and
                // - drops the database, which cleans up any database tasks correctly.
                self.db.shutdown(true);

                // We are the last state with a reference to this thread, so we can
                // wait until the block write task finishes, then check for panics (blocking).
                // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.)

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!("waiting for the block write task to finish");
                #[cfg(test)]
                debug!("waiting for the block write task to finish");

                // TODO: move this into a check_for_panics() method
                if let Err(thread_panic) = block_write_task_handle.join() {
                    std::panic::resume_unwind(thread_panic);
                } else {
                    debug!("shutting down the state because the block write task has finished");
                }
            }
        } else {
            // Even if we're not the last database user, try shutting it down.
            //
            // TODO: rename this to try_shutdown()?
            self.db.shutdown(false);
        }
    }
}

impl StateService {
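    /// The minimum interval between pruning runs for expired `pending_utxos` requests.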
    const PRUNE_INTERVAL: Duration = Duration::from_secs(30);

    /// Creates a new state service for the state `config` and `network`.
    ///
    /// Uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
    /// to work out when it is near the final checkpoint.
    ///
    /// Returns the read-write and read-only state services,
    /// and read-only watch channels for its best chain tip.
    pub fn new(
        config: Config,
        network: &Network,
        max_checkpoint_height: block::Height,
        checkpoint_verify_concurrency_limit: usize,
    ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) {
        let timer = CodeTimer::start();
        let finalized_state = FinalizedState::new(
            &config,
            network,
            #[cfg(feature = "elasticsearch")]
            true,
        );
        timer.finish(module_path!(), line!(), "opening finalized state database");

        let timer = CodeTimer::start();
        let initial_tip = finalized_state
            .db
            .tip_block()
            .map(CheckpointVerifiedBlock::from)
            .map(ChainTipBlock::from);

        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
            ChainTipSender::new(initial_tip, network);

        let non_finalized_state = NonFinalizedState::new(network);

        let (non_finalized_state_sender, non_finalized_state_receiver) =
            watch::channel(NonFinalizedState::new(&finalized_state.network()));

        // Security: The number of blocks in these channels is limited by
        // the syncer and inbound lookahead limits.
        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
            tokio::sync::mpsc::unbounded_channel();
        let (finalized_block_write_sender, finalized_block_write_receiver) =
            tokio::sync::mpsc::unbounded_channel();
        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
            tokio::sync::mpsc::unbounded_channel();

        let finalized_state_for_writing = finalized_state.clone();
        let span = Span::current();
        let block_write_task = std::thread::spawn(move || {
            span.in_scope(move || {
                write::write_blocks_from_channels(
                    finalized_block_write_receiver,
                    non_finalized_block_write_receiver,
                    finalized_state_for_writing,
                    non_finalized_state,
                    invalid_block_reset_sender,
                    chain_tip_sender,
                    non_finalized_state_sender,
                )
            })
        });
        let block_write_task = Arc::new(block_write_task);

        let read_service = ReadStateService::new(
            &finalized_state,
            Some(block_write_task),
            non_finalized_state_receiver,
        );

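        // For example (illustrative numbers, not real checkpoint values): with a
        // `max_checkpoint_height` of 2_000_000 and a `checkpoint_verify_concurrency_limit`
        // of 1_000, UTXOs are stored for finalized blocks from height 1_999_000 onwards,
        // so the semantic verifier can spend outputs from blocks near the final checkpoint.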
        let full_verifier_utxo_lookahead = max_checkpoint_height
            - HeightDiff::try_from(checkpoint_verify_concurrency_limit)
                .expect("fits in HeightDiff");
        let full_verifier_utxo_lookahead =
            full_verifier_utxo_lookahead.unwrap_or(block::Height::MIN);
        let non_finalized_state_queued_blocks = QueuedBlocks::default();
        let pending_utxos = PendingUtxos::default();

        let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash();

        let state = Self {
            network: network.clone(),
            full_verifier_utxo_lookahead,
            non_finalized_state_queued_blocks,
            finalized_state_queued_blocks: HashMap::new(),
            non_finalized_block_write_sender: Some(non_finalized_block_write_sender),
            finalized_block_write_sender: Some(finalized_block_write_sender),
            finalized_block_write_last_sent_hash,
            non_finalized_block_write_sent_hashes: SentHashes::default(),
            invalid_block_write_reset_receiver,
            pending_utxos,
            last_prune: Instant::now(),
            read_service: read_service.clone(),
            max_finalized_queue_height: f64::NAN,
        };
        timer.finish(module_path!(), line!(), "initializing state service");

        tracing::info!("starting legacy chain check");
        let timer = CodeTimer::start();

        if let (Some(tip), Some(nu5_activation_height)) = (
            state.best_tip(),
            NetworkUpgrade::Nu5.activation_height(network),
        ) {
            if let Err(error) = check::legacy_chain(
                nu5_activation_height,
                any_ancestor_blocks(
                    &state.read_service.latest_non_finalized_state(),
                    &state.read_service.db,
                    tip.1,
                ),
                &state.network,
                MAX_LEGACY_CHAIN_BLOCKS,
            ) {
                let legacy_db_path = state.read_service.db.path().to_path_buf();
                panic!(
                    "Cached state contains a legacy chain.\n\
                     An outdated Zebra version did not know about a recent network upgrade,\n\
                     so it followed a legacy chain using outdated consensus branch rules.\n\
                     Hint: Delete your database, and restart Zebra to do a full sync.\n\
                     Database path: {legacy_db_path:?}\n\
                     Error: {error:?}",
                );
            }
        }

        tracing::info!("cached state consensus branch is valid: no legacy chain found");
        timer.finish(module_path!(), line!(), "legacy chain check");

        (state, read_service, latest_chain_tip, chain_tip_change)
    }

    /// Calls the read-only state service to log RocksDB database metrics.
    pub fn log_db_metrics(&self) {
        self.read_service.db.print_db_metrics();
    }

    /// Queue a checkpoint verified block for verification and storage in the finalized state.
    ///
    /// Returns a channel receiver that provides the result of the block commit.
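    ///
    /// A hedged sketch of awaiting the commit result (illustrative only, not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let rsp_rx = state.queue_and_commit_to_finalized_state(checkpoint_verified);
    ///
    /// // The receiver resolves once the block write task commits or rejects the block.
    /// match rsp_rx.await {
    ///     Ok(Ok(hash)) => tracing::info!(?hash, "block committed"),
    ///     Ok(Err(error)) => tracing::warn!(?error, "block rejected"),
    ///     Err(_recv_error) => tracing::warn!("block dropped from the queue"),
    /// }
    /// ```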
    fn queue_and_commit_to_finalized_state(
        &mut self,
        checkpoint_verified: CheckpointVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        let queued_prev_hash = checkpoint_verified.block.header.previous_block_hash;
        let queued_height = checkpoint_verified.height;

        // If we're close to the final checkpoint, make the block's UTXOs available for
        // semantic block verification, even when it is in the channel.
        if self.is_close_to_final_checkpoint(queued_height) {
            self.non_finalized_block_write_sent_hashes
                .add_finalized(&checkpoint_verified)
        }

        let (rsp_tx, rsp_rx) = oneshot::channel();
        let queued = (checkpoint_verified, rsp_tx);

        if self.finalized_block_write_sender.is_some() {
            // We're still committing checkpoint verified blocks
            if let Some(duplicate_queued) = self
                .finalized_state_queued_blocks
                .insert(queued_prev_hash, queued)
            {
                Self::send_checkpoint_verified_block_error(
                    duplicate_queued,
                    "dropping older checkpoint verified block: got newer duplicate block",
                );
            }

            self.drain_finalized_queue_and_commit();
        } else {
            // We've finished committing checkpoint verified blocks to the finalized state,
            // so drop any repeated queued blocks, and return an error.
            //
            // TODO: track the latest sent height, and drop any blocks under that height
            // every time we send some blocks (like QueuedSemanticallyVerifiedBlocks)
            Self::send_checkpoint_verified_block_error(
                queued,
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );

            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        }

        if self.finalized_state_queued_blocks.is_empty() {
            self.max_finalized_queue_height = f64::NAN;
        } else if self.max_finalized_queue_height.is_nan()
            || self.max_finalized_queue_height < queued_height.0 as f64
        {
            // if there are still blocks in the queue, then either:
            // - the new block was lower than the old maximum, and there was a gap before it,
            //   so the maximum is still the same (and we skip this code), or
            // - the new block is higher than the old maximum, and there is at least one gap
            //   between the finalized tip and the new maximum
            self.max_finalized_queue_height = queued_height.0 as f64;
        }

        metrics::gauge!("state.checkpoint.queued.max.height").set(self.max_finalized_queue_height);
        metrics::gauge!("state.checkpoint.queued.block.count")
            .set(self.finalized_state_queued_blocks.len() as f64);

        rsp_rx
    }

    /// Finds finalized state queue blocks to be committed to the state in order,
    /// removes them from the queue, and sends them to the block commit task.
    ///
    /// After queueing a finalized block, this method checks whether the newly
    /// queued block (and any of its descendants) can be committed to the state.
    ///
    /// If the block commit task has exited, sends an error on the queued blocks' result channels.
    pub fn drain_finalized_queue_and_commit(&mut self) {
        use tokio::sync::mpsc::error::{SendError, TryRecvError};

        // # Correctness & Performance
        //
        // This method must not block, access the database, or perform CPU-intensive tasks,
        // because it is called directly from the tokio executor's Future threads.

        // If a block failed, we need to start again from a valid tip.
        match self.invalid_block_write_reset_receiver.try_recv() {
            Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash,
            Err(TryRecvError::Disconnected) => {
                info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
                return;
            }
            // There are no errors, so we can just use the last block hash we sent
            Err(TryRecvError::Empty) => {}
        }

        while let Some(queued_block) = self
            .finalized_state_queued_blocks
            .remove(&self.finalized_block_write_last_sent_hash)
        {
            let last_sent_finalized_block_height = queued_block.0.height;

            self.finalized_block_write_last_sent_hash = queued_block.0.hash;

            // If we've finished sending finalized blocks, ignore any repeated blocks.
            // (Blocks can be repeated after a syncer reset.)
            if let Some(finalized_block_write_sender) = &self.finalized_block_write_sender {
                let send_result = finalized_block_write_sender.send(queued_block);

                // If the receiver is closed, we can't send any more blocks.
                if let Err(SendError(queued)) = send_result {
                    // If Zebra is shutting down, drop blocks and return an error.
                    Self::send_checkpoint_verified_block_error(
                        queued,
                        "block commit task exited. Is Zebra shutting down?",
                    );

                    self.clear_finalized_block_queue(
                        "block commit task exited. Is Zebra shutting down?",
                    );
                } else {
                    metrics::gauge!("state.checkpoint.sent.block.height")
                        .set(last_sent_finalized_block_height.0 as f64);
                };
            }
        }
    }

    /// Drops all finalized state queue blocks, and sends an error on their result channels.
    fn clear_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
        for (_hash, queued) in self.finalized_state_queued_blocks.drain() {
            Self::send_checkpoint_verified_block_error(queued, error.clone());
        }
    }

    /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block
    fn send_checkpoint_verified_block_error(
        queued: QueuedCheckpointVerified,
        error: impl Into<BoxError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Drops all non-finalized state queue blocks, and sends an error on their result channels.
    fn clear_non_finalized_block_queue(&mut self, error: impl Into<BoxError> + Clone) {
        for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() {
            Self::send_semantically_verified_block_error(queued, error.clone());
        }
    }

    /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block
    fn send_semantically_verified_block_error(
        queued: QueuedSemanticallyVerified,
        error: impl Into<BoxError>,
    ) {
        let (finalized, rsp_tx) = queued;

        // The block sender might have already given up on this block,
        // so ignore any channel send errors.
        let _ = rsp_tx.send(Err(error.into()));
        std::mem::drop(finalized);
    }

    /// Queue a semantically verified block for contextual verification and check if any queued
    /// blocks are ready to be verified and committed to the state.
    ///
    /// This function encodes the logic for [committing non-finalized blocks][1]
    /// in RFC0005.
    ///
    /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
    #[instrument(level = "debug", skip(self, semantically_verified))]
    fn queue_and_commit_to_non_finalized_state(
        &mut self,
        semantically_verified: SemanticallyVerifiedBlock,
    ) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
        tracing::debug!(block = %semantically_verified.block, "queueing block for contextual verification");
        let parent_hash = semantically_verified.block.header.previous_block_hash;

        if self
            .non_finalized_block_write_sent_hashes
            .contains(&semantically_verified.hash)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(
                "block has already been sent to be committed to the state".into(),
            ));
            return rsp_rx;
        }

        if self
            .read_service
            .db
            .contains_height(semantically_verified.height)
        {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            let _ = rsp_tx.send(Err(
                "block height is in the finalized state: block is already committed to the state"
                    .into(),
            ));
            return rsp_rx;
        }

        // [`Request::CommitSemanticallyVerifiedBlock`] contract: a request to commit a block which
        // has been queued but not yet committed to the state fails the older request and replaces
        // it with the newer request.
        let rsp_rx = if let Some((_, old_rsp_tx)) = self
            .non_finalized_state_queued_blocks
            .get_mut(&semantically_verified.hash)
        {
            tracing::debug!("replacing older queued request with new request");
            let (mut rsp_tx, rsp_rx) = oneshot::channel();
            std::mem::swap(old_rsp_tx, &mut rsp_tx);
            let _ = rsp_tx.send(Err("replaced by newer request".into()));
            rsp_rx
        } else {
            let (rsp_tx, rsp_rx) = oneshot::channel();
            self.non_finalized_state_queued_blocks
                .queue((semantically_verified, rsp_tx));
            rsp_rx
        };

        // We've finished sending checkpoint verified blocks when:
        // - we've sent the verified block for the last checkpoint, and
        // - it has been successfully written to disk.
        //
        // We detect the last checkpoint by looking for non-finalized blocks
        // that are a child of the last block we sent.
        //
        // TODO: configure the state with the last checkpoint hash instead?
        if self.finalized_block_write_sender.is_some()
            && self
                .non_finalized_state_queued_blocks
                .has_queued_children(self.finalized_block_write_last_sent_hash)
            && self.read_service.db.finalized_tip_hash()
                == self.finalized_block_write_last_sent_hash
        {
            // Tell the block write task to stop committing checkpoint verified blocks to the finalized state,
            // and move on to committing semantically verified blocks to the non-finalized state.
            std::mem::drop(self.finalized_block_write_sender.take());
            // Remove any checkpoint-verified block hashes from `non_finalized_block_write_sent_hashes`.
            self.non_finalized_block_write_sent_hashes = SentHashes::default();
            // Mark `SentHashes` as usable by the `can_fork_chain_at()` method.
            self.non_finalized_block_write_sent_hashes
                .can_fork_chain_at_hashes = true;
            // Send blocks from non-finalized queue
            self.send_ready_non_finalized_queued(self.finalized_block_write_last_sent_hash);
            // We've finished committing checkpoint verified blocks to finalized state, so drop any repeated queued blocks.
            self.clear_finalized_block_queue(
                "already finished committing checkpoint verified blocks: dropped duplicate block, \
                 block is already committed to the state",
            );
        } else if !self.can_fork_chain_at(&parent_hash) {
            tracing::trace!("unready to verify, returning early");
        } else if self.finalized_block_write_sender.is_none() {
            // Wait until block commit task is ready to write non-finalized blocks before dequeuing them
            self.send_ready_non_finalized_queued(parent_hash);

            let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
                "Finalized state must have at least one block before committing non-finalized state",
            );

            self.non_finalized_state_queued_blocks
                .prune_by_height(finalized_tip_height);

            self.non_finalized_block_write_sent_hashes
                .prune_by_height(finalized_tip_height);
        }

        rsp_rx
    }

    /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
    fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
        self.non_finalized_block_write_sent_hashes
            .can_fork_chain_at(hash)
            || &self.read_service.db.finalized_tip_hash() == hash
    }

    /// Returns `true` if `queued_height` is near the final checkpoint.
    ///
    /// The semantic block verifier needs access to UTXOs from checkpoint verified blocks
    /// near the final checkpoint, so that it can verify blocks that spend those UTXOs.
    ///
    /// If it doesn't have the required UTXOs, some blocks will time out,
    /// but succeed after a syncer restart.
    fn is_close_to_final_checkpoint(&self, queued_height: block::Height) -> bool {
        queued_height >= self.full_verifier_utxo_lookahead
    }

    /// Sends all queued blocks whose parents have recently arrived starting from `new_parent`
    /// in breadth-first ordering to the block write task which will attempt to validate and commit them
    #[tracing::instrument(level = "debug", skip(self, new_parent))]
    fn send_ready_non_finalized_queued(&mut self, new_parent: block::Hash) {
        use tokio::sync::mpsc::error::SendError;
        if let Some(non_finalized_block_write_sender) = &self.non_finalized_block_write_sender {
            let mut new_parents: Vec<block::Hash> = vec![new_parent];

            while let Some(parent_hash) = new_parents.pop() {
                let queued_children = self
                    .non_finalized_state_queued_blocks
                    .dequeue_children(parent_hash);

                for queued_child in queued_children {
                    // Copy the hash before `queued_child` is moved into the channel send below.
                    let hash = queued_child.0.hash;

                    self.non_finalized_block_write_sent_hashes
                        .add(&queued_child.0);
                    let send_result = non_finalized_block_write_sender.send(queued_child);

                    if let Err(SendError(queued)) = send_result {
                        // If Zebra is shutting down, drop blocks and return an error.
                        Self::send_semantically_verified_block_error(
                            queued,
                            "block commit task exited. Is Zebra shutting down?",
                        );

                        self.clear_non_finalized_block_queue(
                            "block commit task exited. Is Zebra shutting down?",
                        );

                        return;
                    };

                    new_parents.push(hash);
                }
            }

            self.non_finalized_block_write_sent_hashes.finish_batch();
        };
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        self.read_service.best_tip()
    }

    /// Assert some assumptions about the semantically verified `block` before it is queued.
    fn assert_block_can_be_validated(&self, block: &SemanticallyVerifiedBlock) {
        // required by `Request::CommitSemanticallyVerifiedBlock` call
        assert!(
            block.height > self.network.mandatory_checkpoint_height(),
            "invalid semantically verified block height: the canopy checkpoint is mandatory, pre-canopy \
             blocks, and the canopy activation block, must be committed to the state as finalized \
             blocks"
        );
    }
}

impl ReadStateService {
    /// Creates a new read-only state service, using the provided finalized state and
    /// block write task handle.
    ///
    /// Returns the newly created service,
    /// and a watch channel for updating the shared recent non-finalized chain.
    pub(crate) fn new(
        finalized_state: &FinalizedState,
        block_write_task: Option<Arc<std::thread::JoinHandle<()>>>,
        non_finalized_state_receiver: watch::Receiver<NonFinalizedState>,
    ) -> Self {
        let read_service = Self {
            network: finalized_state.network(),
            db: finalized_state.db.clone(),
            non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver),
            block_write_task,
        };

        tracing::debug!("created new read-only state service");

        read_service
    }

    /// Return the tip of the current best chain.
    pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
        read::best_tip(&self.latest_non_finalized_state(), &self.db)
    }

    /// Gets a clone of the latest non-finalized state from the `non_finalized_state_receiver`
    fn latest_non_finalized_state(&self) -> NonFinalizedState {
        self.non_finalized_state_receiver.cloned_watch_data()
    }

    /// Gets a clone of the latest, best non-finalized chain from the `non_finalized_state_receiver`
    #[allow(dead_code)]
    fn latest_best_chain(&self) -> Option<Arc<Chain>> {
        self.latest_non_finalized_state().best_chain().cloned()
    }

    /// Test-only access to the inner database.
    /// Can be used to modify the database without doing any consensus checks.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn db(&self) -> &ZebraDb {
        &self.db
    }

    /// Logs RocksDB metrics using the read-only state service.
    pub fn log_db_metrics(&self) {
        self.db.print_db_metrics();
    }
}

impl Service<Request> for StateService {
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        let poll = self.read_service.poll_ready(cx);

        // Prune outdated UTXO requests
        let now = Instant::now();

        if self.last_prune + Self::PRUNE_INTERVAL < now {
            let tip = self.best_tip();
            let old_len = self.pending_utxos.len();

            self.pending_utxos.prune();
            self.last_prune = now;

            let new_len = self.pending_utxos.len();
            let prune_count = old_len
                .checked_sub(new_len)
                .expect("prune does not add any utxo requests");
            if prune_count > 0 {
                tracing::debug!(
                    ?old_len,
                    ?new_len,
                    ?prune_count,
                    ?tip,
                    "pruned utxo requests"
                );
            } else {
                tracing::debug!(len = ?old_len, ?tip, "no utxo requests needed pruning");
            }
        }

        poll
    }

    #[instrument(name = "state", skip(self, req))]
    fn call(&mut self, req: Request) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb.
            Request::CommitSemanticallyVerifiedBlock(semantically_verified) => {
                self.assert_block_can_be_validated(&semantically_verified);

                self.pending_utxos
                    .check_against_ordered(&semantically_verified.new_outputs);

                // # Performance
                //
                // Allow other async tasks to make progress while blocks are being verified
                // and written to disk. But wait for the blocks to finish committing,
                // so that `StateService` multi-block queries always observe a consistent state.
                //
                // Since each block is spawned into its own task,
                // there shouldn't be any other code running in the same task,
                // so we don't need to worry about blocking it:
                // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html

                let rsp_rx = tokio::task::block_in_place(move || {
                    span.in_scope(|| {
                        self.queue_and_commit_to_non_finalized_state(semantically_verified)
                    })
                });

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitSemanticallyVerifiedBlock");

                let span = Span::current();
                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from(
                                "block was dropped from the queue of non-finalized blocks",
                            )
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses finalized_state_queued_blocks and pending_utxos in the StateService.
            // Accesses shared writeable state in the StateService.
            Request::CommitCheckpointVerifiedBlock(finalized) => {
                // # Consensus
                //
                // A semantic block verification could have called AwaitUtxo
                // before this checkpoint verified block arrived in the state.
                // So we need to check for pending UTXO requests sent by running
                // semantic block verifications.
                //
                // This check is redundant for most checkpoint verified blocks,
                // because semantic verification can only succeed near the final
                // checkpoint, when all the UTXOs are available for the verifying block.
                //
                // (Checkpoint block UTXOs are verified using block hash checkpoints
                // and transaction merkle tree block header commitments.)
                self.pending_utxos
                    .check_against_ordered(&finalized.new_outputs);

                // # Performance
                //
                // This method doesn't block, access the database, or perform CPU-intensive tasks,
                // so we can run it directly in the tokio executor's Future threads.
                let rsp_rx = self.queue_and_commit_to_finalized_state(finalized);

                // TODO:
                // - check for panics in the block write task here,
                //   as well as in poll_ready()

                // The work is all done, the future just waits on a channel for the result
                timer.finish(module_path!(), line!(), "CommitCheckpointVerifiedBlock");

                async move {
                    rsp_rx
                        .await
                        .map_err(|_recv_error| {
                            BoxError::from("block was dropped from the queue of finalized blocks")
                        })
                        // TODO: replace with Result::flatten once it stabilises
                        // https://github.com/rust-lang/rust/issues/70142
                        .and_then(convert::identity)
                        .map(Response::Committed)
                }
                .instrument(span)
                .boxed()
            }

            // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService.
            // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService.
            Request::AwaitUtxo(outpoint) => {
                // Prepare the AwaitUtxo future from PendingUtxos.
                let response_fut = self.pending_utxos.queue(outpoint);
                // Only instrument `response_fut`, the ReadStateService already
                // instruments its requests with the same span.

                let response_fut = response_fut.instrument(span).boxed();

                // Check the non-finalized block queue outside the returned future,
                // so we can access mutable state fields.
                if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/queued-non-finalized");

                    return response_fut;
                }

                // Check the sent non-finalized blocks
                if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) {
                    self.pending_utxos.respond(&outpoint, utxo);

                    // We're finished, the returned future gets the UTXO from the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/sent-non-finalized");

                    return response_fut;
                }

                // We ignore any UTXOs in the `StateService`'s `finalized_state_queued_blocks`,
                // because it is only used during checkpoint verification.
                //
                // This creates a rare race condition, but it doesn't seem to happen much in practice.
                // See #5126 for details.

                // Manually send a request to the ReadStateService,
                // to get UTXOs from any non-finalized chain or the finalized chain.
                let read_service = self.read_service.clone();

                // Run the request in an async block, so we can await the response.
                async move {
                    let req = ReadRequest::AnyChainUtxo(outpoint);

                    let rsp = read_service.oneshot(req).await?;

                    // Optional TODO:
                    // - make pending_utxos.respond() async using a channel,
                    //   so we can respond to all waiting requests here
                    //
                    // This change is not required for correctness, because:
                    // - any waiting requests should have returned when the block was sent to the state
                    // - otherwise, the request returns immediately if:
                    //   - the block is in the non-finalized queue, or
                    //   - the block is in any non-finalized chain or the finalized state
                    //
                    // And if the block is in the finalized queue,
                    // that's rare enough that a retry is ok.
                    if let ReadResponse::AnyChainUtxo(Some(utxo)) = rsp {
                        // We got a UTXO, so we replace the response future with the result.
                        timer.finish(module_path!(), line!(), "AwaitUtxo/any-chain");

                        return Ok(Response::Utxo(utxo));
                    }

                    // We're finished, but the returned future is waiting on the respond() channel.
                    timer.finish(module_path!(), line!(), "AwaitUtxo/waiting");

                    response_fut.await
                }
                .boxed()
            }

            // Used by sync, inbound, and block verifier to check if a block is already in the state
            // before downloading or validating it.
            Request::KnownBlock(hash) => {
                let timer = CodeTimer::start();

                let read_service = self.read_service.clone();

                async move {
                    let response = read::non_finalized_state_contains_block_hash(
                        &read_service.latest_non_finalized_state(),
                        hash,
                    )
                    .or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

                    // The work is done in the future.
                    timer.finish(module_path!(), line!(), "Request::KnownBlock");

                    Ok(Response::KnownBlock(response))
                }
                .boxed()
            }

            // Runs concurrently using the ReadStateService
            Request::Tip
            | Request::Depth(_)
            | Request::BestChainNextMedianTimePast
            | Request::BestChainBlockHash(_)
            | Request::BlockLocator
            | Request::Transaction(_)
            | Request::UnspentBestChainUtxo(_)
            | Request::Block(_)
            | Request::BlockAndSize(_)
            | Request::BlockHeader(_)
            | Request::FindBlockHashes { .. }
            | Request::FindBlockHeaders { .. }
            | Request::CheckBestChainTipNullifiersAndAnchors(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }

            Request::CheckBlockProposalValidity(_) => {
                // Redirect the request to the concurrent ReadStateService
                let read_service = self.read_service.clone();

                async move {
                    let req = req
                        .try_into()
                        .expect("ReadRequest conversion should not fail");

                    let rsp = read_service.oneshot(req).await?;
                    let rsp = rsp.try_into().expect("Response conversion should not fail");

                    Ok(rsp)
                }
                .boxed()
            }
        }
    }
}

impl Service<ReadRequest> for ReadStateService {
    type Response = ReadResponse;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check for panics in the block write task
        //
        // TODO: move into a check_for_panics() method
        let block_write_task = self.block_write_task.take();

        if let Some(block_write_task) = block_write_task {
            if block_write_task.is_finished() {
                if let Some(block_write_task) = Arc::into_inner(block_write_task) {
                    // We are the last state with a reference to this task, so we can propagate any panics
                    if let Err(thread_panic) = block_write_task.join() {
                        std::panic::resume_unwind(thread_panic);
                    }
                }
            } else {
                // It hasn't finished, so we need to put it back
                self.block_write_task = Some(block_write_task);
            }
        }

        self.db.check_for_panics();

        Poll::Ready(Ok(()))
    }

    #[instrument(name = "read_state", skip(self, req))]
    fn call(&mut self, req: ReadRequest) -> Self::Future {
        req.count_metric();
        let timer = CodeTimer::start();
        let span = Span::current();

        match req {
            // Used by the `getblockchaininfo` RPC.
            ReadRequest::UsageInfo => {
                let db = self.db.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // The work is done in the future.

                        let db_size = db.size();

                        timer.finish(module_path!(), line!(), "ReadRequest::UsageInfo");

                        Ok(ReadResponse::UsageInfo(db_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Tip => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::tip(non_finalized_state.best_chain(), &state.db)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Tip");

                        Ok(ReadResponse::Tip(tip))
                    })
                })
                .wait_for_panics()
            }

            // Used by `getblockchaininfo` RPC method.
            ReadRequest::TipPoolValues => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tip_with_value_balance = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_with_value_balance(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                )
                            });

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::TipPoolValues");

                        let (tip_height, tip_hash, value_balance) = tip_with_value_balance?
                            .ok_or(BoxError::from("no chain tip available yet"))?;

                        Ok(ReadResponse::TipPoolValues {
                            tip_height,
                            tip_hash,
                            value_balance,
                        })
                    })
                })
                .wait_for_panics()
            }

            // Used by getblock
            ReadRequest::BlockInfo(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let value_balance = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_info(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        // TODO: Do this in the Drop impl with the variant name?
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockInfo");

                        Ok(ReadResponse::BlockInfo(value_balance))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::Depth(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let depth = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::depth(non_finalized_state.best_chain(), &state.db, hash)
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Depth");

                        Ok(ReadResponse::Depth(depth))
                    })
                })
                .wait_for_panics()
            }

            // Used by the StateService.
            ReadRequest::BestChainNextMedianTimePast => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let non_finalized_state = state.latest_non_finalized_state();
                        let median_time_past =
                            read::next_median_time_past(&non_finalized_state, &state.db);

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::BestChainNextMedianTimePast",
                        );

                        Ok(ReadResponse::BestChainNextMedianTimePast(median_time_past?))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::Block(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Block");

                        Ok(ReadResponse::Block(block))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (raw) RPC and the StateService.
            ReadRequest::BlockAndSize(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let block_and_size = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block_and_size(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockAndSize");

                        Ok(ReadResponse::BlockAndSize(block_and_size))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block (verbose) RPC and the StateService.
            ReadRequest::BlockHeader(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let best_chain = state.latest_best_chain();

                        let height = hash_or_height
                            .height_or_else(|hash| {
                                read::find::height_by_hash(best_chain.clone(), &state.db, hash)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let hash = hash_or_height
                            .hash_or_else(|height| {
                                read::find::hash_by_height(best_chain.clone(), &state.db, height)
                            })
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        let next_height = height.next()?;
                        let next_block_hash =
                            read::find::hash_by_height(best_chain.clone(), &state.db, next_height);

                        let header = read::block_header(best_chain, &state.db, height.into())
                            .ok_or_else(|| BoxError::from("block hash or height not found"))?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BlockHeader");

                        Ok(ReadResponse::BlockHeader {
                            header,
                            hash,
                            height,
                            next_block_hash,
                        })
                    })
                })
                .wait_for_panics()
            }

            // For the get_raw_transaction RPC and the StateService.
            ReadRequest::Transaction(hash) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let response =
                            read::mined_transaction(state.latest_best_chain(), &state.db, hash);

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::Transaction");

                        Ok(ReadResponse::Transaction(response))
                    })
                })
                .wait_for_panics()
            }

            // Used by the getblock (verbose) RPC.
            ReadRequest::TransactionIdsForBlock(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let transaction_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transaction_hashes_for_block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsForBlock",
                        );

                        Ok(ReadResponse::TransactionIdsForBlock(transaction_ids))
                    })
                })
                .wait_for_panics()
            }

            #[cfg(feature = "indexer")]
            ReadRequest::SpendingTransactionId(spend) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let spending_transaction_id = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::spending_transaction_hash(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    spend,
                                )
                            });

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
1474 "ReadRequest::TransactionIdForSpentOutPoint",
1475 );
1476
1477 Ok(ReadResponse::TransactionId(spending_transaction_id))
1478 })
1479 })
1480 .wait_for_panics()
1481 }
1482
1483 ReadRequest::UnspentBestChainUtxo(outpoint) => {
1484 let state = self.clone();
1485
1486 tokio::task::spawn_blocking(move || {
1487 span.in_scope(move || {
1488 let utxo = state.non_finalized_state_receiver.with_watch_data(
1489 |non_finalized_state| {
1490 read::unspent_utxo(
1491 non_finalized_state.best_chain(),
1492 &state.db,
1493 outpoint,
1494 )
1495 },
1496 );
1497
1498 // The work is done in the future.
1499 timer.finish(module_path!(), line!(), "ReadRequest::UnspentBestChainUtxo");
1500
1501 Ok(ReadResponse::UnspentBestChainUtxo(utxo))
1502 })
1503 })
1504 .wait_for_panics()
1505 }
1506
1507 // Manually used by the StateService to implement part of AwaitUtxo.
1508 ReadRequest::AnyChainUtxo(outpoint) => {
1509 let state = self.clone();
1510
1511 tokio::task::spawn_blocking(move || {
1512 span.in_scope(move || {
1513 let utxo = state.non_finalized_state_receiver.with_watch_data(
1514 |non_finalized_state| {
1515 read::any_utxo(non_finalized_state, &state.db, outpoint)
1516 },
1517 );
1518
1519 // The work is done in the future.
1520 timer.finish(module_path!(), line!(), "ReadRequest::AnyChainUtxo");
1521
1522 Ok(ReadResponse::AnyChainUtxo(utxo))
1523 })
1524 })
1525 .wait_for_panics()
1526 }
1527
1528 // Used by the StateService.
1529 ReadRequest::BlockLocator => {
1530 let state = self.clone();
1531
1532 tokio::task::spawn_blocking(move || {
1533 span.in_scope(move || {
1534 let block_locator = state.non_finalized_state_receiver.with_watch_data(
1535 |non_finalized_state| {
1536 read::block_locator(non_finalized_state.best_chain(), &state.db)
1537 },
1538 );
1539
1540 // The work is done in the future.
1541 timer.finish(module_path!(), line!(), "ReadRequest::BlockLocator");
1542
1543 Ok(ReadResponse::BlockLocator(
1544 block_locator.unwrap_or_default(),
1545 ))
1546 })
1547 })
1548 .wait_for_panics()
1549 }
1550
1551 // Used by the StateService.
1552 ReadRequest::FindBlockHashes { known_blocks, stop } => {
1553 let state = self.clone();
1554
1555 tokio::task::spawn_blocking(move || {
1556 span.in_scope(move || {
1557 let block_hashes = state.non_finalized_state_receiver.with_watch_data(
1558 |non_finalized_state| {
1559 read::find_chain_hashes(
1560 non_finalized_state.best_chain(),
1561 &state.db,
1562 known_blocks,
1563 stop,
1564 MAX_FIND_BLOCK_HASHES_RESULTS,
1565 )
1566 },
1567 );
1568
1569 // The work is done in the future.
1570 timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHashes");
1571
1572 Ok(ReadResponse::BlockHashes(block_hashes))
1573 })
1574 })
1575 .wait_for_panics()
1576 }
1577
1578 // Used by the StateService.
1579 ReadRequest::FindBlockHeaders { known_blocks, stop } => {
1580 let state = self.clone();
1581
1582 tokio::task::spawn_blocking(move || {
1583 span.in_scope(move || {
1584 let block_headers = state.non_finalized_state_receiver.with_watch_data(
1585 |non_finalized_state| {
1586 read::find_chain_headers(
1587 non_finalized_state.best_chain(),
1588 &state.db,
1589 known_blocks,
1590 stop,
1591 MAX_FIND_BLOCK_HEADERS_RESULTS,
1592 )
1593 },
1594 );
1595
1596 let block_headers = block_headers
1597 .into_iter()
1598 .map(|header| CountedHeader { header })
1599 .collect();
1600
1601 // The work is done in the future.
1602 timer.finish(module_path!(), line!(), "ReadRequest::FindBlockHeaders");
1603
1604 Ok(ReadResponse::BlockHeaders(block_headers))
1605 })
1606 })
1607 .wait_for_panics()
1608 }
1609
            ReadRequest::SaplingTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let sapling_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::sapling_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingTree");

                        Ok(ReadResponse::SaplingTree(sapling_tree))
                    })
                })
                .wait_for_panics()
            }

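            // For the z_gettreestate RPC.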
            ReadRequest::OrchardTree(hash_or_height) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let orchard_tree = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::orchard_tree(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    hash_or_height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardTree");

                        Ok(ReadResponse::OrchardTree(orchard_tree))
                    })
                })
                .wait_for_panics()
            }

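            // For the z_getsubtreesbyindex RPC.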
            ReadRequest::SaplingSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let sapling_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, just return all the trees.
                                    // If the end bound would overflow, also return all the trees,
                                    // because that's what `zcashd` does: it never calculates an
                                    // end bound, so it just keeps iterating until the trees run out.
                                    read::sapling_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SaplingSubtrees");

                        Ok(ReadResponse::SaplingSubtrees(sapling_subtrees))
                    })
                })
                .wait_for_panics()
            }

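            // For the z_getsubtreesbyindex RPC.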
            ReadRequest::OrchardSubtrees { start_index, limit } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let end_index = limit
                            .and_then(|limit| start_index.0.checked_add(limit.0))
                            .map(NoteCommitmentSubtreeIndex);

                        let orchard_subtrees = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                if let Some(end_index) = end_index {
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..end_index,
                                    )
                                } else {
                                    // If there is no end bound, just return all the trees.
                                    // If the end bound would overflow, also return all the trees,
                                    // because that's what `zcashd` does: it never calculates an
                                    // end bound, so it just keeps iterating until the trees run out.
                                    read::orchard_subtrees(
                                        non_finalized_state.best_chain(),
                                        &state.db,
                                        start_index..,
                                    )
                                }
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::OrchardSubtrees");

                        Ok(ReadResponse::OrchardSubtrees(orchard_subtrees))
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_balance RPC.
            ReadRequest::AddressBalance(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let (balance, received) = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::transparent_balance(
                                    non_finalized_state.best_chain().cloned(),
                                    &state.db,
                                    addresses,
                                )
                            })?;

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::AddressBalance");

                        Ok(ReadResponse::AddressBalance { balance, received })
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_tx_ids RPC.
            ReadRequest::TransactionIdsByAddresses {
                addresses,
                height_range,
            } => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let tx_ids = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::transparent_tx_ids(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                    height_range,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::TransactionIdsByAddresses",
                        );

                        tx_ids.map(ReadResponse::AddressesTransactionIds)
                    })
                })
                .wait_for_panics()
            }

            // For the get_address_utxos RPC.
            ReadRequest::UtxosByAddresses(addresses) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let utxos = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::address_utxos(
                                    &state.network,
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    addresses,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::UtxosByAddresses");

                        utxos.map(ReadResponse::AddressUtxos)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBestChainTipNullifiersAndAnchors(unmined_tx) => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_best_chain =
                            state.latest_non_finalized_state().best_chain().cloned();

                        check::nullifier::tx_no_duplicates_in_chain(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx.transaction,
                        )?;

                        check::anchors::tx_anchors_refer_to_final_treestates(
                            &state.db,
                            latest_non_finalized_best_chain.as_ref(),
                            &unmined_tx,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBestChainTipNullifiersAndAnchors",
                        );

                        Ok(ReadResponse::ValidBestChainTipNullifiersAndAnchors)
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block and get_block_hash RPCs.
            ReadRequest::BestChainBlockHash(height) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let hash = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::hash_by_height(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    height,
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::BestChainBlockHash");

                        Ok(ReadResponse::BlockHash(hash))
                    })
                })
                .wait_for_panics()
            }

            // Used by the get_block_template and getblockchaininfo RPCs.
            ReadRequest::ChainInfo => {
                let state = self.clone();
                let latest_non_finalized_state = self.latest_non_finalized_state();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // If there is a large overlap between the non-finalized and finalized states,
                        // where the finalized tip is above the non-finalized tip, then Zebra is either
                        // receiving a lot of blocks, or this request has been delayed for a long time.
                        //
                        // In that case, the `getblocktemplate` RPC will return an error because Zebra
                        // is not synced to the tip. That check happens before the RPC makes this request.
                        let get_block_template_info =
                            read::difficulty::get_block_template_chain_info(
                                &latest_non_finalized_state,
                                &state.db,
                                &state.network,
                            );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::ChainInfo");

                        get_block_template_info.map(ReadResponse::ChainInfo)
                    })
                })
                .wait_for_panics()
            }

            // Used by the getmininginfo, getnetworksolps, and getnetworkhashps RPCs.
            ReadRequest::SolutionRate { num_blocks, height } => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        let latest_non_finalized_state = state.latest_non_finalized_state();

                        // # Correctness
                        //
                        // It is ok to do these lookups using multiple database calls. Finalized state updates
                        // can only add overlapping blocks, and block hashes are unique across all chain forks.
                        //
                        // The worst that can happen here is that the default `start_hash` will be below
                        // the chain tip.
                        let (tip_height, tip_hash) =
                            match read::tip(latest_non_finalized_state.best_chain(), &state.db) {
                                Some(tip) => tip,
                                None => return Ok(ReadResponse::SolutionRate(None)),
                            };

                        let start_hash = match height {
                            Some(height) if height < tip_height => read::hash_by_height(
                                latest_non_finalized_state.best_chain(),
                                &state.db,
                                height,
                            ),
                            // Use the chain tip hash if the height is above it or not provided.
                            _ => Some(tip_hash),
                        };

                        let solution_rate = start_hash.and_then(|start_hash| {
                            read::difficulty::solution_rate(
                                &latest_non_finalized_state,
                                &state.db,
                                num_blocks,
                                start_hash,
                            )
                        });

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::SolutionRate");

                        Ok(ReadResponse::SolutionRate(solution_rate))
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::CheckBlockProposalValidity(semantically_verified) => {
                let state = self.clone();

                // # Performance
                //
                // Allow other async tasks to make progress while concurrently reading blocks from disk.

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        tracing::debug!("attempting to validate and commit block proposal onto a cloned non-finalized state");
                        let mut latest_non_finalized_state = state.latest_non_finalized_state();

                        // The previous block of a valid proposal must be on the best chain tip.
                        let Some((_best_tip_height, best_tip_hash)) = read::best_tip(&latest_non_finalized_state, &state.db) else {
                            return Err("state is empty: wait for Zebra to sync before submitting a proposal".into());
                        };

                        if semantically_verified.block.header.previous_block_hash != best_tip_hash {
                            return Err("proposal is not based on the current best chain tip: previous block hash must be the best chain tip".into());
                        }

                        // This clone of the non-finalized state is dropped when this closure returns.
                        // The non-finalized state that's used in the rest of the state (including finalizing
                        // blocks into the db) is not mutated here.
                        //
                        // TODO: Convert `CommitSemanticallyVerifiedError` to a new `ValidateProposalError`?
                        latest_non_finalized_state.disable_metrics();

                        write::validate_and_commit_non_finalized(
                            &state.db,
                            &mut latest_non_finalized_state,
                            semantically_verified,
                        )?;

                        // The work is done in the future.
                        timer.finish(
                            module_path!(),
                            line!(),
                            "ReadRequest::CheckBlockProposalValidity",
                        );

                        Ok(ReadResponse::ValidBlockProposal)
                    })
                })
                .wait_for_panics()
            }

            ReadRequest::TipBlockSize => {
                let state = self.clone();

                tokio::task::spawn_blocking(move || {
                    span.in_scope(move || {
                        // Get the best chain tip height.
                        let tip_height = state
                            .non_finalized_state_receiver
                            .with_watch_data(|non_finalized_state| {
                                read::tip_height(non_finalized_state.best_chain(), &state.db)
                            })
                            .unwrap_or(Height(0));

                        // Get the block at the best chain tip height.
                        let block = state.non_finalized_state_receiver.with_watch_data(
                            |non_finalized_state| {
                                read::block(
                                    non_finalized_state.best_chain(),
                                    &state.db,
                                    tip_height.into(),
                                )
                            },
                        );

                        // The work is done in the future.
                        timer.finish(module_path!(), line!(), "ReadRequest::TipBlockSize");

                        // Respond with the serialized size of the block at the tip, if there is one.
                        match block {
                            Some(b) => Ok(ReadResponse::TipBlockSize(Some(
                                b.zcash_serialize_to_vec()?.len(),
                            ))),
                            None => Ok(ReadResponse::TipBlockSize(None)),
                        }
                    })
                })
                .wait_for_panics()
            }
        }
    }
}

/// Initialize a state service from the provided [`Config`].
/// Returns a boxed state service, a read-only state service,
/// and receivers for state chain tip updates.
///
/// Each `network` has its own separate on-disk database.
///
/// The state uses the `max_checkpoint_height` and `checkpoint_verify_concurrency_limit`
/// to work out when it is near the final checkpoint.
///
/// To share access to the state, wrap the returned service in a `Buffer`,
/// or clone the returned [`ReadStateService`].
///
/// It's possible to construct multiple state services in the same application (as
/// long as they, e.g., use different storage locations), but doing so is
/// probably not what you want.
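///
/// A minimal usage sketch (illustrative only, not compiled as a doctest; it
/// assumes the usual crate-root re-exports, and the checkpoint arguments are
/// placeholders):
///
/// ```ignore
/// use tower::buffer::Buffer;
/// use zebra_chain::{block, parameters::Network};
///
/// let (state, read_state, latest_chain_tip, chain_tip_change) = zebra_state::init(
///     zebra_state::Config::ephemeral(),
///     &Network::Mainnet,
///     block::Height::MAX, // placeholder max_checkpoint_height
///     0,                  // placeholder checkpoint_verify_concurrency_limit
/// );
///
/// // Share write access by wrapping the boxed service in a `Buffer`,
/// // and share read access by cloning `read_state`.
/// let state = Buffer::new(state, 10);
/// let read_state_for_rpc = read_state.clone();
/// ```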
pub fn init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> (
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(
            config,
            network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        );

    (
        BoxService::new(state_service),
        read_only_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}

/// Initialize a read state service from the provided [`Config`].
/// Returns a read-only state service, the finalized database,
/// and a sender for the non-finalized state.
///
/// Each `network` has its own separate on-disk database.
///
/// To share access to the state, clone the returned [`ReadStateService`].
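///
/// A minimal usage sketch (illustrative only, not compiled as a doctest; it
/// assumes the usual crate-root re-exports):
///
/// ```ignore
/// use zebra_chain::parameters::Network;
///
/// let (read_state, db, non_finalized_state_sender) =
///     zebra_state::init_read_only(zebra_state::Config::default(), &Network::Mainnet);
///
/// // `ReadStateService` is cheap to clone, so each task can have its own copy.
/// let read_state_for_rpc = read_state.clone();
/// ```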
pub fn init_read_only(
    config: Config,
    network: &Network,
) -> (
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
) {
    let finalized_state = FinalizedState::new_with_debug(
        &config,
        network,
        true,
        #[cfg(feature = "elasticsearch")]
        false,
        true,
    );
    let (non_finalized_state_sender, non_finalized_state_receiver) =
        tokio::sync::watch::channel(NonFinalizedState::new(network));

    (
        ReadStateService::new(&finalized_state, None, non_finalized_state_receiver),
        finalized_state.db.clone(),
        non_finalized_state_sender,
    )
}

/// Calls [`init_read_only`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a read state service, the finalized database,
/// and a non-finalized state sender.
pub fn spawn_init_read_only(
    config: Config,
    network: &Network,
) -> tokio::task::JoinHandle<(
    ReadStateService,
    ZebraDb,
    tokio::sync::watch::Sender<NonFinalizedState>,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || init_read_only(config, &network))
}

/// Calls [`init`] with the provided [`Config`] and [`Network`] from a blocking task.
/// Returns a [`tokio::task::JoinHandle`] with a boxed state service,
/// a read state service, and receivers for state chain tip updates.
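///
/// A minimal usage sketch (illustrative only, not compiled as a doctest; it
/// assumes a Tokio runtime, the usual crate-root re-exports, and placeholder
/// checkpoint arguments):
///
/// ```ignore
/// use zebra_chain::{block, parameters::Network};
///
/// let (state, read_state, latest_chain_tip, chain_tip_change) = zebra_state::spawn_init(
///     zebra_state::Config::ephemeral(),
///     &Network::Mainnet,
///     block::Height::MAX, // placeholder max_checkpoint_height
///     0,                  // placeholder checkpoint_verify_concurrency_limit
/// )
/// .await
/// .expect("state initialization task should not panic");
/// ```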
pub fn spawn_init(
    config: Config,
    network: &Network,
    max_checkpoint_height: block::Height,
    checkpoint_verify_concurrency_limit: usize,
) -> tokio::task::JoinHandle<(
    BoxService<Request, Response, BoxError>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
)> {
    let network = network.clone();
    tokio::task::spawn_blocking(move || {
        init(
            config,
            &network,
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
        )
    })
}

/// Returns a [`StateService`] with an ephemeral [`Config`] and a buffer with a single slot.
///
/// This can be used to create a state service for testing. See also [`init`].
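///
/// A minimal usage sketch (illustrative only, not compiled as a doctest):
///
/// ```ignore
/// use tower::ServiceExt;
/// use zebra_chain::parameters::Network;
///
/// let state = init_test(&Network::Mainnet);
/// let tip = state.oneshot(Request::Tip).await?;
/// ```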
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test(network: &Network) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency_limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (state_service, _, _, _) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    Buffer::new(BoxService::new(state_service), 1)
}

/// Initializes a state service with an ephemeral [`Config`] and a buffer with a single slot,
/// then returns the read-write service, read-only service, and tip watch channels.
///
/// This can be used to create a state service for testing. See also [`init`].
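///
/// A minimal usage sketch (illustrative only, not compiled as a doctest):
///
/// ```ignore
/// use tower::ServiceExt;
/// use zebra_chain::parameters::Network;
///
/// let (state, read_state, latest_chain_tip, _chain_tip_change) =
///     init_test_services(&Network::Mainnet);
///
/// // `ReadStateService` is `Clone`, so each request can use its own copy.
/// let tip = read_state.clone().oneshot(ReadRequest::Tip).await?;
/// ```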
#[cfg(any(test, feature = "proptest-impl"))]
pub fn init_test_services(
    network: &Network,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    ReadStateService,
    LatestChainTip,
    ChainTipChange,
) {
    // TODO: pass max_checkpoint_height and checkpoint_verify_concurrency_limit
    //       if we ever need to test final checkpoint sent UTXO queries
    let (state_service, read_state_service, latest_chain_tip, chain_tip_change) =
        StateService::new(Config::ephemeral(), network, block::Height::MAX, 0);

    let state_service = Buffer::new(BoxService::new(state_service), 1);

    (
        state_service,
        read_state_service,
        latest_chain_tip,
        chain_tip_change,
    )
}