zebra_state/service/finalized_state/disk_db.rs
1//! Provides low-level access to RocksDB using some database-specific types.
2//!
3//! This module makes sure that:
4//! - all disk writes happen inside a RocksDB transaction
5//! ([`rocksdb::WriteBatch`]), and
6//! - format-specific invariants are maintained.
7//!
8//! # Correctness
9//!
10//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
11//! each time the database format (column, serialization, etc) changes.
12
13use std::{
14 collections::{BTreeMap, HashMap},
15 fmt::{Debug, Write},
16 fs,
17 ops::RangeBounds,
18 path::Path,
19 sync::Arc,
20};
21
22use itertools::Itertools;
23use rlimit::increase_nofile_limit;
24
25use rocksdb::{ColumnFamilyDescriptor, ErrorKind, Options, ReadOptions};
26use semver::Version;
27use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian};
28
29use crate::{
30 database_format_version_on_disk,
31 service::finalized_state::disk_format::{FromDisk, IntoDisk},
32 write_database_format_version_to_disk, Config,
33};
34
35use super::zebra_db::transparent::{
36 fetch_add_balance_and_received, BALANCE_BY_TRANSPARENT_ADDR,
37 BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
38};
39// Doc-only imports
40#[allow(unused_imports)]
41use super::{TypedColumnFamily, WriteTypedBatch};
42
43#[cfg(any(test, feature = "proptest-impl"))]
44mod tests;
45
/// The [`rocksdb::ThreadMode`] used by the database.
///
/// [`rocksdb::SingleThreaded`] mode requires exclusive access for column family
/// changes, but reads and writes still work through shared references.
pub type DBThreadMode = rocksdb::SingleThreaded;

/// The [`rocksdb`] database type, including thread mode.
///
/// Also the [`rocksdb::DBAccess`] used by database iterators.
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;
53
/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height works fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured database kind for this database.
    db_kind: String,

    /// The format version of the running Zebra code.
    format_version_in_code: Version,

    /// The configured network for this database.
    network: Network,

    /// The configured temporary database setting.
    ///
    /// If true, the database files are deleted on drop.
    ephemeral: bool,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The shared inner RocksDB database.
    ///
    /// RocksDB allows reads and writes via a shared reference.
    ///
    /// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
    /// column family changes and [`Drop`] require exclusive access.
    ///
    /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
    /// only [`Drop`] requires exclusive access.
    db: Arc<DB>,
}
108
/// Wrapper struct to ensure low-level database writes go through the correct API.
///
/// [`rocksdb::WriteBatch`] is a batched set of database updates,
/// which must be written to the database using `DiskDb::write(batch)`.
///
/// Batches are applied atomically: either every operation in the batch is
/// written, or none are.
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct DiskWriteBatch {
    /// The inner RocksDB write batch.
    batch: rocksdb::WriteBatch,
}
119
120impl Debug for DiskWriteBatch {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("DiskWriteBatch")
123 .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
124 .finish()
125 }
126}
127
128impl PartialEq for DiskWriteBatch {
129 fn eq(&self, other: &Self) -> bool {
130 self.batch.data() == other.batch.data()
131 }
132}
133
134impl Eq for DiskWriteBatch {}
135
/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
//
// TODO: replace uses of this trait with WriteTypedBatch,
// implement these methods directly on WriteTypedBatch, and delete the trait.
pub trait WriteDisk {
    /// Serialize and insert the given key and value into a rocksdb column family,
    /// overwriting any existing `value` for `key`.
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Serialize and merge the given key and value into a rocksdb column family,
    /// merging with any existing `value` for `key`.
    ///
    /// The merge semantics are defined by the merge operator configured on the
    /// column family (see `construct_column_families`).
    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Remove the given key from a rocksdb column family, if it exists.
    /// Does nothing if the key is absent.
    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;

    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
    /// and excluding `until_strictly_before`.
    //
    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    // see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;
}
177
178/// # Deprecation
179///
180/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
181//
182// TODO: replace uses of these impls with WriteTypedBatch,
183// implement these methods directly on WriteTypedBatch, and delete the trait.
184impl WriteDisk for DiskWriteBatch {
185 fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
186 where
187 C: rocksdb::AsColumnFamilyRef,
188 K: IntoDisk + Debug,
189 V: IntoDisk,
190 {
191 let key_bytes = key.as_bytes();
192 let value_bytes = value.as_bytes();
193 self.batch.put_cf(cf, key_bytes, value_bytes);
194 }
195
196 fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
197 where
198 C: rocksdb::AsColumnFamilyRef,
199 K: IntoDisk + Debug,
200 V: IntoDisk,
201 {
202 let key_bytes = key.as_bytes();
203 let value_bytes = value.as_bytes();
204 self.batch.merge_cf(cf, key_bytes, value_bytes);
205 }
206
207 fn zs_delete<C, K>(&mut self, cf: &C, key: K)
208 where
209 C: rocksdb::AsColumnFamilyRef,
210 K: IntoDisk + Debug,
211 {
212 let key_bytes = key.as_bytes();
213 self.batch.delete_cf(cf, key_bytes);
214 }
215
216 // TODO: convert zs_delete_range() to take std::ops::RangeBounds
217 // see zs_range_iter() for an example of the edge cases
218 fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
219 where
220 C: rocksdb::AsColumnFamilyRef,
221 K: IntoDisk + Debug,
222 {
223 let from_bytes = from.as_bytes();
224 let until_strictly_before_bytes = until_strictly_before.as_bytes();
225 self.batch
226 .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
227 }
228}
229
230// Allow &mut DiskWriteBatch as well as owned DiskWriteBatch
231impl<T> WriteDisk for &mut T
232where
233 T: WriteDisk,
234{
235 fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
236 where
237 C: rocksdb::AsColumnFamilyRef,
238 K: IntoDisk + Debug,
239 V: IntoDisk,
240 {
241 (*self).zs_insert(cf, key, value)
242 }
243
244 fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
245 where
246 C: rocksdb::AsColumnFamilyRef,
247 K: IntoDisk + Debug,
248 V: IntoDisk,
249 {
250 (*self).zs_merge(cf, key, value)
251 }
252
253 fn zs_delete<C, K>(&mut self, cf: &C, key: K)
254 where
255 C: rocksdb::AsColumnFamilyRef,
256 K: IntoDisk + Debug,
257 {
258 (*self).zs_delete(cf, key)
259 }
260
261 fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
262 where
263 C: rocksdb::AsColumnFamilyRef,
264 K: IntoDisk + Debug,
265 {
266 (*self).zs_delete_range(cf, from, until_strictly_before)
267 }
268}
269
/// Helper trait for retrieving and deserializing values from rocksdb column families.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
//
// TODO: replace uses of this trait with TypedColumnFamily,
// implement these methods directly on DiskDb, and delete the trait.
pub trait ReadDisk {
    /// Returns true if a rocksdb column family `cf` does not contain any entries.
    fn zs_is_empty<C>(&self, cf: &C) -> bool
    where
        C: rocksdb::AsColumnFamilyRef;

