zebra_state/service/finalized_state/zebra_db.rs
1//! Provides high-level access to the database using [`zebra_chain`] types.
2//!
3//! This module makes sure that:
4//! - all disk writes happen inside a RocksDB transaction, and
5//! - format-specific invariants are maintained.
6//!
7//! # Correctness
8//!
9//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
10//! each time the database format (column, serialization, etc) changes.
11
12use std::{path::Path, sync::Arc};
13
14use crossbeam_channel::bounded;
15use semver::Version;
16
17use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
18
19use crate::{
20 config::database_format_version_on_disk,
21 service::finalized_state::{
22 disk_db::DiskDb,
23 disk_format::{
24 block::MAX_ON_DISK_HEIGHT,
25 upgrade::{DbFormatChange, DbFormatChangeThreadHandle},
26 },
27 },
28 write_database_format_version_to_disk, BoxError, Config,
29};
30
31use super::disk_format::upgrade::restorable_db_versions;
32
33pub mod block;
34pub mod chain;
35pub mod metrics;
36pub mod shielded;
37pub mod transparent;
38
39#[cfg(any(test, feature = "proptest-impl"))]
40// TODO: when the database is split out of zebra-state, always expose these methods.
41pub mod arbitrary;
42
/// Wrapper struct to ensure high-level `zebra-state` database access goes through the correct API.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// # Field order
///
/// Rust drops struct fields in declaration order, so `format_change_handle` must stay
/// declared before `db`. See the concurrency note on that field.
#[derive(Clone, Debug)]
pub struct ZebraDb {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configuration for the database.
    ///
    /// Stored behind an [`Arc`], so cloning a `ZebraDb` shares the configuration
    /// rather than copying it.
    //
    // TODO: move the config to DiskDb
    config: Arc<Config>,

    /// Should format upgrades and format checks be skipped for this instance?
    /// Only used in test code.
    ///
    /// [`ZebraDb::new`] also sets this flag for read-only databases,
    /// because format upgrades try to write to the database.
    //
    // TODO: move this to DiskDb
    debug_skip_format_upgrades: bool,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// A handle to a running format change task, which cancels the task when dropped.
    ///
    /// This is `None` until [`ZebraDb::spawn_format_change`] stores a handle,
    /// and stays `None` when format upgrades are skipped.
    ///
    /// # Concurrency
    ///
    /// This field should be dropped before the database field, so the format upgrade task is
    /// cancelled before the database is dropped. This helps avoid some kinds of deadlocks.
    //
    // TODO: move the generic upgrade code and fields to DiskDb
    format_change_handle: Option<DbFormatChangeThreadHandle>,

