zebra_state/service/finalized_state/
zebra_db.rs

//! Provides high-level access to the database using [`zebra_chain`] types.
//!
//! This module makes sure that:
//! - all disk writes happen inside a RocksDB transaction, and
//! - format-specific invariants are maintained.
//!
//! # Correctness
//!
//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
//! each time the database format (column, serialization, etc.) changes.
11
12use std::{path::Path, sync::Arc};
13
14use crossbeam_channel::bounded;
15use semver::Version;
16
17use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
18
19use crate::{
20    config::database_format_version_on_disk,
21    service::finalized_state::{
22        disk_db::DiskDb,
23        disk_format::{
24            block::MAX_ON_DISK_HEIGHT,
25            upgrade::{DbFormatChange, DbFormatChangeThreadHandle},
26        },
27    },
28    write_database_format_version_to_disk, BoxError, Config,
29};
30
31use super::disk_format::upgrade::restorable_db_versions;
32
33pub mod block;
34pub mod chain;
35pub mod metrics;
36pub mod shielded;
37pub mod transparent;
38
39#[cfg(any(test, feature = "proptest-impl"))]
40// TODO: when the database is split out of zebra-state, always expose these methods.
41pub mod arbitrary;
42
43/// Wrapper struct to ensure high-level `zebra-state` database access goes through the correct API.
44///
45/// `rocksdb` allows concurrent writes through a shared reference,
46/// so database instances are cloneable. When the final clone is dropped,
47/// the database is closed.
48#[derive(Clone, Debug)]
49pub struct ZebraDb {
50    // Configuration
51    //
52    // This configuration cannot be modified after the database is initialized,
53    // because some clones would have different values.
54    //
55    /// The configuration for the database.
56    //
57    // TODO: move the config to DiskDb
58    config: Arc<Config>,
59
60    /// Should format upgrades and format checks be skipped for this instance?
61    /// Only used in test code.
62    //
63    // TODO: move this to DiskDb
64    debug_skip_format_upgrades: bool,
65
66    // Owned State
67    //
68    // Everything contained in this state must be shared by all clones, or read-only.
69    //
70    /// A handle to a running format change task, which cancels the task when dropped.
71    ///
72    /// # Concurrency
73    ///
74    /// This field should be dropped before the database field, so the format upgrade task is
75    /// cancelled before the database is dropped. This helps avoid some kinds of deadlocks.
76    //
77    // TODO: move the generic upgrade code and fields to DiskDb
78    format_change_handle: Option<DbFormatChangeThreadHandle>,
79
80    /// The inner low-level database wrapper for the RocksDB database.
81    db: DiskDb,
82}
83
84impl ZebraDb {
85    /// Opens or creates the database at a path based on the kind, major version and network,
86    /// with the supplied column families, preserving any existing column families,
87    /// and returns a shared high-level typed database wrapper.
88    ///
89    /// If `debug_skip_format_upgrades` is true, don't do any format upgrades or format checks.
90    /// This argument is only used when running tests, it is ignored in production code.
91    //
92    // TODO: rename to StateDb and remove the db_kind and column_families_in_code arguments
93    pub fn new(
94        config: &Config,
95        db_kind: impl AsRef<str>,
96        format_version_in_code: &Version,
97        network: &Network,
98        debug_skip_format_upgrades: bool,
99        column_families_in_code: impl IntoIterator<Item = String>,
100        read_only: bool,
101    ) -> ZebraDb {
102        let disk_version = DiskDb::try_reusing_previous_db_after_major_upgrade(
103            &restorable_db_versions(),
104            format_version_in_code,
105            config,
106            &db_kind,
107            network,
108        )
109        .or_else(|| {
110            database_format_version_on_disk(config, &db_kind, format_version_in_code.major, network)
111                .expect("unable to read database format version file")
112        });
113
114        // Log any format changes before opening the database, in case opening fails.
115        let format_change = DbFormatChange::open_database(format_version_in_code, disk_version);
116
117        // Format upgrades try to write to the database, so we always skip them
118        // if `read_only` is `true`.
119        //
120        // We also allow skipping them when we are running tests.
121        let debug_skip_format_upgrades = read_only || (cfg!(test) && debug_skip_format_upgrades);
122
123        // Open the database and do initial checks.
124        let mut db = ZebraDb {
125            config: Arc::new(config.clone()),
126            debug_skip_format_upgrades,
127            format_change_handle: None,
128            // After the database directory is created, a newly created database temporarily
129            // changes to the default database version. Then we set the correct version in the
130            // upgrade thread. We need to do the version change in this order, because the version
131            // file can only be changed while we hold the RocksDB database lock.
132            db: DiskDb::new(
133                config,
134                db_kind,
135                format_version_in_code,
136                network,
137                column_families_in_code,
138                read_only,
139            ),
140        };
141
142        db.spawn_format_change(format_change);
143
144        db
145    }
146
147    /// Launch any required format changes or format checks, and store their thread handle.
148    pub fn spawn_format_change(&mut self, format_change: DbFormatChange) {
149        if self.debug_skip_format_upgrades {
150            return;
151        }
152
153        // We have to get this height before we spawn the upgrade task, because threads can take
154        // a while to start, and new blocks can be committed as soon as we return from this method.
155        let initial_tip_height = self.finalized_tip_height();
156
157        // `upgrade_db` is a special clone of this database, which can't be used to shut down
158        // the upgrade task. (Because the task hasn't been launched yet,
159        // its `db.format_change_handle` is always None.)
160        let upgrade_db = self.clone();
161
162        // TODO:
163        // - should debug_stop_at_height wait for the upgrade task to finish?
164        let format_change_handle =
165            format_change.spawn_format_change(upgrade_db, initial_tip_height);
166
167        self.format_change_handle = Some(format_change_handle);
168    }
169
170    /// Returns config for this database.
171    pub fn config(&self) -> &Config {
172        &self.config
173    }
174
175    /// Returns the configured database kind for this database.
176    pub fn db_kind(&self) -> String {
177        self.db.db_kind()
178    }
179
180    /// Returns the format version of the running code that created this `ZebraDb` instance in memory.
181    pub fn format_version_in_code(&self) -> Version {
182        self.db.format_version_in_code()
183    }
184
185    /// Returns the fixed major version for this database.
186    pub fn major_version(&self) -> u64 {
187        self.db.major_version()
188    }
189
190    /// Returns the format version of this database on disk.
191    ///
192    /// See `database_format_version_on_disk()` for details.
193    pub fn format_version_on_disk(&self) -> Result<Option<Version>, BoxError> {
194        database_format_version_on_disk(
195            self.config(),
196            self.db_kind(),
197            self.major_version(),
198            &self.network(),
199        )
200    }
201
202    /// Updates the format of this database on disk to the suppled version.
203    ///
204    /// See `write_database_format_version_to_disk()` for details.
205    pub(crate) fn update_format_version_on_disk(
206        &self,
207        new_version: &Version,
208    ) -> Result<(), BoxError> {
209        write_database_format_version_to_disk(
210            self.config(),
211            self.db_kind(),
212            self.major_version(),
213            new_version,
214            &self.network(),
215        )
216    }
217
218    /// Returns the configured network for this database.
219    pub fn network(&self) -> Network {
220        self.db.network()
221    }
222
223    /// Returns the `Path` where the files used by this database are located.
224    pub fn path(&self) -> &Path {
225        self.db.path()
226    }
227
228    /// Check for panics in code running in spawned threads.
229    /// If a thread exited with a panic, resume that panic.
230    ///
231    /// This method should be called regularly, so that panics are detected as soon as possible.
232    pub fn check_for_panics(&mut self) {
233        if let Some(format_change_handle) = self.format_change_handle.as_mut() {
234            format_change_handle.check_for_panics();
235        }
236    }
237
238    /// When called with a secondary DB instance, tries to catch up with the primary DB instance
239    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
240        self.db.try_catch_up_with_primary()
241    }
242
243    /// Spawns a blocking task to try catching up with the primary DB instance.
244    pub async fn spawn_try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
245        let db = self.clone();
246        tokio::task::spawn_blocking(move || {
247            let result = db.try_catch_up_with_primary();
248            if let Err(catch_up_error) = &result {
249                tracing::warn!(?catch_up_error, "failed to catch up to primary");
250            }
251            result
252        })
253        .wait_for_panics()
254        .await
255    }
256
257    /// Shut down the database, cleaning up background tasks and ephemeral data.
258    ///
259    /// If `force` is true, clean up regardless of any shared references.
260    /// `force` can cause errors accessing the database from other shared references.
261    /// It should only be used in debugging or test code, immediately before a manual shutdown.
262    ///
263    /// See [`DiskDb::shutdown`] for details.
264    pub fn shutdown(&mut self, force: bool) {
265        // Are we shutting down the underlying database instance?
266        let is_shutdown = force || self.db.shared_database_owners() <= 1;
267
268        // # Concurrency
269        //
270        // The format upgrade task should be cancelled before the database is flushed or shut down.
271        // This helps avoid some kinds of deadlocks.
272        //
273        // See also the correctness note in `DiskDb::shutdown()`.
274        if !self.debug_skip_format_upgrades && is_shutdown {
275            if let Some(format_change_handle) = self.format_change_handle.as_mut() {
276                format_change_handle.force_cancel();
277            }
278
279            // # Correctness
280            //
281            // Check that the database format is correct before shutting down.
282            // This lets users know to delete and re-sync their database immediately,
283            // rather than surprising them next time Zebra starts up.
284            //
285            // # Testinng
286            //
287            // In Zebra's CI, panicking here stops us writing invalid cached states,
288            // which would then make unrelated PRs fail when Zebra starts up.
289
290            // If the upgrade has completed, or we've done a downgrade, check the state is valid.
291            let disk_version = database_format_version_on_disk(
292                &self.config,
293                self.db_kind(),
294                self.major_version(),
295                &self.network(),
296            )
297            .expect("unexpected invalid or unreadable database version file");
298
299            if let Some(disk_version) = disk_version {
300                // We need to keep the cancel handle until the format check has finished,
301                // because dropping it cancels the format check.
302                let (_never_cancel_handle, never_cancel_receiver) = bounded(1);
303
304                // We block here because the checks are quick and database validity is
305                // consensus-critical.
306                if disk_version >= self.db.format_version_in_code() {
307                    DbFormatChange::check_new_blocks(self)
308                        .run_format_change_or_check(
309                            self,
310                            // The initial tip height is not used by the new blocks format check.
311                            None,
312                            &never_cancel_receiver,
313                        )
314                        .expect("cancel handle is never used");
315                }
316            }
317        }
318
319        self.check_for_panics();
320
321        self.db.shutdown(force);
322    }
323
324    /// Check that the on-disk height is well below the maximum supported database height.
325    ///
326    /// Zebra only supports on-disk heights up to 3 bytes.
327    ///
328    /// # Logs an Error
329    ///
330    /// If Zebra is storing block heights that are close to [`MAX_ON_DISK_HEIGHT`].
331    pub(crate) fn check_max_on_disk_tip_height(&self) -> Result<(), String> {
332        if let Some((tip_height, tip_hash)) = self.tip() {
333            if tip_height.0 > MAX_ON_DISK_HEIGHT.0 / 2 {
334                let err = Err(format!(
335                    "unexpectedly large tip height, database format upgrade required: \
336                     tip height: {tip_height:?}, tip hash: {tip_hash:?}, \
337                     max height: {MAX_ON_DISK_HEIGHT:?}"
338                ));
339                error!(?err);
340                return err;
341            }
342        }
343
344        Ok(())
345    }
346
347    /// Logs metrics related to the underlying RocksDB instance.
348    ///
349    /// This function prints various metrics and statistics about the RocksDB database,
350    /// such as disk usage, memory usage, and other performance-related metrics.
351    pub fn print_db_metrics(&self) {
352        self.db.print_db_metrics();
353    }
354
355    /// Returns the estimated total disk space usage of the database.
356    pub fn size(&self) -> u64 {
357        self.db.size()
358    }
359}
360
361impl Drop for ZebraDb {
362    fn drop(&mut self) {
363        self.shutdown(false);
364    }
365}