    /// Returns the value for `key` in the rocksdb column family `cf`, if present.
    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
        V: FromDisk;

    /// Check if a rocksdb column family `cf` contains the serialized form of `key`.
    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk;

    /// Returns the lowest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the highest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key greater than or equal to `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly greater than `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than `lower_bound`.
    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key less than or equal to `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly less than `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys less than `upper_bound`.
    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Ord,
        V: FromDisk,
        R: RangeBounds<K>;

    /// Returns the keys and values in `cf` in `range`, in an unordered `HashMap`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
        V: FromDisk,
        R: RangeBounds<K>;
}
375
376impl PartialEq for DiskDb {
377 fn eq(&self, other: &Self) -> bool {
378 if self.db.path() == other.db.path() {
379 assert_eq!(
380 self.network, other.network,
381 "database with same path but different network configs",
382 );
383 assert_eq!(
384 self.ephemeral, other.ephemeral,
385 "database with same path but different ephemeral configs",
386 );
387
388 return true;
389 }
390
391 false
392 }
393}
394
395impl Eq for DiskDb {}
396
397/// # Deprecation
398///
399/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
400//
401// TODO: replace uses of these impls with TypedColumnFamily,
402// implement these methods directly on DiskDb, and delete the trait.
403impl ReadDisk for DiskDb {
404 fn zs_is_empty<C>(&self, cf: &C) -> bool
405 where
406 C: rocksdb::AsColumnFamilyRef,
407 {
408 // Empty column families return invalid forward iterators.
409 //
410 // Checking iterator validity does not seem to cause database hangs.
411 let iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
412 let raw_iterator: rocksdb::DBRawIteratorWithThreadMode<DB> = iterator.into();
413
414 !raw_iterator.valid()
415 }
416
417 #[allow(clippy::unwrap_in_result)]
418 fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
419 where
420 C: rocksdb::AsColumnFamilyRef,
421 K: IntoDisk,
422 V: FromDisk,
423 {
424 let key_bytes = key.as_bytes();
425
426 // We use `get_pinned_cf` to avoid taking ownership of the serialized
427 // value, because we're going to deserialize it anyways, which avoids an
428 // extra copy
429 let value_bytes = self
430 .db
431 .get_pinned_cf(cf, key_bytes)
432 .expect("unexpected database failure");
433
434 value_bytes.map(V::from_bytes)
435 }
436
437 fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
438 where
439 C: rocksdb::AsColumnFamilyRef,
440 K: IntoDisk,
441 {
442 let key_bytes = key.as_bytes();
443
444 // We use `get_pinned_cf` to avoid taking ownership of the serialized
445 // value, because we don't use the value at all. This avoids an extra copy.
446 self.db
447 .get_pinned_cf(cf, key_bytes)
448 .expect("unexpected database failure")
449 .is_some()
450 }
451
452 fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
453 where
454 C: rocksdb::AsColumnFamilyRef,
455 K: IntoDisk + FromDisk,
456 V: FromDisk,
457 {
458 // Reading individual values from iterators does not seem to cause database hangs.
459 self.zs_forward_range_iter(cf, ..).next()
460 }
461
462 fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
463 where
464 C: rocksdb::AsColumnFamilyRef,
465 K: IntoDisk + FromDisk,
466 V: FromDisk,
467 {
468 // Reading individual values from iterators does not seem to cause database hangs.
469 self.zs_reverse_range_iter(cf, ..).next()
470 }
471
472 fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
473 where
474 C: rocksdb::AsColumnFamilyRef,
475 K: IntoDisk + FromDisk,
476 V: FromDisk,
477 {
478 self.zs_forward_range_iter(cf, lower_bound..).next()
479 }
480
481 fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
482 where
483 C: rocksdb::AsColumnFamilyRef,
484 K: IntoDisk + FromDisk,
485 V: FromDisk,
486 {
487 use std::ops::Bound::*;
488
489 // There is no standard syntax for an excluded start bound.
490 self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
491 .next()
492 }
493
494 fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
495 where
496 C: rocksdb::AsColumnFamilyRef,
497 K: IntoDisk + FromDisk,
498 V: FromDisk,
499 {
500 self.zs_reverse_range_iter(cf, ..=upper_bound).next()
501 }
502
503 fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
504 where
505 C: rocksdb::AsColumnFamilyRef,
506 K: IntoDisk + FromDisk,
507 V: FromDisk,
508 {
509 self.zs_reverse_range_iter(cf, ..upper_bound).next()
510 }
511
512 fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
513 where
514 C: rocksdb::AsColumnFamilyRef,
515 K: IntoDisk + FromDisk + Ord,
516 V: FromDisk,
517 R: RangeBounds<K>,
518 {
519 self.zs_forward_range_iter(cf, range).collect()
520 }
521
522 fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
523 where
524 C: rocksdb::AsColumnFamilyRef,
525 K: IntoDisk + FromDisk + Eq + std::hash::Hash,
526 V: FromDisk,
527 R: RangeBounds<K>,
528 {
529 self.zs_forward_range_iter(cf, range).collect()
530 }
531}
532
533impl DiskWriteBatch {
534 /// Creates and returns a new transactional batch write.
535 ///
536 /// # Correctness
537 ///
538 /// Each block must be written to the state inside a batch, so that:
539 /// - concurrent `ReadStateService` queries don't see half-written blocks, and
540 /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
541 pub fn new() -> Self {
542 DiskWriteBatch {
543 batch: rocksdb::WriteBatch::default(),
544 }
545 }
546}
547
548impl DiskDb {
549 /// Prints rocksdb metrics for each column family along with total database disk size, live data disk size and database memory size.
550 pub fn print_db_metrics(&self) {
551 let mut total_size_on_disk = 0;
552 let mut total_live_size_on_disk = 0;
553 let mut total_size_in_mem = 0;
554 let db: &Arc<DB> = &self.db;
555 let db_options = DiskDb::options();
556 let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
557 let mut column_families_log_string = String::from("");
558
559 write!(column_families_log_string, "Column families and sizes: ").unwrap();
560
561 for cf_descriptor in column_families {
562 let cf_name = &cf_descriptor.name();
563 let cf_handle = db
564 .cf_handle(cf_name)
565 .expect("Column family handle must exist");
566 let live_data_size = db
567 .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
568 .unwrap_or(Some(0));
569 let total_sst_files_size = db
570 .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
571 .unwrap_or(Some(0));
572 let cf_disk_size = total_sst_files_size.unwrap_or(0);
573 total_size_on_disk += cf_disk_size;
574 total_live_size_on_disk += live_data_size.unwrap_or(0);
575 let mem_table_size = db
576 .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
577 .unwrap_or(Some(0));
578 total_size_in_mem += mem_table_size.unwrap_or(0);
579
580 write!(
581 column_families_log_string,
582 "{} (Disk: {}, Memory: {})",
583 cf_name,
584 human_bytes::human_bytes(cf_disk_size as f64),
585 human_bytes::human_bytes(mem_table_size.unwrap_or(0) as f64)
586 )
587 .unwrap();
588 }
589
590 debug!("{}", column_families_log_string);
591 info!(
592 "Total Database Disk Size: {}",
593 human_bytes::human_bytes(total_size_on_disk as f64)
594 );
595 info!(
596 "Total Live Data Disk Size: {}",
597 human_bytes::human_bytes(total_live_size_on_disk as f64)
598 );
599 info!(
600 "Total Database Memory Size: {}",
601 human_bytes::human_bytes(total_size_in_mem as f64)
602 );
603 }
604
605 /// Returns the estimated total disk space usage of the database.
606 pub fn size(&self) -> u64 {
607 let db: &Arc<DB> = &self.db;
608 let db_options = DiskDb::options();
609 let mut total_size_on_disk = 0;
610 for cf_descriptor in DiskDb::construct_column_families(db_options, db.path(), []) {
611 let cf_name = &cf_descriptor.name();
612 let cf_handle = db
613 .cf_handle(cf_name)
614 .expect("Column family handle must exist");
615
616 total_size_on_disk += db
617 .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
618 .ok()
619 .flatten()
620 .unwrap_or(0);
621 }
622
623 total_size_on_disk
624 }
625
    /// When called with a secondary DB instance, tries to catch up with the primary DB instance
    /// by replaying its recent writes.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the underlying RocksDB call.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.db.try_catch_up_with_primary()
    }
