zebra_state/service/finalized_state.rs

//! The primary implementation of the `zebra_state::Service` built upon rocksdb.
//!
//! Zebra's database is implemented in 4 layers:
//! - [`FinalizedState`]: queues, validates, and commits blocks, using...
//! - [`ZebraDb`]: reads and writes [`zebra_chain`] types to the state database, using...
//! - [`DiskDb`]: reads and writes generic types to any column family in the database, using...
//! - [`disk_format`]: converts types to raw database bytes.
//!
//! These layers allow us to split [`zebra_chain`] types for efficient database storage.
//! They reduce the risk of data corruption bugs, runtime inconsistencies, and panics.
//!
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column families, serialization, etc.) changes.
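//!
//! # Example
//!
//! A minimal sketch of how the layers compose, assuming an ephemeral test
//! [`Config`] and the `elasticsearch` feature disabled (so `FinalizedState::new`
//! takes only `config` and `network`, as in the constructor below):
//!
//! ```ignore
//! use zebra_chain::parameters::Network;
//!
//! // `FinalizedState` wraps a `ZebraDb`, which wraps a `DiskDb`,
//! // which stores `disk_format` bytes in rocksdb column families.
//! let state = FinalizedState::new(&Config::ephemeral(), &Network::Mainnet);
//!
//! // Reads go through `ZebraDb`, which converts the raw bytes
//! // back into `zebra_chain` types.
//! let tip_height = state.db.finalized_tip_height();
//! ```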

use std::{
    io::{stderr, stdout, Write},
    sync::Arc,
};

use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network};
use zebra_db::{
    chain::BLOCK_INFO,
    transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC},
};

use crate::{
    constants::{state_database_format_version_in_code, STATE_DATABASE_KIND},
    request::{FinalizableBlock, FinalizedBlock, Treestate},
    service::{check, QueuedCheckpointVerified},
    BoxError, CheckpointVerifiedBlock, CloneError, Config,
};

pub mod column_family;

mod disk_db;
mod disk_format;
mod zebra_db;

#[cfg(any(test, feature = "proptest-impl"))]
mod arbitrary;

#[cfg(test)]
mod tests;

#[allow(unused_imports)]
pub use column_family::{TypedColumnFamily, WriteTypedBatch};
#[allow(unused_imports)]
pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk};
#[allow(unused_imports)]
pub use disk_format::{
    FromDisk, IntoDisk, OutputLocation, RawBytes, TransactionIndex, TransactionLocation,
    MAX_ON_DISK_HEIGHT,
};
pub use zebra_db::ZebraDb;

#[cfg(any(test, feature = "proptest-impl"))]
pub use disk_format::KV;

pub use disk_format::upgrade::restorable_db_versions;

/// The column families supported by the running `zebra-state` database code.
///
/// Existing column families that aren't listed here are preserved when the database is opened.
pub const STATE_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
    // Blocks
    "hash_by_height",
    "height_by_hash",
    "block_header_by_height",
    // Transactions
    "tx_by_loc",
    "hash_by_tx_loc",
    "tx_loc_by_hash",
    // Transparent
    BALANCE_BY_TRANSPARENT_ADDR,
    "tx_loc_by_transparent_addr_loc",
    "utxo_by_out_loc",
    "utxo_loc_by_transparent_addr_loc",
    TX_LOC_BY_SPENT_OUT_LOC,
    // Sprout
    "sprout_nullifiers",
    "sprout_anchors",
    "sprout_note_commitment_tree",
    // Sapling
    "sapling_nullifiers",
    "sapling_anchors",
    "sapling_note_commitment_tree",
    "sapling_note_commitment_subtree",
    // Orchard
    "orchard_nullifiers",
    "orchard_anchors",
    "orchard_note_commitment_tree",
    "orchard_note_commitment_subtree",
    // Chain
    "history_tree",
    "tip_chain_value_pool",
    BLOCK_INFO,
];

/// The finalized part of the chain state, stored in the db.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so clones of the finalized state represent the same database instance.
/// When the final clone is dropped, the database is closed.
///
/// This is different from `NonFinalizedState::clone()`,
/// which returns an independent copy of the chains.
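///
/// # Example
///
/// A hedged sketch of the clone semantics described above (not a doc test;
/// `state` is assumed to be an open [`FinalizedState`]):
///
/// ```ignore
/// let state_clone = state.clone();
///
/// // Both handles point at the same rocksdb instance, so a block
/// // committed through one handle is visible through the other.
/// assert_eq!(
///     state.db.finalized_tip_height(),
///     state_clone.db.finalized_tip_height(),
/// );
///
/// // The database is only closed when the last handle is dropped.
/// drop(state);
/// drop(state_clone);
/// ```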
#[derive(Clone, Debug)]
pub struct FinalizedState {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured stop height.
    ///
    /// Commit blocks to the finalized state up to this height, then exit Zebra.
    debug_stop_at_height: Option<block::Height>,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The underlying database.
    ///
    /// `rocksdb` allows reads and writes via a shared reference,
    /// so this database object can be freely cloned.
    /// The last instance that is dropped will close the underlying database.
    pub db: ZebraDb,

    /// The elasticsearch handle.
    #[cfg(feature = "elasticsearch")]
    pub elastic_db: Option<elasticsearch::Elasticsearch>,

    /// A collection of blocks to be sent to elasticsearch as a bulk.
    #[cfg(feature = "elasticsearch")]
    pub elastic_blocks: Vec<String>,
}

impl FinalizedState {
    /// Returns an on-disk database instance for `config`, `network`, and `elastic_db`.
    /// If there is no existing database, creates a new database on disk.
    pub fn new(
        config: &Config,
        network: &Network,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
    ) -> Self {
        Self::new_with_debug(
            config,
            network,
            false,
            #[cfg(feature = "elasticsearch")]
            enable_elastic_db,
            false,
        )
    }

    /// Returns an on-disk database instance with the supplied production and debug settings.
    /// If there is no existing database, creates a new database on disk.
    ///
    /// This method is intended for use in tests.
    pub(crate) fn new_with_debug(
        config: &Config,
        network: &Network,
        debug_skip_format_upgrades: bool,
        #[cfg(feature = "elasticsearch")] enable_elastic_db: bool,
        read_only: bool,
    ) -> Self {
        #[cfg(feature = "elasticsearch")]
        let elastic_db = if enable_elastic_db {
            use elasticsearch::{
                auth::Credentials::Basic,
                cert::CertificateValidation,
                http::transport::{SingleNodeConnectionPool, TransportBuilder},
                http::Url,
                Elasticsearch,
            };

            let conn_pool = SingleNodeConnectionPool::new(
                Url::parse(config.elasticsearch_url.as_str())
                    .expect("configured elasticsearch url is invalid"),
            );
            let transport = TransportBuilder::new(conn_pool)
                .cert_validation(CertificateValidation::None)
                .auth(Basic(
                    config.elasticsearch_username.clone(),
                    config.elasticsearch_password.clone(),
                ))
                .build()
                .expect("elasticsearch transport builder should not fail");

            Some(Elasticsearch::new(transport))
        } else {
            None
        };

        let db = ZebraDb::new(
            config,
            STATE_DATABASE_KIND,
            &state_database_format_version_in_code(),
            network,
            debug_skip_format_upgrades,
            STATE_COLUMN_FAMILIES_IN_CODE
                .iter()
                .map(ToString::to_string),
            read_only,
        );

        #[cfg(feature = "elasticsearch")]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
            elastic_db,
            elastic_blocks: vec![],
        };

        #[cfg(not(feature = "elasticsearch"))]
        let new_state = Self {
            debug_stop_at_height: config.debug_stop_at_height.map(block::Height),
            db,
        };

        // TODO: move debug_stop_at_height into a task in the start command (#3442)
        if let Some(tip_height) = new_state.db.finalized_tip_height() {
            if new_state.is_at_stop_height(tip_height) {
                let debug_stop_at_height = new_state
                    .debug_stop_at_height
                    .expect("true from `is_at_stop_height` implies `debug_stop_at_height` is Some");
                let tip_hash = new_state.db.finalized_tip_hash();

                if tip_height > debug_stop_at_height {
                    tracing::error!(
                        ?debug_stop_at_height,
                        ?tip_height,
                        ?tip_hash,
                        "previous state height is greater than the stop height",
                    );
                }

                tracing::info!(
                    ?debug_stop_at_height,
                    ?tip_height,
                    ?tip_hash,
                    "state is already at the configured height"
                );

                // RocksDB can do a cleanup when column families are opened,
                // so we want to drop the database before we exit.
                std::mem::drop(new_state);

                // Drops any tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        new_state
    }

    /// Returns the configured network for this database.
    pub fn network(&self) -> Network {
        self.db.network()
    }

    /// Commit a checkpoint-verified block to the state.
    ///
    /// It's the caller's responsibility to ensure that blocks are committed in
    /// order.
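    ///
    /// # Example
    ///
    /// A hedged sketch of the calling pattern (not a doc test; `state` and a
    /// checkpoint-verified `block` for the next height are assumed):
    ///
    /// ```ignore
    /// let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel();
    ///
    /// // Blocks must be committed in height order.
    /// let (committed, _trees) = state.commit_finalized((block, rsp_tx), None)?;
    ///
    /// // The caller that queued the block also receives the commit result.
    /// let committed_hash = rsp_rx.blocking_recv()??;
    /// assert_eq!(committed_hash, committed.hash);
    /// ```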
    pub fn commit_finalized(
        &mut self,
        ordered_block: QueuedCheckpointVerified,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
    ) -> Result<(CheckpointVerifiedBlock, NoteCommitmentTrees), BoxError> {
        let (checkpoint_verified, rsp_tx) = ordered_block;
        let result = self.commit_finalized_direct(
            checkpoint_verified.clone().into(),
            prev_note_commitment_trees,
            "commit checkpoint-verified request",
        );

        if result.is_ok() {
            metrics::counter!("state.checkpoint.finalized.block.count").increment(1);
            metrics::gauge!("state.checkpoint.finalized.block.height")
                .set(checkpoint_verified.height.0 as f64);

            // This height gauge is updated for both fully verified and checkpoint blocks.
            // These updates can't conflict, because the state makes sure that blocks
            // are committed in order.
            metrics::gauge!("zcash.chain.verified.block.height")
                .set(checkpoint_verified.height.0 as f64);
            metrics::counter!("zcash.chain.verified.block.total").increment(1);
        } else {
            metrics::counter!("state.checkpoint.error.block.count").increment(1);
            metrics::gauge!("state.checkpoint.error.block.height")
                .set(checkpoint_verified.height.0 as f64);
        };

        // Make the error cloneable, so we can send it to the block verify future
        // and the block write task.
        let result = result.map_err(CloneError::from);

        let _ = rsp_tx.send(result.clone().map(|(hash, _)| hash).map_err(BoxError::from));

        result
            .map(|(_hash, note_commitment_trees)| (checkpoint_verified, note_commitment_trees))
            .map_err(BoxError::from)
    }

    /// Immediately commit a `finalized` block to the finalized state.
    ///
    /// This can be called either by the non-finalized state (when finalizing
    /// a block) or by the checkpoint verifier.
    ///
    /// Use `source` as the source of the block in log messages.
    ///
    /// # Errors
    ///
    /// - Propagates any errors from writing to the DB
    /// - Propagates any errors from updating history and note commitment trees
    /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments`
    ///   does not match the expected value
    #[allow(clippy::unwrap_in_result)]
    pub fn commit_finalized_direct(
        &mut self,
        finalizable_block: FinalizableBlock,
        prev_note_commitment_trees: Option<NoteCommitmentTrees>,
        source: &str,
    ) -> Result<(block::Hash, NoteCommitmentTrees), BoxError> {
        let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block {
            FinalizableBlock::Checkpoint {
                checkpoint_verified,
            } => {
                // Checkpoint-verified blocks don't have an associated treestate, so we retrieve the
                // treestate of the finalized tip from the database and update it for the block
                // being committed, assuming the retrieved treestate is the parent block's
                // treestate. Later on, this function proves this assumption by asserting that the
                // finalized tip is the parent block of the block being committed.

                let block = checkpoint_verified.block.clone();
                let mut history_tree = self.db.history_tree();
                let prev_note_commitment_trees = prev_note_commitment_trees
                    .unwrap_or_else(|| self.db.note_commitment_trees_for_tip());

                // Update the note commitment trees.
                let mut note_commitment_trees = prev_note_commitment_trees.clone();
                note_commitment_trees.update_trees_parallel(&block)?;

                // Check the block commitment if the history tree was not
                // supplied by the non-finalized state. Note that we don't do
                // this check for history trees supplied by the non-finalized
                // state because the non-finalized state checks the block
                // commitment.
                //
                // From NU5 onward, the block hash commits only to
                // non-authorizing data (see ZIP-244). This checks the
                // authorizing data commitment, making sure the entire block
                // contents were committed to. The check is done here (and not
                // during semantic validation) because it needs the history tree
                // root. While it _is_ checked during contextual validation,
                // that is not called by the checkpoint verifier, and keeping a
                // history tree there would be harder to implement.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                check::block_commitment_is_valid_for_chain_history(
                    block.clone(),
                    &self.network(),
                    &history_tree,
                )?;

                // Update the history tree.
                //
                // TODO: run this CPU-intensive cryptography in a parallel rayon
                // thread, if it shows up in profiles
                let history_tree_mut = Arc::make_mut(&mut history_tree);
                let sapling_root = note_commitment_trees.sapling.root();
                let orchard_root = note_commitment_trees.orchard.root();
                history_tree_mut.push(
                    &self.network(),
                    block.clone(),
                    &sapling_root,
                    &orchard_root,
                )?;
                let treestate = Treestate {
                    note_commitment_trees,
                    history_tree,
                };

                (
                    checkpoint_verified.height,
                    checkpoint_verified.hash,
                    FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate),
                    Some(prev_note_commitment_trees),
                )
            }
            FinalizableBlock::Contextual {
                contextually_verified,
                treestate,
            } => (
                contextually_verified.height,
                contextually_verified.hash,
                FinalizedBlock::from_contextually_verified(contextually_verified, treestate),
                prev_note_commitment_trees,
            ),
        };

        let committed_tip_hash = self.db.finalized_tip_hash();
        let committed_tip_height = self.db.finalized_tip_height();

        // Assert that callers (including unit tests) get the chain order correct
        if self.db.is_empty() {
            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "the first block added to an empty state must be a genesis block, source: {source}",
            );
            assert_eq!(
                block::Height(0),
                height,
                "cannot commit genesis: invalid height, source: {source}",
            );
        } else {
            assert_eq!(
                committed_tip_height.expect("state must have a genesis block committed") + 1,
                Some(height),
                "committed block height must be 1 more than the finalized tip height, source: {source}",
            );

            assert_eq!(
                committed_tip_hash, finalized.block.header.previous_block_hash,
                "committed block must be a child of the finalized tip, source: {source}",
            );
        }

        #[cfg(feature = "elasticsearch")]
        let finalized_inner_block = finalized.block.clone();
        let note_commitment_trees = finalized.treestate.note_commitment_trees.clone();

        let result = self.db.write_block(
            finalized,
            prev_note_commitment_trees,
            &self.network(),
            source,
        );

        if result.is_ok() {
            // Save blocks to elasticsearch if the feature is enabled.
            #[cfg(feature = "elasticsearch")]
            self.elasticsearch(&finalized_inner_block);

            // TODO: move the stop height check to the syncer (#3442)
            if self.is_at_stop_height(height) {
                tracing::info!(
                    ?height,
                    ?hash,
                    block_source = ?source,
                    "stopping at configured height, flushing database to disk"
                );

                // We're just about to do a forced exit, so it's ok to do a forced db shutdown.
                self.db.shutdown(true);

                // Drops any tracing log output that hasn't already been written to stdout,
                // since this exits before calling drop on the WorkerGuard for the logger thread.
                // This is okay for now because this is test-only code.
                //
                // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
                Self::exit_process();
            }
        }

        result.map(|hash| (hash, note_commitment_trees))
    }

    /// Store finalized blocks in an elasticsearch database.
    ///
    /// We use the elasticsearch bulk API to index multiple blocks at a time while we are
    /// synchronizing the chain; when we get close to the tip, we index blocks one by one.
    #[cfg(feature = "elasticsearch")]
    pub fn elasticsearch(&mut self, block: &Arc<block::Block>) {
        if let Some(client) = self.elastic_db.clone() {
            let block_time = block.header.time.timestamp();
            let local_time = chrono::Utc::now().timestamp();

            // Bulk size is small enough to avoid the elasticsearch 100 MB content length limitation.
            // MAX_BLOCK_BYTES = 2 MB, but each block uses around 4.1 MB of JSON.
            // Each block counts as 2, because we send it with an operation/header line.
            // A value of 48 is 24 blocks.
            const AWAY_FROM_TIP_BULK_SIZE: usize = 48;
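
            // For illustration only: each indexed block contributes two NDJSON lines
            // to the bulk body, an operation/header line and then the block document
            // itself (hypothetical values shown):
            //
            //   {"index":{"_id":"1000"}}
            //   {"hash":"...","header":{...},"transactions":[...]}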

            // The number of blocks the bulk will have when we are in sync.
            // A value of 2 means only 1 block, as we want to insert blocks as soon as we get
            // them for a real-time experience. This is the same for mainnet and testnet.
            const CLOSE_TO_TIP_BULK_SIZE: usize = 2;

            // We consider ourselves in sync when the difference between the local time and the
            // blockchain time is less than this number of seconds.
            const CLOSE_TO_TIP_SECONDS: i64 = 14400; // 4 hours

            let mut blocks_size_to_dump = AWAY_FROM_TIP_BULK_SIZE;

            // If we are close to the tip, index one block per bulk call.
            if local_time - block_time < CLOSE_TO_TIP_SECONDS {
                blocks_size_to_dump = CLOSE_TO_TIP_BULK_SIZE;
            }

            // Insert the operation line.
            let height_number = block.coinbase_height().unwrap_or(block::Height(0)).0;
            self.elastic_blocks.push(
                serde_json::json!({
                    "index": {
                        "_id": height_number.to_string().as_str()
                    }
                })
                .to_string(),
            );

            // Insert the block itself.
            self.elastic_blocks
                .push(serde_json::json!(block).to_string());

            // The bulk is full: send everything we have to elasticsearch.
            if self.elastic_blocks.len() >= blocks_size_to_dump {
                let rt = tokio::runtime::Runtime::new()
                    .expect("runtime creation for elasticsearch should not fail");
                let blocks = self.elastic_blocks.clone();
                let network = self.network();

                rt.block_on(async move {
                    // Send a ping to the server to check if it is available before inserting.
                    if client.ping().send().await.is_err() {
                        tracing::error!("Elasticsearch is not available, skipping block indexing");
                        return;
                    }

                    let response = client
                        .bulk(elasticsearch::BulkParts::Index(
                            format!("zcash_{}", network.to_string().to_lowercase()).as_str(),
                        ))
                        .body(blocks)
                        .send()
                        .await
                        .expect("ES request should never fail");

                    // Make sure the bulk request did not return any errors.
                    let response_body = response
                        .json::<serde_json::Value>()
                        .await
                        .expect("ES response parsing error: maybe we are sending more than 100 MB of data (`http.max_content_length`)");
                    let errors = response_body["errors"].as_bool().unwrap_or(true);
                    assert!(!errors, "ES error: {response_body}");
                });

                // Clear the block storage.
                self.elastic_blocks.clear();
            }
        }
    }

    /// Returns `true` if `block_height` is greater than or equal to the
    /// configured stop height.
    fn is_at_stop_height(&self, block_height: block::Height) -> bool {
        let debug_stop_at_height = match self.debug_stop_at_height {
            Some(debug_stop_at_height) => debug_stop_at_height,
            None => return false,
        };

        block_height >= debug_stop_at_height
    }

    /// Exit the host process.
    ///
    /// Designed for debugging and tests.
    ///
    /// TODO: move the stop height check to the syncer (#3442)
    fn exit_process() -> ! {
        tracing::info!("exiting Zebra");

        // Some OSes require a flush to send all output to the terminal.
        // Zebra's logging doesn't depend on `tokio`, so we flush the stdlib sync streams.
        //
        // TODO: if this doesn't work, send an empty line as well.
        let _ = stdout().lock().flush();
        let _ = stderr().lock().flush();

        // Give the logger thread some time to flush any remaining lines to stdout,
        // and yield so that tests pass on macOS.
        std::thread::sleep(std::time::Duration::from_secs(3));

        // Exits before calling drop on the WorkerGuard for the logger thread,
        // dropping any lines that haven't already been written to stdout.
        // This is okay for now because this is test-only code.
        std::process::exit(0);
    }
}