    /// The inner low-level database wrapper for the RocksDB database.
    db: DiskDb,
}
83
84impl ZebraDb {
85 /// Opens or creates the database at a path based on the kind, major version and network,
86 /// with the supplied column families, preserving any existing column families,
87 /// and returns a shared high-level typed database wrapper.
88 ///
89 /// If `debug_skip_format_upgrades` is true, don't do any format upgrades or format checks.
90 /// This argument is only used when running tests, it is ignored in production code.
91 //
92 // TODO: rename to StateDb and remove the db_kind and column_families_in_code arguments
93 pub fn new(
94 config: &Config,
95 db_kind: impl AsRef<str>,
96 format_version_in_code: &Version,
97 network: &Network,
98 debug_skip_format_upgrades: bool,
99 column_families_in_code: impl IntoIterator<Item = String>,
100 read_only: bool,
101 ) -> ZebraDb {
102 let disk_version = DiskDb::try_reusing_previous_db_after_major_upgrade(
103 &restorable_db_versions(),
104 format_version_in_code,
105 config,
106 &db_kind,
107 network,
108 )
109 .or_else(|| {
110 database_format_version_on_disk(config, &db_kind, format_version_in_code.major, network)
111 .expect("unable to read database format version file")
112 });
113
114 // Log any format changes before opening the database, in case opening fails.
115 let format_change = DbFormatChange::open_database(format_version_in_code, disk_version);
116
117 // Format upgrades try to write to the database, so we always skip them
118 // if `read_only` is `true`.
119 //
120 // We also allow skipping them when we are running tests.
121 let debug_skip_format_upgrades = read_only || (cfg!(test) && debug_skip_format_upgrades);
122
123 // Open the database and do initial checks.
124 let mut db = ZebraDb {
125 config: Arc::new(config.clone()),
126 debug_skip_format_upgrades,
127 format_change_handle: None,
128 // After the database directory is created, a newly created database temporarily
129 // changes to the default database version. Then we set the correct version in the
130 // upgrade thread. We need to do the version change in this order, because the version
131 // file can only be changed while we hold the RocksDB database lock.
132 db: DiskDb::new(
133 config,
134 db_kind,
135 format_version_in_code,
136 network,
137 column_families_in_code,
138 read_only,
139 ),
140 };
141
142 db.spawn_format_change(format_change);
143
144 db
145 }
146
147 /// Launch any required format changes or format checks, and store their thread handle.
148 pub fn spawn_format_change(&mut self, format_change: DbFormatChange) {
149 if self.debug_skip_format_upgrades {
150 return;
151 }
152
153 // We have to get this height before we spawn the upgrade task, because threads can take
154 // a while to start, and new blocks can be committed as soon as we return from this method.
155 let initial_tip_height = self.finalized_tip_height();
156
157 // `upgrade_db` is a special clone of this database, which can't be used to shut down
158 // the upgrade task. (Because the task hasn't been launched yet,
159 // its `db.format_change_handle` is always None.)
160 let upgrade_db = self.clone();
161
162 // TODO:
163 // - should debug_stop_at_height wait for the upgrade task to finish?
164 let format_change_handle =
165 format_change.spawn_format_change(upgrade_db, initial_tip_height);
166
167 self.format_change_handle = Some(format_change_handle);
168 }
169
170 /// Returns config for this database.
171 pub fn config(&self) -> &Config {
172 &self.config
173 }
174
    /// Returns the configured database kind for this database.
    ///
    /// The value is returned as an owned `String`, as reported by the inner [`DiskDb`].
    pub fn db_kind(&self) -> String {
        self.db.db_kind()
    }
179
    /// Returns the format version of the running code that created this `ZebraDb` instance in memory.
    ///
    /// This is the version reported by the inner [`DiskDb`]. It can differ from the version
    /// stored on disk, which is returned by [`ZebraDb::format_version_on_disk`].
    pub fn format_version_in_code(&self) -> Version {
        self.db.format_version_in_code()
    }
184
    /// Returns the fixed major version for this database, as reported by the inner [`DiskDb`].
    pub fn major_version(&self) -> u64 {
        self.db.major_version()
    }
189
190 /// Returns the format version of this database on disk.
191 ///
192 /// See `database_format_version_on_disk()` for details.
193 pub fn format_version_on_disk(&self) -> Result<Option<Version>, BoxError> {
194 database_format_version_on_disk(
195 self.config(),
196 self.db_kind(),
197 self.major_version(),
198 &self.network(),
199 )
200 }
201
202 /// Updates the format of this database on disk to the suppled version.
203 ///
204 /// See `write_database_format_version_to_disk()` for details.
205 pub(crate) fn update_format_version_on_disk(
206 &self,
207 new_version: &Version,
208 ) -> Result<(), BoxError> {
209 write_database_format_version_to_disk(
210 self.config(),
211 self.db_kind(),
212 self.major_version(),
213 new_version,
214 &self.network(),
215 )
216 }
217
    /// Returns the configured network for this database.
    ///
    /// The network is returned by value, as reported by the inner [`DiskDb`].
    pub fn network(&self) -> Network {
        self.db.network()
    }
222
    /// Returns the `Path` where the files used by this database are located.
    ///
    /// The path is borrowed from the inner [`DiskDb`].
    pub fn path(&self) -> &Path {
        self.db.path()
    }
227
228 /// Check for panics in code running in spawned threads.
229 /// If a thread exited with a panic, resume that panic.
230 ///
231 /// This method should be called regularly, so that panics are detected as soon as possible.
232 pub fn check_for_panics(&mut self) {
233 if let Some(format_change_handle) = self.format_change_handle.as_mut() {
234 format_change_handle.check_for_panics();
235 }
236 }
237
    /// When called with a secondary DB instance, tries to catch up with the primary DB instance.
    ///
    /// This call runs on the current thread. Async callers can use
    /// [`ZebraDb::spawn_try_catch_up_with_primary`], which runs it in a blocking task.
    ///
    /// # Errors
    ///
    /// Returns any [`rocksdb::Error`] reported by the inner [`DiskDb`].
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.db.try_catch_up_with_primary()
    }