630
    /// Returns a forward iterator over the items in `cf` in `range`,
    /// ordered by increasing key bytes.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_forward_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        // `false` selects the forward (increasing key bytes) direction.
        self.zs_range_iter_with_direction(cf, range, false)
    }
647
    /// Returns a reverse iterator over the items in `cf` in `range`,
    /// ordered by decreasing key bytes.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_reverse_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        // `true` selects the reverse (decreasing key bytes) direction.
        self.zs_range_iter_with_direction(cf, range, true)
    }
664
    /// Returns an iterator over the items in `cf` in `range`.
    ///
    /// RocksDB iterators are ordered by increasing key bytes by default.
    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_range_iter_with_direction<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
        reverse: bool,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        use std::ops::Bound::{self, *};

        // Serialize the range bounds into byte vectors, so the rest of the
        // method can work with raw keys.
        //
        // Replace with map() when it stabilises:
        // https://github.com/rust-lang/rust/issues/86026
        let map_to_vec = |bound: Bound<&K>| -> Bound<Vec<u8>> {
            match bound {
                Unbounded => Unbounded,
                Included(x) => Included(x.as_bytes().as_ref().to_vec()),
                Excluded(x) => Excluded(x.as_bytes().as_ref().to_vec()),
            }
        };

        let start_bound = map_to_vec(range.start_bound());
        let end_bound = map_to_vec(range.end_bound());
        let range = (start_bound, end_bound);

        let mode = Self::zs_iter_mode(&range, reverse);
        let opts = Self::zs_iter_opts(&range);

        // Reading multiple items from iterators has caused database hangs,
        // in previous RocksDB versions
        self.db
            .iterator_cf_opt(cf, opts, mode)
            .map(|result| result.expect("unexpected database failure"))
            .map(|(key, value)| (key.to_vec(), value))
            // Skip excluded "from" bound and empty ranges. The `mode` already skips keys
            // strictly before the "from" bound.
            .skip_while({
                let range = range.clone();
                move |(key, _value)| !range.contains(key)
            })
            // Take until the excluded "to" bound is reached,
            // or we're after the included "to" bound.
            .take_while(move |(key, _value)| range.contains(key))
            // Finally, deserialize the raw bytes into typed keys and values.
            .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
    }
