zebra_state/service/finalized_state.rs
//! The primary implementation of the `zebra_state::Service` built upon rocksdb.
//!
//! Zebra's database is implemented in 4 layers:
//! - [`FinalizedState`]: queues, validates, and commits blocks, using...
//! - [`ZebraDb`]: reads and writes [`zebra_chain`] types to the state database, using...
//! - [`DiskDb`]: reads and writes generic types to any column family in the database, using...
//! - [`disk_format`]: converts types to raw database bytes.
//!
//! These layers allow us to split [`zebra_chain`] types for efficient database storage.
//! They reduce the risk of data corruption bugs, runtime inconsistencies, and panics.
//!
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization format, etc.) changes.
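//!
//! For example (a hedged sketch of the layering, not a prescribed API flow, assuming
//! a `Config` and `Network` are in scope and the `elasticsearch` feature is disabled):
//!
//! ```ignore
//! // `FinalizedState` owns a `ZebraDb`, which wraps `DiskDb` and `disk_format`.
//! let state = FinalizedState::new(&config, &network);
//! let tip_height = state.db.finalized_tip_height();
//! ```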

use std::{
    io::{stderr, stdout, Write},
    sync::Arc,
};

use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network};
use zebra_db::{
    chain::BLOCK_INFO,
    transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC},
};

use crate::{
    constants::{state_database_format_version_in_code, STATE_DATABASE_KIND},
    request::{FinalizableBlock, FinalizedBlock, Treestate},
    service::{check, QueuedCheckpointVerified},
    BoxError, CheckpointVerifiedBlock, CloneError, Config,
};

pub mod column_family;

mod disk_db;
mod disk_format;
mod zebra_db;

#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;

#[cfg(test)]
mod tests;

#[allow(unused_imports)]
pub use column_family::{TypedColumnFamily, WriteTypedBatch};
#[allow(unused_imports)]
pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
#[allow(unused_imports)]
pub use disk_format::{
    FromDisk, IntoDisk, OutputIndex, OutputLocation, RawBytes, TransactionIndex,
    TransactionLocation, MAX_ON_DISK_HEIGHT,
};
pub use zebra_db::ZebraDb;

#[cfg(feature = "shielded-scan")]
pub use disk_format::{
    SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
    SaplingScanningKey,
};

#[cfg(any(test, feature = "proptest-impl"))]
pub use disk_format::KV;

pub use disk_format::upgrade::restorable_db_versions;

/// The column families supported by the running `zebra-state` database code.
///
/// Existing column families that aren't listed here are preserved when the database is opened.
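///
/// These names are passed to `ZebraDb::new` when the database is opened, as in
/// `FinalizedState::new_with_debug` below (a sketch of that call):
///
/// ```ignore
/// STATE_COLUMN_FAMILIES_IN_CODE.iter().map(ToString::to_string)
/// ```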
pub const STATE_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
    // Blocks
    "hash_by_height",
    "height_by_hash",
    "block_header_by_height",
    // Transactions
    "tx_by_loc",
    "hash_by_tx_loc",
    "tx_loc_by_hash",
    // Transparent
    BALANCE_BY_TRANSPARENT_ADDR,
    "tx_loc_by_transparent_addr_loc",
    "utxo_by_out_loc",
    "utxo_loc_by_transparent_addr_loc",
    TX_LOC_BY_SPENT_OUT_LOC,
    // Sprout
    "sprout_nullifiers",
    "sprout_anchors",
    "sprout_note_commitment_tree",
    // Sapling
    "sapling_nullifiers",
    "sapling_anchors",
    "sapling_note_commitment_tree",
    "sapling_note_commitment_subtree",
    // Orchard
    "orchard_nullifiers",
    "orchard_anchors",
    "orchard_note_commitment_tree",
    "orchard_note_commitment_subtree",
    // Chain
    "history_tree",
    "tip_chain_value_pool",
    BLOCK_INFO,
];

/// The finalized part of the chain state, stored in the db.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so clones of the finalized state represent the same database instance.
/// When the final clone is dropped, the database is closed.
///
/// This is different from `NonFinalizedState::clone()`,
/// which returns an independent copy of the chains.
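///
/// A sketch of the cloning behaviour described above (hypothetical usage):
///
/// ```ignore
/// let second_handle = state.clone();
/// drop(state);         // the shared database stays open...
/// drop(second_handle); // ...and is closed when the last handle is dropped.
/// ```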
#[derive(Clone, Debug)]
pub struct FinalizedState {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured stop height.
    ///
    /// Commit blocks to the finalized state up to this height, then exit Zebra.
    debug_stop_at_height: Option<block::Height>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The underlying database.
    ///
    /// `rocksdb` allows reads and writes via a shared reference,
    /// so this database object can be freely cloned.
    /// The last instance that is dropped will close the underlying database.
    pub db: ZebraDb,

    #[cfg(feature = "elasticsearch")]
    /// The elasticsearch handle.
    pub elastic_db: Option<elasticsearch::Elasticsearch>,

    #[cfg(feature = "elasticsearch")]
    /// A collection of blocks to be sent to elasticsearch as a bulk.
    pub elastic_blocks: Vec<String>,
}

impl FinalizedState {
    /// Returns an on-disk database instance for `config`, `network`, and `enable_elastic_db`.
    /// If there is no existing database, creates a new database on disk.
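    ///
    /// A minimal usage sketch, assuming a `Config` and `Network` are in scope and
    /// the `elasticsearch` feature is disabled:
    ///
    /// ```ignore
    /// let state = FinalizedState::new(&config, &network);
    /// assert_eq!(state.network(), network);
    /// ```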
    pub fn new(
        config: &Config,
        network: &Network,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
    ) -> Self {
        Self::new_with_debug(
            config,
            network,
            false,
            #[cfg(feature = "elasticsearch")]
            enable_elastic_db,
            false,
        )
    }

    /// Returns an on-disk database instance with the supplied production and debug settings.
    /// If there is no existing database, creates a new database on disk.
    ///
    /// This method is intended for use in tests.
    pub(crate) fn new_with_debug(
        config: &Config,
        network: &Network,
        debug_skip_format_upgrades: bool,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
        read_only: bool,
    ) -> Self {
        #[cfg(feature = "elasticsearch")]
        let elastic_db = if enable_elastic_db {
            use elasticsearch::{
                auth::Credentials::Basic,
                cert::CertificateValidation,
                http::transport::{SingleNodeConnectionPool, TransportBuilder},
                http::Url,
                Elasticsearch,
            };

            let conn_pool = SingleNodeConnectionPool::new(
                Url::parse(config.elasticsearch_url.as_str())
                    .expect("configured elasticsearch url is invalid"),
            );
            let transport = TransportBuilder::new(conn_pool)
                .cert_validation(CertificateValidation::None)
                .auth(Basic(
                    config.elasticsearch_username.clone(),
                    config.elasticsearch_password.clone(),
                ))
                .build()
                .expect("elasticsearch transport builder should not fail");

            Some(Elasticsearch::new(transport))
        } else {
            None
        };

        let db = ZebraDb::new(
            config,
            STATE_DATABASE_KIND,
            &state_database_format_version_in_code(),
            network,
            debug_skip_format_upgrades,
            STATE_COLUMN_FAMILIES_IN_CODE
                .iter()
                .map(ToString::to_string),
            read_only,
        );

        #[cfg(feature = "elasticsearch")]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
            elastic_db,
            elastic_blocks: vec![],
        };

        #[cfg(not(feature = "elasticsearch"))]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
        };

        // TODO: move debug_stop_at_height into a task in the start command (#3442)
        if let Some(tip_height) = new_state.db.finalized_tip_height() {
            if new_state.is_at_stop_height(tip_height) {
                let debug_stop_at_height = new_state
                    .debug_stop_at_height
                    .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
                let tip_hash = new_state.db.finalized_tip_hash();

                if tip_height > debug_stop_at_height {
                    tracing::error!(
                        ?debug_stop_at_height,
                        ?tip_height,
                        ?tip_hash,
                        "previous state height is greater than the stop height",
                    );
                }

                tracing::info!(
                    ?debug_stop_at_height,
                    ?tip_height,
                    ?tip_hash,
                    "state is already at the configured height"
                );

                // RocksDB can do a cleanup when column families are opened,
                // so we want to drop the database before we exit.
                std::mem::drop(new_state);

                // Drops tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        tracing::info!(tip = ?new_state.db.tip(), "loaded Zebra state cache");

        new_state
    }

    /// Returns the configured network for this database.
    pub fn network(&self) -> Network {
        self.db.network()
    }

    /// Commit a checkpoint-verified block to the state.
    ///
    /// It's the caller's responsibility to ensure that blocks are committed in
    /// order.
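    ///
    /// A usage sketch (hypothetical values; `queued` pairs a [`CheckpointVerifiedBlock`]
    /// with its response channel):
    ///
    /// ```ignore
    /// let (block, note_commitment_trees) = state.commit_finalized(queued, None)?;
    /// ```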
    pub fn commit_finalized(
        &mut self,
        ordered_block: QueuedCheckpointVerified,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
    ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), BoxError> {
        let (checkpoint_verified, rsp_tx) = ordered_block;
        let result = self.commit_finalized_direct(
            checkpoint_verified.clone().into(),
            prev_note_commitment_trees,
            "commit checkpoint-verified request",
        );

        if result.is_ok() {
            metrics::counter!("state.checkpoint.finalized.block.count").increment(1);
            metrics::gauge!("state.checkpoint.finalized.block.height")
                .set(checkpoint_verified.height.0 as f64);

            // This height gauge is updated for both fully verified and checkpoint blocks.
            // These updates can't conflict, because the state makes sure that blocks
            // are committed in order.
            metrics::gauge!("zcash.chain.verified.block.height")
                .set(checkpoint_verified.height.0 as f64);
            metrics::counter!("zcash.chain.verified.block.total").increment(1);
        } else {
            metrics::counter!("state.checkpoint.error.block.count").increment(1);
            metrics::gauge!("state.checkpoint.error.block.height")
                .set(checkpoint_verified.height.0 as f64);
        };

        // Make the error cloneable, so we can send it to both the block verify future
        // and the block write task.
        let result = result.map_err(CloneError::from);

        let _ = rsp_tx.send(result.clone().map(|(hash, _)| hash).map_err(BoxError::from));

        result
            .map(|(_hash, note_commitment_trees)| (checkpoint_verified, note_commitment_trees))
            .map_err(BoxError::from)
    }

    /// Immediately commit a `finalized` block to the finalized state.
    ///
    /// This can be called either by the non-finalized state (when finalizing
    /// a block) or by the checkpoint verifier.
    ///
    /// Use `source` as the source of the block in log messages.
    ///
    /// # Errors
    ///
    /// - Propagates any errors from writing to the DB
    /// - Propagates any errors from updating history and note commitment trees
    /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments`
    ///   does not match the expected value
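    ///
    /// A usage sketch (hypothetical values):
    ///
    /// ```ignore
    /// let (hash, note_commitment_trees) =
    ///     state.commit_finalized_direct(finalizable_block, None, "sketch source")?;
    /// ```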
    #[allow(clippy::unwrap_in_result)]
    pub fn commit_finalized_direct(
        &mut self,
        finalizable_block: FinalizableBlock,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
        source: &str,
    ) -> Result<(block::Hash, NoteCommitmentTrees), BoxError> {
        let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block {
            FinalizableBlock::Checkpoint {
                checkpoint_verified,
            } => {
                // Checkpoint-verified blocks don't have an associated treestate, so we retrieve the
                // treestate of the finalized tip from the database and update it for the block
                // being committed, assuming the retrieved treestate is the parent block's
                // treestate. Later on, this function proves this assumption by asserting that the
                // finalized tip is the parent block of the block being committed.

                let block = checkpoint_verified.block.clone();
                let mut history_tree = self.db.history_tree();
                let prev_note_commitment_trees = prev_note_commitment_trees
                    .unwrap_or_else(|| self.db.note_commitment_trees_for_tip());

                // Update the note commitment trees.
                let mut note_commitment_trees = prev_note_commitment_trees.clone();
                note_commitment_trees.update_trees_parallel(&block)?;

                // Check the block commitment if the history tree was not
                // supplied by the non-finalized state. We don't do this check
                // for history trees supplied by the non-finalized state,
                // because the non-finalized state already checks the block
                // commitment.
                //
                // From NU5 onward, the block hash commits only to
                // non-authorizing data (see ZIP-244). This checks the
                // authorizing data commitment, making sure the entire block
                // contents were committed to. The check is done here (and not
                // during semantic validation) because it needs the history tree
                // root. While it _is_ checked during contextual validation, the
                // checkpoint verifier does not run contextual validation, and
                // keeping a history tree there would be harder to implement.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                check::block_commitment_is_valid_for_chain_history(
                    block.clone(),
                    &self.network(),
                    &history_tree,
                )?;

                // Update the history tree.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                let history_tree_mut = Arc::make_mut(&mut history_tree);
                let sapling_root = note_commitment_trees.sapling.root();
                let orchard_root = note_commitment_trees.orchard.root();
                history_tree_mut.push(
                    &self.network(),
                    block.clone(),
                    &sapling_root,
                    &orchard_root,
                )?;
                let treestate = Treestate {
                    note_commitment_trees,
                    history_tree,
                };

                (
                    checkpoint_verified.height,
                    checkpoint_verified.hash,
                    FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate),
                    Some(prev_note_commitment_trees),
                )
            }
            FinalizableBlock::Contextual {
                contextually_verified,
                treestate,
            } => (
                contextually_verified.height,
                contextually_verified.hash,
                FinalizedBlock::from_contextually_verified(contextually_verified, treestate),
                prev_note_commitment_trees,
            ),
        };

        let committed_tip_hash = self.db.finalized_tip_hash();
        let committed_tip_height = self.db.finalized_tip_height();

        // Assert that callers (including unit tests) get the chain order correct
        if self.db.is_empty() {
            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "the first block added to an empty state must be a genesis block, source: {source}",
            );
            assert_eq!(
                block::Height(0),
                height,
                "cannot commit genesis: invalid height, source: {source}",
            );
        } else {
            assert_eq!(
                committed_tip_height.expect("state must have a genesis block committed") + 1,
                Some(height),
                "committed block height must be 1 more than the finalized tip height, source: {source}",
            );

            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "committed block must be a child of the finalized tip, source: {source}",
            );
        }

        #[cfg(feature = "elasticsearch")]
        let finalized_inner_block = finalized.block.clone();
        let note_commitment_trees = finalized.treestate.note_commitment_trees.clone();

        let result = self.db.write_block(
            finalized,
            prev_note_commitment_trees,
            &self.network(),
            source,
        );

        if result.is_ok() {
            // Save blocks to elasticsearch if the feature is enabled.
            #[cfg(feature = "elasticsearch")]
            self.elasticsearch(&finalized_inner_block);

            // TODO: move the stop height check to the syncer (#3442)
            if self.is_at_stop_height(height) {
                tracing::info!(
                    ?height,
                    ?hash,
                    block_source = ?source,
                    "stopping at configured height, flushing database to disk"
                );

                // We're just about to do a forced exit, so it's ok to do a forced db shutdown.
                self.db.shutdown(true);

                // Drops tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        result.map(|hash| (hash, note_commitment_trees))
    }

    #[cfg(feature = "elasticsearch")]
    /// Store finalized blocks into an elasticsearch database.
    ///
    /// We use the elasticsearch bulk API to index multiple blocks at a time while we are
    /// synchronizing the chain; when we get close to the tip, we index blocks one by one.
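    ///
    /// A usage sketch, mirroring the call in `commit_finalized_direct` above:
    ///
    /// ```ignore
    /// // After a successful database write:
    /// state.elasticsearch(&finalized_inner_block);
    /// ```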
    pub fn elasticsearch(&mut self, block: &Arc<block::Block>) {
        if let Some(client) = self.elastic_db.clone() {
            let block_time = block.header.time.timestamp();
            let local_time = chrono::Utc::now().timestamp();

            // Bulk size is small enough to avoid the elasticsearch 100 MB content length limitation.
            // MAX_BLOCK_BYTES = 2 MB, but each block uses around 4.1 MB of JSON.
            // Each block counts as 2 entries, because we send each one with an
            // operation/header line. A value of 48 is 24 blocks.
            const AWAY_FROM_TIP_BULK_SIZE: usize = 48;

            // The number of blocks the bulk will have when we are in sync.
            // A value of 2 means only 1 block, because we want to insert blocks as soon as
            // we get them, for a real-time experience. This is the same for mainnet and testnet.
            const CLOSE_TO_TIP_BULK_SIZE: usize = 2;

            // We consider Zebra to be in sync when the difference between the local time and
            // the block time is less than this number of seconds.
            const CLOSE_TO_TIP_SECONDS: i64 = 14400; // 4 hours
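
            // As a worked example of the numbers above (an estimate, not a measured limit):
            // a full away-from-tip bulk of 48 entries is 24 operation lines plus 24 block
            // bodies, or roughly 24 * 4.1 MB ≈ 98 MB of JSON in the worst case, just under
            // elasticsearch's 100 MB content length limit.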

            let mut blocks_size_to_dump = AWAY_FROM_TIP_BULK_SIZE;

            // If we are close to the tip, index one block per bulk call.
            if local_time - block_time < CLOSE_TO_TIP_SECONDS {
                blocks_size_to_dump = CLOSE_TO_TIP_BULK_SIZE;
            }

            // Insert the operation line.
            let height_number = block.coinbase_height().unwrap_or(block::Height(0)).0;
            self.elastic_blocks.push(
                serde_json::json!({
                    "index": {
                        "_id": height_number.to_string().as_str()
                    }
                })
                .to_string(),
            );

            // Insert the block itself.
            self.elastic_blocks
                .push(serde_json::json!(block).to_string());

            // If the bulk is full, send everything we have to elasticsearch.
            if self.elastic_blocks.len() >= blocks_size_to_dump {
                let rt = tokio::runtime::Runtime::new()
                    .expect("runtime creation for elasticsearch should not fail");
                let blocks = self.elastic_blocks.clone();
                let network = self.network();

                rt.block_on(async move {
                    // Send a ping to the server to check if it is available before inserting.
                    if client.ping().send().await.is_err() {
                        tracing::error!("Elasticsearch is not available, skipping block indexing");
                        return;
                    }

                    let response = client
                        .bulk(elasticsearch::BulkParts::Index(
                            format!("zcash_{}", network.to_string().to_lowercase()).as_str(),
                        ))
                        .body(blocks)
                        .send()
                        .await
                        .expect("ES request should never fail");

                    // Make sure the bulk request did not report any errors.
                    let response_body = response
                        .json::<serde_json::Value>()
                        .await
                        .expect("ES response parsing error. Maybe we are sending more than 100 MB of data (`http.max_content_length`)");
                    let errors = response_body["errors"].as_bool().unwrap_or(true);
                    assert!(!errors, "ES error: {response_body}");
                });

                // Clear the queued blocks now that they have been sent.
                self.elastic_blocks.clear();
            }
        }
    }

    /// Returns `true` if `block_height` is greater than or equal to the
    /// configured stop height. Returns `false` if no stop height is configured.
    ///
    /// For example, with a configured stop height of 10, this returns `true`
    /// for heights 10 and above, and `false` for heights 9 and below.
    fn is_at_stop_height(&self, block_height: block::Height) -> bool {
        let debug_stop_at_height = match self.debug_stop_at_height {
            Some(debug_stop_at_height) => debug_stop_at_height,
            None => return false,
        };

        if block_height < debug_stop_at_height {
            return false;
        }

        true
    }

    /// Exit the host process.
    ///
    /// Designed for debugging and tests.
    ///
    /// TODO: move the stop height check to the syncer (#3442)
    fn exit_process() -> ! {
        tracing::info!("exiting Zebra");

        // Some OSes require a flush to send all output to the terminal.
        // Zebra's logging doesn't depend on `tokio`, so we flush the stdlib sync streams.
        //
        // TODO: if this doesn't work, send an empty line as well.
        let _ = stdout().lock().flush();
        let _ = stderr().lock().flush();

        // Give the logger thread some time to flush any remaining lines to stdout,
        // and yield so that tests pass on macOS.
        std::thread::sleep(std::time::Duration::from_secs(3));

        // Exits before calling drop on the WorkerGuard for the logger thread,
        // dropping any lines that haven't already been written to stdout.
        // This is okay for now because this is test-only code
        std::process::exit(0);
    }
}