zebra_state/service/finalized_state.rs
//! The primary implementation of the `zebra_state::Service` built upon rocksdb.
//!
//! Zebra's database is implemented in 4 layers:
//! - [`FinalizedState`]: queues, validates, and commits blocks, using...
//! - [`ZebraDb`]: reads and writes [`zebra_chain`] types to the state database, using...
//! - [`DiskDb`]: reads and writes generic types to any column family in the database, using...
//! - [`disk_format`]: converts types to raw database bytes.
//!
//! These layers allow us to split [`zebra_chain`] types for efficient database storage.
//! They reduce the risk of data corruption bugs, runtime inconsistencies, and panics.
//!
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization, etc.) changes.
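//!
//! # Example
//!
//! A minimal sketch of how the layers stack, assuming the `elasticsearch`
//! feature is disabled and a default [`Config`] (not a doctest):
//!
//! ```ignore
//! let config = Config::default();
//! // `FinalizedState` owns a `ZebraDb`, which wraps a `DiskDb`,
//! // and `disk_format` converts the values that `ZebraDb` reads and writes.
//! let state = FinalizedState::new(&config, &Network::Mainnet);
//! // A freshly created database has no finalized tip yet.
//! let tip_height = state.db.finalized_tip_height();
//! ```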

use std::{
    io::{stderr, stdout, Write},
    sync::Arc,
};

use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network};
use zebra_db::{
    chain::BLOCK_INFO,
    transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC},
};

use crate::{
    constants::{state_database_format_version_in_code, STATE_DATABASE_KIND},
    request::{FinalizableBlock, FinalizedBlock, Treestate},
    service::{check, QueuedCheckpointVerified},
    BoxError, CheckpointVerifiedBlock, CloneError, Config,
};

pub mod column_family;

mod disk_db;
mod disk_format;
mod zebra_db;

#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;

#[cfg(test)]
mod tests;

#[allow(unused_imports)]
pub use column_family::{TypedColumnFamily, WriteTypedBatch};
#[allow(unused_imports)]
pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
#[allow(unused_imports)]
pub use disk_format::{
    FromDisk, IntoDisk, OutputLocation, RawBytes, TransactionIndex, TransactionLocation,
    MAX_ON_DISK_HEIGHT,
};
pub use zebra_db::ZebraDb;

#[cfg(any(test, feature = "proptest-impl"))]
pub use disk_format::KV;

pub use disk_format::upgrade::restorable_db_versions;

/// The column families supported by the running `zebra-state` database code.
///
/// Existing column families that aren't listed here are preserved when the database is opened.
pub const STATE_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
    // Blocks
    "hash_by_height",
    "height_by_hash",
    "block_header_by_height",
    // Transactions
    "tx_by_loc",
    "hash_by_tx_loc",
    "tx_loc_by_hash",
    // Transparent
    BALANCE_BY_TRANSPARENT_ADDR,
    "tx_loc_by_transparent_addr_loc",
    "utxo_by_out_loc",
    "utxo_loc_by_transparent_addr_loc",
    TX_LOC_BY_SPENT_OUT_LOC,
    // Sprout
    "sprout_nullifiers",
    "sprout_anchors",
    "sprout_note_commitment_tree",
    // Sapling
    "sapling_nullifiers",
    "sapling_anchors",
    "sapling_note_commitment_tree",
    "sapling_note_commitment_subtree",
    // Orchard
    "orchard_nullifiers",
    "orchard_anchors",
    "orchard_note_commitment_tree",
    "orchard_note_commitment_subtree",
    // Chain
    "history_tree",
    "tip_chain_value_pool",
    BLOCK_INFO,
];

/// The finalized part of the chain state, stored in the db.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so clones of the finalized state represent the same database instance.
/// When the final clone is dropped, the database is closed.
///
/// This is different from `NonFinalizedState::clone()`,
/// which returns an independent copy of the chains.
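///
/// # Example
///
/// A hedged sketch of the clone semantics described above, assuming an
/// existing `state: FinalizedState` (not a doctest):
///
/// ```ignore
/// // Both handles point at the same RocksDB instance, so a block
/// // committed through one is immediately visible through the other.
/// let cloned_state = state.clone();
/// assert_eq!(
///     state.db.finalized_tip_hash(),
///     cloned_state.db.finalized_tip_hash(),
/// );
/// ```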
#[derive(Clone, Debug)]
pub struct FinalizedState {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured stop height.
    ///
    /// Commit blocks to the finalized state up to this height, then exit Zebra.
    debug_stop_at_height: Option<block::Height>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The underlying database.
    ///
    /// `rocksdb` allows reads and writes via a shared reference,
    /// so this database object can be freely cloned.
    /// The last instance that is dropped will close the underlying database.
    pub db: ZebraDb,

    #[cfg(feature = "elasticsearch")]
    /// The elasticsearch handle.
    pub elastic_db: Option<elasticsearch::Elasticsearch>,

    #[cfg(feature = "elasticsearch")]
    /// A collection of blocks to be sent to elasticsearch as a bulk.
    pub elastic_blocks: Vec<String>,
}

impl FinalizedState {
    /// Returns an on-disk database instance for `config`, `network`, and `enable_elastic_db`.
    /// If there is no existing database, creates a new database on disk.
    pub fn new(
        config: &Config,
        network: &Network,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
    ) -> Self {
        Self::new_with_debug(
            config,
            network,
            // debug_skip_format_upgrades
            false,
            #[cfg(feature = "elasticsearch")]
            enable_elastic_db,
            // read_only
            false,
        )
    }

    /// Returns an on-disk database instance with the supplied production and debug settings.
    /// If there is no existing database, creates a new database on disk.
    ///
    /// This method is intended for use in tests.
    pub(crate) fn new_with_debug(
        config: &Config,
        network: &Network,
        debug_skip_format_upgrades: bool,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
        read_only: bool,
    ) -> Self {
        #[cfg(feature = "elasticsearch")]
        let elastic_db = if enable_elastic_db {
            use elasticsearch::{
                auth::Credentials::Basic,
                cert::CertificateValidation,
                http::transport::{SingleNodeConnectionPool, TransportBuilder},
                http::Url,
                Elasticsearch,
            };

            let conn_pool = SingleNodeConnectionPool::new(
                Url::parse(config.elasticsearch_url.as_str())
                    .expect("configured elasticsearch url is invalid"),
            );
            let transport = TransportBuilder::new(conn_pool)
                .cert_validation(CertificateValidation::None)
                .auth(Basic(
                    config.clone().elasticsearch_username,
                    config.clone().elasticsearch_password,
                ))
                .build()
                .expect("elasticsearch transport builder should not fail");

            Some(Elasticsearch::new(transport))
        } else {
            None
        };

        let db = ZebraDb::new(
            config,
            STATE_DATABASE_KIND,
            &state_database_format_version_in_code(),
            network,
            debug_skip_format_upgrades,
            STATE_COLUMN_FAMILIES_IN_CODE
                .iter()
                .map(ToString::to_string),
            read_only,
        );

        #[cfg(feature = "elasticsearch")]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
            elastic_db,
            elastic_blocks: vec![],
        };

        #[cfg(not(feature = "elasticsearch"))]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
        };

        // TODO: move debug_stop_at_height into a task in the start command (#3442)
        if let Some(tip_height) = new_state.db.finalized_tip_height() {
            if new_state.is_at_stop_height(tip_height) {
                let debug_stop_at_height = new_state
                    .debug_stop_at_height
                    .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
                let tip_hash = new_state.db.finalized_tip_hash();

                if tip_height > debug_stop_at_height {
                    tracing::error!(
                        ?debug_stop_at_height,
                        ?tip_height,
                        ?tip_hash,
                        "previous state height is greater than the stop height",
                    );
                }

                tracing::info!(
                    ?debug_stop_at_height,
                    ?tip_height,
                    ?tip_hash,
                    "state is already at the configured height"
                );

                // RocksDB can do cleanup when column families are opened,
                // so we want to drop the database before we exit.
                std::mem::drop(new_state);

                // Drops tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        tracing::info!(tip = ?new_state.db.tip(), "loaded Zebra state cache");

        new_state
    }

    /// Returns the configured network for this database.
    pub fn network(&self) -> Network {
        self.db.network()
    }

    /// Commit a checkpoint-verified block to the state.
    ///
    /// It's the caller's responsibility to ensure that blocks are committed in
    /// order.
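    ///
    /// A minimal usage sketch, assuming an existing `state` and a queued block
    /// (not a doctest; the tuple shape comes from [`QueuedCheckpointVerified`]):
    ///
    /// ```ignore
    /// let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel();
    /// let queued = (checkpoint_verified_block, rsp_tx);
    /// let (block, trees) = state.commit_finalized(queued, None)?;
    /// // The committed block's hash is also sent back through `rsp_rx`.
    /// ```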
    pub fn commit_finalized(
        &mut self,
        ordered_block: QueuedCheckpointVerified,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
    ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), BoxError> {
        let (checkpoint_verified, rsp_tx) = ordered_block;
        let result = self.commit_finalized_direct(
            checkpoint_verified.clone().into(),
            prev_note_commitment_trees,
            "commit checkpoint-verified request",
        );

        if result.is_ok() {
            metrics::counter!("state.checkpoint.finalized.block.count").increment(1);
            metrics::gauge!("state.checkpoint.finalized.block.height")
                .set(checkpoint_verified.height.0 as f64);

            // This height gauge is updated for both fully verified and checkpoint blocks.
            // These updates can't conflict, because the state makes sure that blocks
            // are committed in order.
            metrics::gauge!("zcash.chain.verified.block.height")
                .set(checkpoint_verified.height.0 as f64);
            metrics::counter!("zcash.chain.verified.block.total").increment(1);
        } else {
            metrics::counter!("state.checkpoint.error.block.count").increment(1);
            metrics::gauge!("state.checkpoint.error.block.height")
                .set(checkpoint_verified.height.0 as f64);
        };

        // Make the error cloneable, so we can send it to both the block verify future
        // and the block write task.
        let result = result.map_err(CloneError::from);

        let _ = rsp_tx.send(result.clone().map(|(hash, _)| hash).map_err(BoxError::from));

        result
            .map(|(_hash, note_commitment_trees)| (checkpoint_verified, note_commitment_trees))
            .map_err(BoxError::from)
    }

    /// Immediately commit a `finalized` block to the finalized state.
    ///
    /// This can be called either by the non-finalized state (when finalizing
    /// a block) or by the checkpoint verifier.
    ///
    /// Use `source` as the source of the block in log messages.
    ///
    /// # Errors
    ///
    /// - Propagates any errors from writing to the DB
    /// - Propagates any errors from updating history and note commitment trees
    /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments`
    ///   does not match the expected value
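    ///
    /// A hedged call sketch for the checkpoint path, assuming an existing
    /// `state` and `checkpoint_verified` block (not a doctest):
    ///
    /// ```ignore
    /// let (hash, trees) = state.commit_finalized_direct(
    ///     FinalizableBlock::Checkpoint { checkpoint_verified },
    ///     None,
    ///     "example source",
    /// )?;
    /// ```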
    #[allow(clippy::unwrap_in_result)]
    pub fn commit_finalized_direct(
        &mut self,
        finalizable_block: FinalizableBlock,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
        source: &str,
    ) -> Result<(block::Hash, NoteCommitmentTrees), BoxError> {
        let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block {
            FinalizableBlock::Checkpoint {
                checkpoint_verified,
            } => {
                // Checkpoint-verified blocks don't have an associated treestate, so we retrieve the
                // treestate of the finalized tip from the database and update it for the block
                // being committed, assuming the retrieved treestate is the parent block's
                // treestate. Later on, this function proves this assumption by asserting that the
                // finalized tip is the parent block of the block being committed.

                let block = checkpoint_verified.block.clone();
                let mut history_tree = self.db.history_tree();
                let prev_note_commitment_trees = prev_note_commitment_trees
                    .unwrap_or_else(|| self.db.note_commitment_trees_for_tip());

                // Update the note commitment trees.
                let mut note_commitment_trees = prev_note_commitment_trees.clone();
                note_commitment_trees.update_trees_parallel(&block)?;

                // Check the block commitment if the history tree was not
                // supplied by the non-finalized state. Note that we don't do
                // this check for history trees supplied by the non-finalized
                // state because the non-finalized state checks the block
                // commitment.
                //
                // From NU5 onward, the block hash commits only to
                // non-authorizing data (see ZIP-244). This checks the
                // authorizing data commitment, making sure the entire block
                // contents were committed to. The check is done here (and not
                // during semantic validation) because it needs the history tree
                // root. While it _is_ checked during contextual validation,
                // that is not called by the checkpoint verifier, and keeping a
                // history tree there would be harder to implement.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                check::block_commitment_is_valid_for_chain_history(
                    block.clone(),
                    &self.network(),
                    &history_tree,
                )?;

                // Update the history tree.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                let history_tree_mut = Arc::make_mut(&mut history_tree);
                let sapling_root = note_commitment_trees.sapling.root();
                let orchard_root = note_commitment_trees.orchard.root();
                history_tree_mut.push(
                    &self.network(),
                    block.clone(),
                    &sapling_root,
                    &orchard_root,
                )?;
                let treestate = Treestate {
                    note_commitment_trees,
                    history_tree,
                };

                (
                    checkpoint_verified.height,
                    checkpoint_verified.hash,
                    FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate),
                    Some(prev_note_commitment_trees),
                )
            }
            FinalizableBlock::Contextual {
                contextually_verified,
                treestate,
            } => (
                contextually_verified.height,
                contextually_verified.hash,
                FinalizedBlock::from_contextually_verified(contextually_verified, treestate),
                prev_note_commitment_trees,
            ),
        };

        let committed_tip_hash = self.db.finalized_tip_hash();
        let committed_tip_height = self.db.finalized_tip_height();

        // Assert that callers (including unit tests) get the chain order correct
        if self.db.is_empty() {
            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "the first block added to an empty state must be a genesis block, source: {source}",
            );
            assert_eq!(
                block::Height(0),
                height,
                "cannot commit genesis: invalid height, source: {source}",
            );
        } else {
            assert_eq!(
                committed_tip_height.expect("state must have a genesis block committed") + 1,
                Some(height),
                "committed block height must be 1 more than the finalized tip height, source: {source}",
            );

            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "committed block must be a child of the finalized tip, source: {source}",
            );
        }

        #[cfg(feature = "elasticsearch")]
        let finalized_inner_block = finalized.block.clone();
        let note_commitment_trees = finalized.treestate.note_commitment_trees.clone();

        let result = self.db.write_block(
            finalized,
            prev_note_commitment_trees,
            &self.network(),
            source,
        );

        if result.is_ok() {
            // Save blocks to elasticsearch if the feature is enabled.
            #[cfg(feature = "elasticsearch")]
            self.elasticsearch(&finalized_inner_block);

            // TODO: move the stop height check to the syncer (#3442)
            if self.is_at_stop_height(height) {
                tracing::info!(
                    ?height,
                    ?hash,
                    block_source = ?source,
                    "stopping at configured height, flushing database to disk"
                );

                // We're just about to do a forced exit, so it's ok to do a forced db shutdown
                self.db.shutdown(true);

                // Drops tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        result.map(|hash| (hash, note_commitment_trees))
    }

    #[cfg(feature = "elasticsearch")]
    /// Store finalized blocks into an elasticsearch database.
    ///
    /// We use the elasticsearch bulk API to index multiple blocks at a time while we are
    /// synchronizing the chain; when we get close to the tip, we index blocks one by one.
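    ///
    /// A hedged usage sketch (not a doctest; `finalized_blocks` is a
    /// hypothetical collection of `Arc<Block>`): the method is called once per
    /// committed block, and buffers blocks until the bulk threshold is reached.
    ///
    /// ```ignore
    /// for block in finalized_blocks {
    ///     state.elasticsearch(&block);
    /// }
    /// ```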
    pub fn elasticsearch(&mut self, block: &Arc<block::Block>) {
        if let Some(client) = self.elastic_db.clone() {
            let block_time = block.header.time.timestamp();
            let local_time = chrono::Utc::now().timestamp();

            // The bulk size is small enough to avoid the elasticsearch 100 MB content length limitation.
            // `MAX_BLOCK_BYTES` is 2 MB, but each block uses around 4.1 MB of JSON.
            // Each block counts as 2 entries, because we send it with an operation/header line.
            // So a value of 48 is 24 blocks, and 24 blocks * ~4.1 MB ≈ 98 MB, just under the limit.
            const AWAY_FROM_TIP_BULK_SIZE: usize = 48;

            // The number of entries the bulk will have when we are in sync.
            // A value of 2 means only 1 block, as we want to insert blocks as soon as we get
            // them, for a real-time experience. This is the same for mainnet and testnet.
            const CLOSE_TO_TIP_BULK_SIZE: usize = 2;

            // We consider ourselves in sync when the difference between the local time and the
            // block time is less than this number of seconds.
            const CLOSE_TO_TIP_SECONDS: i64 = 14400; // 4 hours

            let mut blocks_size_to_dump = AWAY_FROM_TIP_BULK_SIZE;

            // If we are close to the tip, index one block per bulk call.
            if local_time - block_time < CLOSE_TO_TIP_SECONDS {
                blocks_size_to_dump = CLOSE_TO_TIP_BULK_SIZE;
            }

            // Insert the operation line.
            let height_number = block.coinbase_height().unwrap_or(block::Height(0)).0;
            self.elastic_blocks.push(
                serde_json::json!({
                    "index": {
                        "_id": height_number.to_string().as_str()
                    }
                })
                .to_string(),
            );

            // Insert the block itself.
            self.elastic_blocks
                .push(serde_json::json!(block).to_string());

            // The bulk threshold was reached: send everything we have to elasticsearch.
            if self.elastic_blocks.len() >= blocks_size_to_dump {
                let rt = tokio::runtime::Runtime::new()
                    .expect("runtime creation for elasticsearch should not fail");
                let blocks = self.elastic_blocks.clone();
                let network = self.network();

                rt.block_on(async move {
                    // Send a ping to the server to check if it is available before inserting.
                    if client.ping().send().await.is_err() {
                        tracing::error!("Elasticsearch is not available, skipping block indexing");
                        return;
                    }

                    let response = client
                        .bulk(elasticsearch::BulkParts::Index(
                            format!("zcash_{}", network.to_string().to_lowercase()).as_str(),
                        ))
                        .body(blocks)
                        .send()
                        .await
                        .expect("ES request should never fail");

                    // Make sure the bulk request did not report any errors.
                    let response_body = response
                        .json::<serde_json::Value>()
                        .await
                        .expect("ES response parsing error. Maybe we are sending more than 100 MB of data (`http.max_content_length`)");
                    let errors = response_body["errors"].as_bool().unwrap_or(true);
                    assert!(!errors, "ES error: {response_body}");
                });

                // Clean the block storage.
                self.elastic_blocks.clear();
            }
        }
    }

    /// Returns `true` if `block_height` is greater than or equal to the
    /// configured stop height.
    fn is_at_stop_height(&self, block_height: block::Height) -> bool {
        let debug_stop_at_height = match self.debug_stop_at_height {
            Some(debug_stop_at_height) => debug_stop_at_height,
            None => return false,
        };

        if block_height < debug_stop_at_height {
            return false;
        }

        true
    }
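
    // A hedged equivalent of the check above, kept expanded there for
    // readability (`Option::is_some_and` is stable since Rust 1.70):
    //
    // self.debug_stop_at_height
    //     .is_some_and(|stop_height| block_height >= stop_height)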

    /// Exit the host process.
    ///
    /// Designed for debugging and tests.
    ///
    /// TODO: move the stop height check to the syncer (#3442)
    fn exit_process() -> ! {
        tracing::info!("exiting Zebra");

        // Some OSes require a flush to send all output to the terminal.
        // Zebra's logging doesn't depend on `tokio`, so we flush the stdlib sync streams.
        //
        // TODO: if this doesn't work, send an empty line as well.
        let _ = stdout().lock().flush();
        let _ = stderr().lock().flush();

        // Give the logger thread some time to flush any remaining lines to stdout,
        // and yield so that tests pass on macOS.
        std::thread::sleep(std::time::Duration::from_secs(3));

        // Exits before calling drop on the WorkerGuard for the logger thread,
        // dropping any lines that haven't already been written to stdout.
        // This is okay for now because this is test-only code.
        std::process::exit(0);
    }
}