719
720 /// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
721 fn zs_iter_opts<R>(range: &R) -> ReadOptions
722 where
723 R: RangeBounds<Vec<u8>>,
724 {
725 let mut opts = ReadOptions::default();
726 let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);
727
728 if let Some(bound) = lower_bound {
729 opts.set_iterate_lower_bound(bound);
730 };
731
732 if let Some(bound) = upper_bound {
733 opts.set_iterate_upper_bound(bound);
734 };
735
736 opts
737 }
738
739 /// Returns a lower and upper iterate bounds for a range.
740 ///
741 /// Note: Since upper iterate bounds are always exclusive in RocksDB, this method
742 /// will increment the upper bound by 1 if the end bound of the provided range
743 /// is inclusive.
744 fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
745 where
746 R: RangeBounds<Vec<u8>>,
747 {
748 use std::ops::Bound::*;
749
750 let lower_bound = match range.start_bound() {
751 Included(bound) | Excluded(bound) => Some(bound.clone()),
752 Unbounded => None,
753 };
754
755 let upper_bound = match range.end_bound().cloned() {
756 Included(mut bound) => {
757 // Increment the last byte in the upper bound that is less than u8::MAX, and
758 // clear any bytes after it to increment the next key in lexicographic order
759 // (next big-endian number). RocksDB uses lexicographic order for keys.
760 let is_wrapped_overflow = increment_big_endian(&mut bound);
761
762 if is_wrapped_overflow {
763 bound.insert(0, 0x01)
764 }
765
766 Some(bound)
767 }
768 Excluded(bound) => Some(bound),
769 Unbounded => None,
770 };
771
772 (lower_bound, upper_bound)
773 }
774
775 /// Returns the RocksDB iterator "from" mode for `range`.
776 ///
777 /// RocksDB iterators are ordered by increasing key bytes by default.
778 /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
779 fn zs_iter_mode<R>(range: &R, reverse: bool) -> rocksdb::IteratorMode
780 where
781 R: RangeBounds<Vec<u8>>,
782 {
783 use std::ops::Bound::*;
784
785 let from_bound = if reverse {
786 range.end_bound()
787 } else {
788 range.start_bound()
789 };
790
791 match from_bound {
792 Unbounded => {
793 if reverse {
794 // Reversed unbounded iterators start from the last item
795 rocksdb::IteratorMode::End
796 } else {
797 // Unbounded iterators start from the first item
798 rocksdb::IteratorMode::Start
799 }
800 }
801
802 Included(bound) | Excluded(bound) => {
803 let direction = if reverse {
804 rocksdb::Direction::Reverse
805 } else {
806 rocksdb::Direction::Forward
807 };
808
809 rocksdb::IteratorMode::From(bound.as_slice(), direction)
810 }
811 }
812 }
813
    /// The ideal open file limit for Zebra.
    ///
    /// Used as a target when raising the process file descriptor limit.
    const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;

    /// The minimum number of open files for Zebra to operate normally. Also used
    /// as the default open file limit, when the OS doesn't tell us how many
    /// files we can use.
    ///
    /// We want 100+ file descriptors for peers, and 100+ for the database.
    ///
    /// On Windows, the default limit is 512 high-level I/O files, and 8192
    /// low-level I/O files:
    /// <https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks>
    const MIN_OPEN_FILE_LIMIT: u64 = 512;

    /// The number of files used internally by Zebra.
    ///
    /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
    /// stdio (3), and other OS facilities (2+).
    const RESERVED_FILE_COUNT: u64 = 48;

    /// The size of the database memtable RAM cache in megabytes.
    ///
    /// <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#configuration-and-tuning>
    const MEMTABLE_RAM_CACHE_MEGABYTES: usize = 128;
