zebra_state/service/finalized_state/zebra_db.rs
1//! Provides high-level access to the database using [`zebra_chain`] types.
2//!
3//! This module makes sure that:
4//! - all disk writes happen inside a RocksDB transaction, and
5//! - format-specific invariants are maintained.
6//!
7//! # Correctness
8//!
9//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
10//! each time the database format (column, serialization, etc) changes.
11
12use std::{path::Path, sync::Arc};
13
14use crossbeam_channel::bounded;
15use semver::Version;
16
17use zebra_chain::{block::Height, diagnostic::task::WaitForPanics, parameters::Network};
18
19use crate::{
20 config::database_format_version_on_disk,
21 service::finalized_state::{
22 disk_db::DiskDb,
23 disk_format::{
24 block::MAX_ON_DISK_HEIGHT,
25 transparent::AddressLocation,
26 upgrade::{DbFormatChange, DbFormatChangeThreadHandle},
27 },
28 },
29 write_database_format_version_to_disk, BoxError, Config,
30};
31
32use super::disk_format::upgrade::restorable_db_versions;
33
34pub mod block;
35pub mod chain;
36pub mod metrics;
37pub mod shielded;
38pub mod transparent;
39
40#[cfg(any(test, feature = "proptest-impl"))]
41// TODO: when the database is split out of zebra-state, always expose these methods.
42pub mod arbitrary;
43
44/// Wrapper struct to ensure high-level `zebra-state` database access goes through the correct API.
45///
46/// `rocksdb` allows concurrent writes through a shared reference,
47/// so database instances are cloneable. When the final clone is dropped,
48/// the database is closed.
49#[derive(Clone, Debug)]
50pub struct ZebraDb {
51 // Configuration
52 //
53 // This configuration cannot be modified after the database is initialized,
54 // because some clones would have different values.
55 //
56 /// The configuration for the database.
57 //
58 // TODO: move the config to DiskDb
59 config: Arc<Config>,
60
61 /// Should format upgrades and format checks be skipped for this instance?
62 /// Only used in test code.
63 //
64 // TODO: move this to DiskDb
65 debug_skip_format_upgrades: bool,
66
67 // Owned State
68 //
69 // Everything contained in this state must be shared by all clones, or read-only.
70 //
71 /// A handle to a running format change task, which cancels the task when dropped.
72 ///
73 /// # Concurrency
74 ///
75 /// This field should be dropped before the database field, so the format upgrade task is
76 /// cancelled before the database is dropped. This helps avoid some kinds of deadlocks.
77 //
78 // TODO: move the generic upgrade code and fields to DiskDb
79 format_change_handle: Option<DbFormatChangeThreadHandle>,
80
81 /// The inner low-level database wrapper for the RocksDB database.
82 db: DiskDb,
83}
84
85impl ZebraDb {
86 /// Opens or creates the database at a path based on the kind, major version and network,
87 /// with the supplied column families, preserving any existing column families,
88 /// and returns a shared high-level typed database wrapper.
89 ///
90 /// If `debug_skip_format_upgrades` is true, don't do any format upgrades or format checks.
91 /// This argument is only used when running tests, it is ignored in production code.
92 //
93 // TODO: rename to StateDb and remove the db_kind and column_families_in_code arguments
94 pub fn new(
95 config: &Config,
96 db_kind: impl AsRef<str>,
97 format_version_in_code: &Version,
98 network: &Network,
99 debug_skip_format_upgrades: bool,
100 column_families_in_code: impl IntoIterator<Item = String>,
101 read_only: bool,
102 ) -> ZebraDb {
103 let disk_version = DiskDb::try_reusing_previous_db_after_major_upgrade(
104 &restorable_db_versions(),
105 format_version_in_code,
106 config,
107 &db_kind,
108 network,
109 )
110 .or_else(|| {
111 database_format_version_on_disk(config, &db_kind, format_version_in_code.major, network)
112 .expect("unable to read database format version file")
113 });
114
115 // Log any format changes before opening the database, in case opening fails.
116 let format_change = DbFormatChange::open_database(format_version_in_code, disk_version);
117
118 // Format upgrades try to write to the database, so we always skip them
119 // if `read_only` is `true`.
120 //
121 // We also allow skipping them when we are running tests.
122 let debug_skip_format_upgrades = read_only || (cfg!(test) && debug_skip_format_upgrades);
123
124 // Open the database and do initial checks.
125 let mut db = ZebraDb {
126 config: Arc::new(config.clone()),
127 debug_skip_format_upgrades,
128 format_change_handle: None,
129 // After the database directory is created, a newly created database temporarily
130 // changes to the default database version. Then we set the correct version in the
131 // upgrade thread. We need to do the version change in this order, because the version
132 // file can only be changed while we hold the RocksDB database lock.
133 db: DiskDb::new(
134 config,
135 db_kind,
136 format_version_in_code,
137 network,
138 column_families_in_code,
139 read_only,
140 ),
141 };
142
143 let zero_location_utxos =
144 db.address_utxo_locations(AddressLocation::from_usize(Height(0), 0, 0));
145 if !zero_location_utxos.is_empty() {
146 warn!(
147 "You have been impacted by the Zebra 2.4.0 address indexer corruption bug. \
148 If you rely on the data from the RPC interface, you will need to recover your database. \
149 Follow the instructions in the 2.4.1 release notes: https://github.com/ZcashFoundation/zebra/releases/tag/v2.4.1 \
150 If you just run the node for consensus and don't use data from the RPC interface, you can ignore this warning."
151 )
152 }
153
154 db.spawn_format_change(format_change);
155
156 db
157 }
158
159 /// Launch any required format changes or format checks, and store their thread handle.
160 pub fn spawn_format_change(&mut self, format_change: DbFormatChange) {
161 if self.debug_skip_format_upgrades {
162 return;
163 }
164
165 // We have to get this height before we spawn the upgrade task, because threads can take
166 // a while to start, and new blocks can be committed as soon as we return from this method.
167 let initial_tip_height = self.finalized_tip_height();
168
169 // `upgrade_db` is a special clone of this database, which can't be used to shut down
170 // the upgrade task. (Because the task hasn't been launched yet,
171 // its `db.format_change_handle` is always None.)
172 let upgrade_db = self.clone();
173
174 // TODO:
175 // - should debug_stop_at_height wait for the upgrade task to finish?
176 let format_change_handle =
177 format_change.spawn_format_change(upgrade_db, initial_tip_height);
178
179 self.format_change_handle = Some(format_change_handle);
180 }
181
182 /// Returns config for this database.
183 pub fn config(&self) -> &Config {
184 &self.config
185 }
186
187 /// Returns the configured database kind for this database.
188 pub fn db_kind(&self) -> String {
189 self.db.db_kind()
190 }
191
192 /// Returns the format version of the running code that created this `ZebraDb` instance in memory.
193 pub fn format_version_in_code(&self) -> Version {
194 self.db.format_version_in_code()
195 }
196
197 /// Returns the fixed major version for this database.
198 pub fn major_version(&self) -> u64 {
199 self.db.major_version()
200 }
201
202 /// Returns the format version of this database on disk.
203 ///
204 /// See `database_format_version_on_disk()` for details.
205 pub fn format_version_on_disk(&self) -> Result<Option<Version>, BoxError> {
206 database_format_version_on_disk(
207 self.config(),
208 self.db_kind(),
209 self.major_version(),
210 &self.network(),
211 )
212 }
213
214 /// Updates the format of this database on disk to the suppled version.
215 ///
216 /// See `write_database_format_version_to_disk()` for details.
217 pub(crate) fn update_format_version_on_disk(
218 &self,
219 new_version: &Version,
220 ) -> Result<(), BoxError> {
221 write_database_format_version_to_disk(
222 self.config(),
223 self.db_kind(),
224 self.major_version(),
225 new_version,
226 &self.network(),
227 )
228 }
229
230 /// Returns the configured network for this database.
231 pub fn network(&self) -> Network {
232 self.db.network()
233 }
234
235 /// Returns the `Path` where the files used by this database are located.
236 pub fn path(&self) -> &Path {
237 self.db.path()
238 }
239
240 /// Check for panics in code running in spawned threads.
241 /// If a thread exited with a panic, resume that panic.
242 ///
243 /// This method should be called regularly, so that panics are detected as soon as possible.
244 pub fn check_for_panics(&mut self) {
245 if let Some(format_change_handle) = self.format_change_handle.as_mut() {
246 format_change_handle.check_for_panics();
247 }
248 }
249
250 /// When called with a secondary DB instance, tries to catch up with the primary DB instance
251 pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
252 self.db.try_catch_up_with_primary()
253 }
254
255 /// Spawns a blocking task to try catching up with the primary DB instance.
256 pub async fn spawn_try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
257 let db = self.clone();
258 tokio::task::spawn_blocking(move || {
259 let result = db.try_catch_up_with_primary();
260 if let Err(catch_up_error) = &result {
261 tracing::warn!(?catch_up_error, "failed to catch up to primary");
262 }
263 result
264 })
265 .wait_for_panics()
266 .await
267 }
268
269 /// Shut down the database, cleaning up background tasks and ephemeral data.
270 ///
271 /// If `force` is true, clean up regardless of any shared references.
272 /// `force` can cause errors accessing the database from other shared references.
273 /// It should only be used in debugging or test code, immediately before a manual shutdown.
274 ///
275 /// See [`DiskDb::shutdown`] for details.
276 pub fn shutdown(&mut self, force: bool) {
277 // Are we shutting down the underlying database instance?
278 let is_shutdown = force || self.db.shared_database_owners() <= 1;
279
280 // # Concurrency
281 //
282 // The format upgrade task should be cancelled before the database is flushed or shut down.
283 // This helps avoid some kinds of deadlocks.
284 //
285 // See also the correctness note in `DiskDb::shutdown()`.
286 if !self.debug_skip_format_upgrades && is_shutdown {
287 if let Some(format_change_handle) = self.format_change_handle.as_mut() {
288 format_change_handle.force_cancel();
289 }
290
291 // # Correctness
292 //
293 // Check that the database format is correct before shutting down.
294 // This lets users know to delete and re-sync their database immediately,
295 // rather than surprising them next time Zebra starts up.
296 //
297 // # Testinng
298 //
299 // In Zebra's CI, panicking here stops us writing invalid cached states,
300 // which would then make unrelated PRs fail when Zebra starts up.
301
302 // If the upgrade has completed, or we've done a downgrade, check the state is valid.
303 let disk_version = database_format_version_on_disk(
304 &self.config,
305 self.db_kind(),
306 self.major_version(),
307 &self.network(),
308 )
309 .expect("unexpected invalid or unreadable database version file");
310
311 if let Some(disk_version) = disk_version {
312 // We need to keep the cancel handle until the format check has finished,
313 // because dropping it cancels the format check.
314 let (_never_cancel_handle, never_cancel_receiver) = bounded(1);
315
316 // We block here because the checks are quick and database validity is
317 // consensus-critical.
318 if disk_version >= self.db.format_version_in_code() {
319 DbFormatChange::check_new_blocks(self)
320 .run_format_change_or_check(
321 self,
322 // The initial tip height is not used by the new blocks format check.
323 None,
324 &never_cancel_receiver,
325 )
326 .expect("cancel handle is never used");
327 }
328 }
329 }
330
331 self.check_for_panics();
332
333 self.db.shutdown(force);
334 }
335
336 /// Check that the on-disk height is well below the maximum supported database height.
337 ///
338 /// Zebra only supports on-disk heights up to 3 bytes.
339 ///
340 /// # Logs an Error
341 ///
342 /// If Zebra is storing block heights that are close to [`MAX_ON_DISK_HEIGHT`].
343 pub(crate) fn check_max_on_disk_tip_height(&self) -> Result<(), String> {
344 if let Some((tip_height, tip_hash)) = self.tip() {
345 if tip_height.0 > MAX_ON_DISK_HEIGHT.0 / 2 {
346 let err = Err(format!(
347 "unexpectedly large tip height, database format upgrade required: \
348 tip height: {tip_height:?}, tip hash: {tip_hash:?}, \
349 max height: {MAX_ON_DISK_HEIGHT:?}"
350 ));
351 error!(?err);
352 return err;
353 }
354 }
355
356 Ok(())
357 }
358
359 /// Logs metrics related to the underlying RocksDB instance.
360 ///
361 /// This function prints various metrics and statistics about the RocksDB database,
362 /// such as disk usage, memory usage, and other performance-related metrics.
363 pub fn print_db_metrics(&self) {
364 self.db.print_db_metrics();
365 }
366
367 /// Returns the estimated total disk space usage of the database.
368 pub fn size(&self) -> u64 {
369 self.db.size()
370 }
371}
372
373impl Drop for ZebraDb {
374 fn drop(&mut self) {
375 self.shutdown(false);
376 }
377}