242
243 /// Spawns a blocking task to try catching up with the primary DB instance.
244 pub async fn spawn_try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
245 let db = self.clone();
246 tokio::task::spawn_blocking(move || {
247 let result = db.try_catch_up_with_primary();
248 if let Err(catch_up_error) = &result {
249 tracing::warn!(?catch_up_error, "failed to catch up to primary");
250 }
251 result
252 })
253 .wait_for_panics()
254 .await
255 }
256
257 /// Shut down the database, cleaning up background tasks and ephemeral data.
258 ///
259 /// If `force` is true, clean up regardless of any shared references.
260 /// `force` can cause errors accessing the database from other shared references.
261 /// It should only be used in debugging or test code, immediately before a manual shutdown.
262 ///
263 /// See [`DiskDb::shutdown`] for details.
264 pub fn shutdown(&mut self, force: bool) {
265 // Are we shutting down the underlying database instance?
266 let is_shutdown = force || self.db.shared_database_owners() <= 1;
267
268 // # Concurrency
269 //
270 // The format upgrade task should be cancelled before the database is flushed or shut down.
271 // This helps avoid some kinds of deadlocks.
272 //
273 // See also the correctness note in `DiskDb::shutdown()`.
274 if !self.debug_skip_format_upgrades && is_shutdown {
275 if let Some(format_change_handle) = self.format_change_handle.as_mut() {
276 format_change_handle.force_cancel();
277 }
278
279 // # Correctness
280 //
281 // Check that the database format is correct before shutting down.
282 // This lets users know to delete and re-sync their database immediately,
283 // rather than surprising them next time Zebra starts up.
284 //
285 // # Testinng
286 //
287 // In Zebra's CI, panicking here stops us writing invalid cached states,
288 // which would then make unrelated PRs fail when Zebra starts up.
289
290 // If the upgrade has completed, or we've done a downgrade, check the state is valid.
291 let disk_version = database_format_version_on_disk(
292 &self.config,
293 self.db_kind(),
294 self.major_version(),
295 &self.network(),
296 )
297 .expect("unexpected invalid or unreadable database version file");
298
299 if let Some(disk_version) = disk_version {
300 // We need to keep the cancel handle until the format check has finished,
301 // because dropping it cancels the format check.
302 let (_never_cancel_handle, never_cancel_receiver) = bounded(1);
303
304 // We block here because the checks are quick and database validity is
305 // consensus-critical.
306 if disk_version >= self.db.format_version_in_code() {
307 DbFormatChange::check_new_blocks(self)
308 .run_format_change_or_check(
309 self,
310 // The initial tip height is not used by the new blocks format check.
311 None,
312 &never_cancel_receiver,
313 )
314 .expect("cancel handle is never used");
315 }
316 }
317 }
318
319 self.check_for_panics();
320
321 self.db.shutdown(force);
322 }
323
324 /// Check that the on-disk height is well below the maximum supported database height.
325 ///
326 /// Zebra only supports on-disk heights up to 3 bytes.
327 ///
328 /// # Logs an Error
329 ///
330 /// If Zebra is storing block heights that are close to [`MAX_ON_DISK_HEIGHT`].
331 pub(crate) fn check_max_on_disk_tip_height(&self) -> Result<(), String> {
332 if let Some((tip_height, tip_hash)) = self.tip() {
333 if tip_height.0 > MAX_ON_DISK_HEIGHT.0 / 2 {
334 let err = Err(format!(
335 "unexpectedly large tip height, database format upgrade required: \
336 tip height: {tip_height:?}, tip hash: {tip_hash:?}, \
337 max height: {MAX_ON_DISK_HEIGHT:?}"
338 ));
339 error!(?err);
340 return err;
341 }
342 }
343
344 Ok(())
345 }
346
    /// Logs metrics related to the underlying RocksDB instance.
    ///
    /// This function prints various metrics and statistics about the RocksDB database,
    /// such as disk usage, memory usage, and other performance-related metrics.
    ///
    /// The work is done by the inner [`DiskDb`]; see `DiskDb::print_db_metrics()` for the
    /// exact metrics that are reported.
    pub fn print_db_metrics(&self) {
        self.db.print_db_metrics();
    }
354
    /// Returns the estimated total disk space usage of the database.
    ///
    /// The estimate comes from the inner [`DiskDb`].
    // NOTE(review): the unit is presumably bytes; confirm against `DiskDb::size()`.
    pub fn size(&self) -> u64 {
        self.db.size()
    }
359}
360
/// Shuts down the database when a `ZebraDb` instance is dropped.
impl Drop for ZebraDb {
    /// Runs a non-forced [`ZebraDb::shutdown`].
    ///
    /// Because `force` is `false`, the format change task is only cancelled, and the format
    /// check only runs, when at most one shared owner of the underlying database remains.
    fn drop(&mut self) {
        self.shutdown(false);
    }
}