838
839 /// Build a vector of current column families on the disk and optionally any new column families.
840 /// Returns an iterable collection of all column families.
841 fn construct_column_families(
842 db_options: Options,
843 path: &Path,
844 column_families_in_code: impl IntoIterator<Item = String>,
845 ) -> impl Iterator<Item = ColumnFamilyDescriptor> {
846 // When opening the database in read/write mode, all column families must be opened.
847 //
848 // To make Zebra forward-compatible with databases updated by later versions,
849 // we read any existing column families off the disk, then add any new column families
850 // from the current implementation.
851 //
852 // <https://github.com/facebook/rocksdb/wiki/Column-Families#reference>
853 let column_families_on_disk = DB::list_cf(&db_options, path).unwrap_or_default();
854 let column_families_in_code = column_families_in_code.into_iter();
855
856 column_families_on_disk
857 .into_iter()
858 .chain(column_families_in_code)
859 .unique()
860 .map(move |cf_name: String| {
861 let mut cf_options = db_options.clone();
862
863 if cf_name == BALANCE_BY_TRANSPARENT_ADDR {
864 cf_options.set_merge_operator_associative(
865 BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
866 fetch_add_balance_and_received,
867 );
868 }
869
870 rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_options.clone())
871 })
872 }
873
    /// Opens or creates the database at a path based on the kind, major version and network,
    /// with the supplied column families, preserving any existing column families,
    /// and returns a shared low-level database wrapper.
    ///
    /// If `read_only` is true, the database is opened as a RocksDB secondary
    /// instance, using an ephemeral directory for the secondary instance cache.
    ///
    /// # Panics
    ///
    /// - If the cache directory does not exist and can't be created.
    /// - If the database cannot be opened for whatever reason.
    pub fn new(
        config: &Config,
        db_kind: impl AsRef<str>,
        format_version_in_code: &Version,
        network: &Network,
        column_families_in_code: impl IntoIterator<Item = String>,
        read_only: bool,
    ) -> DiskDb {
        // If the database is ephemeral, we don't need to check the cache directory.
        if !config.ephemeral {
            DiskDb::validate_cache_dir(&config.cache_dir);
        }

        let db_kind = db_kind.as_ref();
        let path = config.db_path(db_kind, format_version_in_code.major, network);

        let db_options = DiskDb::options();

        // Existing on-disk column families are preserved alongside the ones in code.
        let column_families =
            DiskDb::construct_column_families(db_options.clone(), &path, column_families_in_code);

        let db_result = if read_only {
            // Use a tempfile for the secondary instance cache directory
            let secondary_config = Config {
                ephemeral: true,
                ..config.clone()
            };
            let secondary_path =
                secondary_config.db_path("secondary_state", format_version_in_code.major, network);
            let create_dir_result = std::fs::create_dir_all(&secondary_path);

            info!(?create_dir_result, "creating secondary db directory");

            DB::open_cf_descriptors_as_secondary(
                &db_options,
                &path,
                &secondary_path,
                column_families,
            )
        } else {
            DB::open_cf_descriptors(&db_options, &path, column_families)
        };

        match db_result {
            Ok(db) => {
                info!("Opened Zebra state cache at {}", path.display());

                let db = DiskDb {
                    db_kind: db_kind.to_string(),
                    format_version_in_code: format_version_in_code.clone(),
                    network: network.clone(),
                    ephemeral: config.ephemeral,
                    db: Arc::new(db),
                };

                // Sanity check: Zebra stores everything in named column families,
                // so the default column family must stay empty.
                db.assert_default_cf_is_empty();

                db
            }

            // Busy/IO errors usually mean the lock file is held by another process.
            Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::IOError) => panic!(
                "Database likely already open {path:?} \
                 Hint: Check if another zebrad process is running."
            ),

            Err(e) => panic!(
                "Opening database {path:?} failed. \
                 Hint: Try changing the state cache_dir in the Zebra config. \
                 Error: {e}",
            ),
        }
    }
954
    // Accessor methods

    /// Returns the configured database kind for this database.
    pub fn db_kind(&self) -> String {
        self.db_kind.clone()
    }

    /// Returns the format version of the running code that created this `DiskDb` instance in memory.
    pub fn format_version_in_code(&self) -> Version {
        self.format_version_in_code.clone()
    }

    /// Returns the fixed major version for this database.
    ///
    /// The major version also determines the on-disk database path.
    pub fn major_version(&self) -> u64 {
        self.format_version_in_code().major
    }

    /// Returns the configured network for this database.
    pub fn network(&self) -> Network {
        self.network.clone()
    }

    /// Returns the `Path` where the files used by this database are located.
    pub fn path(&self) -> &Path {
        self.db.path()
    }

    /// Returns the low-level rocksdb inner database.
    #[allow(dead_code)]
    fn inner(&self) -> &Arc<DB> {
        &self.db
    }

    /// Returns the column family handle for `cf_name`, or `None` if it doesn't exist.
    pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
        // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
        // the shorter of &self and &str, but RocksDB clones column family names internally, so it
        // should just be &self. To avoid this restriction, clone the string before passing it to
        // this method. Currently Zebra uses static strings, so this doesn't matter.
        self.db.cf_handle(cf_name)
    }
996
997 // Read methods are located in the ReadDisk trait
998
999 // Write methods
1000 // Low-level write methods are located in the WriteDisk trait
1001
    /// Writes `batch` to the database.
    ///
    /// The whole batch is applied in a single RocksDB write operation,
    /// which keeps the format invariants described in the module docs.
    pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
        self.db.write(batch.batch)
    }
1006
1007 // Private methods
1008
    /// Tries to reuse an existing db after a major upgrade.
    ///
    /// If the current db version belongs to `restorable_db_versions`, the function moves a previous
    /// db to a new path so it can be used again. It does so by merely trying to rename the path
    /// corresponding to the db version directly preceding the current version to the path that is
    /// used by the current db. If successful, it also deletes the db version file.
    ///
    /// Returns the old disk version if one existed and the db directory was renamed, or None otherwise.
    // TODO: Update this function to rename older major db format version to the current version (#9565).
    #[allow(clippy::unwrap_in_result)]
    pub(crate) fn try_reusing_previous_db_after_major_upgrade(
        restorable_db_versions: &[u64],
        format_version_in_code: &Version,
        config: &Config,
        db_kind: impl AsRef<str>,
        network: &Network,
    ) -> Option<Version> {
        // Only attempt a restore when the running code's major version is restorable.
        if let Some(&major_db_ver) = restorable_db_versions
            .iter()
            .find(|v| **v == format_version_in_code.major)
        {
            let db_kind = db_kind.as_ref();

            // The reuse candidate is the directory of the immediately preceding major version.
            let old_major_db_ver = major_db_ver - 1;
            let old_path = config.db_path(db_kind, old_major_db_ver, network);
            // Exit early if the path doesn't exist or there's an error checking it.
            if !fs::exists(&old_path).unwrap_or(false) {
                return None;
            }

            let new_path = config.db_path(db_kind, major_db_ver, network);

            // Canonicalize both paths so the containment check below sees through symlinks.
            let old_path = match fs::canonicalize(&old_path) {
                Ok(canonicalized_old_path) => canonicalized_old_path,
                Err(e) => {
                    warn!("could not canonicalize {old_path:?}: {e}");
                    return None;
                }
            };

            let cache_path = match fs::canonicalize(&config.cache_dir) {
                Ok(canonicalized_cache_path) => canonicalized_cache_path,
                Err(e) => {
                    warn!("could not canonicalize {:?}: {e}", config.cache_dir);
                    return None;
                }
            };

            // # Correctness
            //
            // Check that the path we're about to move is inside the cache directory.
            //
            // If the user has symlinked the state directory to a non-cache directory, we don't want
            // to move it, because it might contain other files.
            //
            // We don't attempt to guard against malicious symlinks created by attackers
            // (TOCTOU attacks). Zebra should not be run with elevated privileges.
            if !old_path.starts_with(&cache_path) {
                info!("skipped reusing previous state cache: state is outside cache directory");
                return None;
            }

            let opts = DiskDb::options();
            // A directory only counts as a database if listing it yields at least one column family.
            let old_db_exists = DB::list_cf(&opts, &old_path).is_ok_and(|cf| !cf.is_empty());
            let new_db_exists = DB::list_cf(&opts, &new_path).is_ok_and(|cf| !cf.is_empty());

            // Only rename when there is an old db to reuse, and no new db that would be clobbered.
            if old_db_exists && !new_db_exists {
                // Create the parent directory for the new db. This is because we can't directly
                // rename e.g. `state/v25/mainnet/` to `state/v26/mainnet/` with `fs::rename()` if
                // `state/v26/` does not exist.
                match fs::create_dir_all(
                    new_path
                        .parent()
                        .expect("new state cache must have a parent path"),
                ) {
                    Ok(()) => info!("created new directory for state cache at {new_path:?}"),
                    Err(e) => {
                        warn!(
                            "could not create new directory for state cache at {new_path:?}: {e}"
                        );
                        return None;
                    }
                };

                match fs::rename(&old_path, &new_path) {
                    Ok(()) => {
                        info!("moved state cache from {old_path:?} to {new_path:?}");

                        let mut disk_version =
                            database_format_version_on_disk(config, db_kind, major_db_ver, network)
                                .expect("unable to read database format version file")
                                .expect("unable to parse database format version");

                        // The moved database still uses the previous major format, so record the
                        // old major version in the new location's version file. This lets the
                        // format upgrade code run against the renamed database.
                        disk_version.major = old_major_db_ver;

                        write_database_format_version_to_disk(
                            config,
                            db_kind,
                            major_db_ver,
                            &disk_version,
                            network,
                        )
                        .expect("unable to write database format version file to disk");

                        // Get the parent of the old path, e.g. `state/v25/` and delete it if it is
                        // empty.
                        let old_path = old_path
                            .parent()
                            .expect("old state cache must have parent path");

                        if fs::read_dir(old_path)
                            .expect("cached state dir needs to be readable")
                            .next()
                            .is_none()
                        {
                            match fs::remove_dir_all(old_path) {
                                Ok(()) => {
                                    info!("removed empty old state cache directory at {old_path:?}")
                                }
                                Err(e) => {
                                    warn!(
                                        "could not remove empty old state cache directory \
                                        at {old_path:?}: {e}"
                                    )
                                }
                            }
                        }

                        return Some(disk_version);
                    }
                    Err(e) => {
                        warn!("could not move state cache from {old_path:?} to {new_path:?}: {e}");
                    }
                };
            }
        };

        None
    }
1148
    /// Returns the database options for the finalized state database.
    ///
    /// These options are shared by every column family.
    fn options() -> rocksdb::Options {
        let mut opts = rocksdb::Options::default();
        let mut block_based_opts = rocksdb::BlockBasedOptions::default();

        const ONE_MEGABYTE: usize = 1024 * 1024;

        // Create the database and any missing column families on first open.
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        // Use the recommended Ribbon filter setting for all column families.
        //
        // Ribbon filters are faster than Bloom filters in Zebra, as of April 2022.
        // (They aren't needed for single-valued column families, but they don't hurt either.)
        block_based_opts.set_ribbon_filter(9.9);

        // Use the recommended LZ4 compression type.
        //
        // https://github.com/facebook/rocksdb/wiki/Compression#configuration
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

        // Tune level-style database file compaction.
        //
        // This improves Zebra's initial sync speed slightly, as of April 2022.
        opts.optimize_level_style_compaction(Self::MEMTABLE_RAM_CACHE_MEGABYTES * ONE_MEGABYTE);

        // Increase the process open file limit if needed,
        // then use it to set RocksDB's limit.
        let open_file_limit = DiskDb::increase_open_file_limit();
        let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);

        // If the current limit is very large, set the DB limit using the ideal limit
        let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
            .try_into()
            .expect("ideal open file limit fits in a c_int");
        // If the actual limit doesn't fit in a `c_int`, fall back to the ideal limit.
        let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);

        opts.set_max_open_files(db_file_limit);

        // Set the block-based options
        opts.set_block_based_table_factory(&block_based_opts);

        opts
    }
1193
1194 /// Calculate the database's share of `open_file_limit`
1195 fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
1196 // Give the DB half the files, and reserve half the files for peers
1197 (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
1198 }
1199
    /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
    /// If that fails, try `MIN_OPEN_FILE_LIMIT`.
    ///
    /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
    /// unchanged.
    ///
    /// Returns the current limit, after any successful increases.
    ///
    /// # Panics
    ///
    /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`.
    fn increase_open_file_limit() -> u64 {
        // Zebra mainly uses TCP sockets (`zebra-network`) and low-level files
        // (`zebra-state` database).
        //
        // On Unix-based platforms, `increase_nofile_limit` changes the limit for
        // both database files and TCP connections.
        //
        // But it doesn't do anything on Windows in rlimit 0.7.0.
        //
        // On Windows, the default limits are:
        // - 512 high-level stream I/O files (via the C standard functions),
        // - 8192 low-level I/O files (via the Unix C functions), and
        // - 1000 TCP Control Block entries (network connections).
        //
        // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
        // http://smallvoid.com/article/winnt-tcpip-max-limit.html
        //
        // `zebra-state`'s `IDEAL_OPEN_FILE_LIMIT` is much less than
        // the Windows low-level I/O file limit.
        //
        // The [`setmaxstdio` and `getmaxstdio`](https://docs.rs/rlimit/latest/rlimit/#windows)
        // functions from the `rlimit` crate only change the high-level I/O file limit.
        //
        // `zebra-network`'s default connection limit is much less than
        // the TCP Control Block limit on Windows.

        // We try setting the ideal limit, then the minimum limit.
        let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
            Ok(current_limit) => current_limit,
            Err(limit_error) => {
                // These errors can happen due to sandboxing or unsupported system calls,
                // even if the file limit is high enough.
                info!(
                    ?limit_error,
                    min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                    ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                    "unable to increase the open file limit, \
                     assuming Zebra can open a minimum number of files"
                );

                return DiskDb::MIN_OPEN_FILE_LIMIT;
            }
        };

        // Panic below the minimum, warn below the ideal, otherwise just log.
        if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
            panic!(
                "open file limit too low: \
                 unable to set the number of open files to {}, \
                 the minimum number of files required by Zebra. \
                 Current limit is {:?}. \
                 Hint: Increase the open file limit to {} before launching Zebra",
                DiskDb::MIN_OPEN_FILE_LIMIT,
                current_limit,
                DiskDb::IDEAL_OPEN_FILE_LIMIT
            );
        } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
            warn!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the maximum number of open files is below Zebra's ideal limit. \
                 Hint: Increase the open file limit to {} before launching Zebra",
                DiskDb::IDEAL_OPEN_FILE_LIMIT
            );
        } else if cfg!(windows) {
            // This log is verbose during tests.
            #[cfg(not(test))]
            info!(
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "assuming the open file limit is high enough for Zebra",
            );
            #[cfg(test)]
            debug!(
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "assuming the open file limit is high enough for Zebra",
            );
        } else {
            #[cfg(not(test))]
            debug!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the open file limit is high enough for Zebra",
            );
            #[cfg(test)]
            debug!(
                ?current_limit,
                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
                "the open file limit is high enough for Zebra",
            );
        }

        current_limit
    }
1308
1309 // Cleanup methods
1310
1311 /// Returns the number of shared instances of this database.
1312 ///
1313 /// # Concurrency
1314 ///
1315 /// The actual number of owners can be higher or lower than the returned value,
1316 /// because databases can simultaneously be cloned or dropped in other threads.
1317 ///
1318 /// However, if the number of owners is 1, and the caller has exclusive access,
1319 /// the count can't increase unless that caller clones the database.
1320 pub(crate) fn shared_database_owners(&self) -> usize {
1321 Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
1322 }
1323
    /// Shut down the database, cleaning up background tasks and ephemeral data.
    ///
    /// If `force` is true, clean up regardless of any shared references.
    /// `force` can cause errors accessing the database from other shared references.
    /// It should only be used in debugging or test code, immediately before a manual shutdown.
    ///
    /// TODO: make private after the stop height check has moved to the syncer (#3442)
    ///       move shutting down the database to a blocking thread (#2188)
    pub(crate) fn shutdown(&mut self, force: bool) {
        // # Correctness
        //
        // If we're the only owner of the shared database instance,
        // then there are no other threads that can increase the strong or weak count.
        //
        // ## Implementation Requirements
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. If they do, they must drop it before:
        // - shutting down database threads, or
        // - deleting database files.

        // With multiple owners, only a forced shutdown proceeds; otherwise just log and return.
        if self.shared_database_owners() > 1 {
            let path = self.path();

            let mut ephemeral_note = "";

            if force {
                if self.ephemeral {
                    ephemeral_note = " and removing ephemeral files";
                }

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
                #[cfg(test)]
                debug!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
            } else {
                if self.ephemeral {
                    ephemeral_note = " and files";
                }

                debug!(
                    ?path,
                    "dropping DiskDb clone, \
                     but keeping shared database instance{} until the last reference is dropped",
                    ephemeral_note,
                );
                return;
            }
        }

        self.assert_default_cf_is_empty();

        // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
        //
        // Zebra's data should be fine if we don't clean up, because:
        // - the database flushes regularly anyway
        // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back
        // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually
        let path = self.path();
        debug!(?path, "flushing database to disk");

        // These flushes can fail during forced shutdown or during Drop after a shutdown,
        // particularly in tests. If they fail, there's nothing we can do about it anyway.
        //
        // Flush the SST (memtable) data first, then the write-ahead log.
        if let Err(error) = self.db.flush() {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database SST files to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database SST files to disk during shutdown"
                );
            }
        }

        if let Err(error) = self.db.flush_wal(true) {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database WAL buffer to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database WAL buffer to disk during shutdown"
                );
            }
        }

        // # Memory Safety
        //
        // We'd like to call `cancel_all_background_work()` before Zebra exits,
        // but when we call it, we get memory, thread, or C++ errors when the process exits.
        // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
        // all the threads have cleaned up.)
        //
        // # Change History
        //
        // We've changed this setting multiple times since 2021, in response to new RocksDB
        // and Rust compiler behaviour.
        //
        // We enabled cancel_all_background_work() due to failures on:
        // - Rust 1.57 on Linux
        //
        // We disabled cancel_all_background_work() due to failures on:
        // - Rust 1.64 on Linux
        //
        // We tried enabling cancel_all_background_work() due to failures on:
        // - Rust 1.70 on macOS 12.6.5 on x86_64
        // but it didn't stop the aborts happening (PR #6820).
        //
        // There weren't any failures with cancel_all_background_work() disabled on:
        // - Rust 1.69 or earlier
        // - Linux with Rust 1.70
        // And with cancel_all_background_work() enabled or disabled on:
        // - macOS 13.2 on aarch64 (M1), native and emulated x86_64, with Rust 1.70
        //
        // # Detailed Description
        //
        // We see these kinds of errors:
        // ```
        // pthread lock: Invalid argument
        // pure virtual method called
        // terminate called without an active exception
        // pthread destroy mutex: Device or resource busy
        // Aborted (core dumped)
        // signal: 6, SIGABRT: process abort signal
        // signal: 11, SIGSEGV: invalid memory reference
        // ```
        //
        // # Reference
        //
        // The RocksDB wiki says:
        // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
        // >
        // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
        // > You can speed up the waiting by calling CancelAllBackgroundWork().
        //
        // <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ>
        //
        // > rocksdb::DB instances need to be destroyed before your main function exits.
        // > RocksDB instances usually depend on some internal static variables.
        // > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
        //
        // <https://github.com/facebook/rocksdb/wiki/Known-Issues>
        //
        // # TODO
        //
        // Try re-enabling this code and fixing the underlying concurrency bug.
        //
        //info!(?path, "stopping background database tasks");
        //self.db.cancel_all_background_work(true);

        // We'd like to drop the database before deleting its files,
        // because that closes the column families and the database correctly.
        // But Rust's ownership rules make that difficult,
        // so we just flush and delete ephemeral data instead.
        //
        // This implementation doesn't seem to cause any issues,
        // and the RocksDB Drop implementation handles any cleanup.
        self.delete_ephemeral();
    }
1503
    /// If the database is `ephemeral`, delete its files.
    ///
    /// Does nothing for persistent (non-ephemeral) databases.
    fn delete_ephemeral(&mut self) {
        // # Correctness
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. See `shutdown()` for details.

        if !self.ephemeral {
            return;
        }

        let path = self.path();

        // This log is verbose during tests.
        #[cfg(not(test))]
        info!(?path, "removing temporary database files");
        #[cfg(test)]
        debug!(?path, "removing temporary database files");

        // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
        // but the Zcash blockchain might not fit in memory. So we just
        // delete the database files instead.
        //
        // We'd also like to call `DB::destroy` here, but calling destroy on a
        // live DB is undefined behaviour:
        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
        //
        // So we assume that all the database files are under `path`, and
        // delete them using standard filesystem APIs. Deleting open files
        // might cause errors on non-Unix platforms, so we ignore the result.
        // (The OS will delete them eventually anyway, if they are in a temporary directory.)
        let result = std::fs::remove_dir_all(path);

        if result.is_err() {
            // This log is verbose during tests.
            #[cfg(not(test))]
            info!(
                ?result,
                ?path,
                "removing temporary database files caused an error",
            );
            #[cfg(test)]
            debug!(
                ?result,
                ?path,
                "removing temporary database files caused an error",
            );
        } else {
            debug!(
                ?result,
                ?path,
                "successfully removed temporary database files",
            );
        }
    }
1559
    /// Check that the "default" column family is empty.
    ///
    /// # Panics
    ///
    /// If Zebra has a bug where it is storing data in the wrong column family.
    fn assert_default_cf_is_empty(&self) {
        // # Correctness
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. See `shutdown()` for details.

        // If the "default" column family has no handle, there is nothing to check.
        if let Some(default_cf) = self.cf_handle("default") {
            assert!(
                self.zs_is_empty(&default_cf),
                "Zebra should not store data in the 'default' column family"
            );
        }
    }
1578
1579 // Validates a cache directory and creates it if it doesn't exist.
1580 // If the directory cannot be created, it panics with a specific error message.
1581 fn validate_cache_dir(cache_dir: &std::path::PathBuf) {
1582 if let Err(e) = fs::create_dir_all(cache_dir) {
1583 match e.kind() {
1584 std::io::ErrorKind::PermissionDenied => panic!(
1585 "Permission denied creating {cache_dir:?}. \
1586 Hint: check if cache directory exist and has write permissions."
1587 ),
1588 std::io::ErrorKind::StorageFull => panic!(
1589 "No space left on device creating {cache_dir:?}. \
1590 Hint: check if the disk is full."
1591 ),
1592 _ => panic!("Could not create cache dir {:?}: {}", cache_dir, e),
1593 }
1594 }
1595 }
1596}
1597
1598impl Drop for DiskDb {
1599 fn drop(&mut self) {
1600 let path = self.path();
1601 debug!(?path, "dropping DiskDb instance");
1602
1603 self.shutdown(false);
1604 }
1605}