zebra_state/service/finalized_state/
disk_db.rs

1//! Provides low-level access to RocksDB using some database-specific types.
2//!
3//! This module makes sure that:
4//! - all disk writes happen inside a RocksDB transaction
5//!   ([`rocksdb::WriteBatch`]), and
6//! - format-specific invariants are maintained.
7//!
8//! # Correctness
9//!
10//! [`crate::constants::state_database_format_version_in_code()`] must be incremented
11//! each time the database format (column, serialization, etc) changes.
12
13use std::{
14    collections::{BTreeMap, HashMap},
15    fmt::{Debug, Write},
16    fs,
17    ops::RangeBounds,
18    path::Path,
19    sync::Arc,
20};
21
22use itertools::Itertools;
23use rlimit::increase_nofile_limit;
24
25use rocksdb::{ColumnFamilyDescriptor, ErrorKind, Options, ReadOptions};
26use semver::Version;
27use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian};
28
29use crate::{
30    database_format_version_on_disk,
31    service::finalized_state::disk_format::{FromDisk, IntoDisk},
32    write_database_format_version_to_disk, Config,
33};
34
35use super::zebra_db::transparent::{
36    fetch_add_balance_and_received, BALANCE_BY_TRANSPARENT_ADDR,
37    BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
38};
39// Doc-only imports
40#[allow(unused_imports)]
41use super::{TypedColumnFamily, WriteTypedBatch};
42
43#[cfg(any(test, feature = "proptest-impl"))]
44mod tests;
45
/// The [`rocksdb::ThreadMode`] used by the database.
///
/// In [`rocksdb::SingleThreaded`] mode, column family changes and [`Drop`]
/// require exclusive access to the database handle.
pub type DBThreadMode = rocksdb::SingleThreaded;

/// The [`rocksdb`] database type, including thread mode.
///
/// Also the [`rocksdb::DBAccess`] used by database iterators.
pub type DB = rocksdb::DBWithThreadMode<DBThreadMode>;
53
/// Wrapper struct to ensure low-level database access goes through the correct API.
///
/// `rocksdb` allows concurrent writes through a shared reference,
/// so database instances are cloneable. When the final clone is dropped,
/// the database is closed.
///
/// # Correctness
///
/// Reading transactions from the database using RocksDB iterators causes hangs.
/// But creating iterators and reading the tip height works fine.
///
/// So these hangs are probably caused by holding column family locks to read:
/// - multiple values, or
/// - large values.
///
/// This bug might be fixed by moving database operations to blocking threads (#2188),
/// so that they don't block the tokio executor.
/// (Or it might be fixed by future RocksDB upgrades.)
#[derive(Clone, Debug)]
pub struct DiskDb {
    // Configuration
    //
    // This configuration cannot be modified after the database is initialized,
    // because some clones would have different values.
    //
    /// The configured database kind for this database.
    ///
    /// Used, together with the major format version and network,
    /// to determine the on-disk database path.
    db_kind: String,

    /// The format version of the running Zebra code.
    format_version_in_code: Version,

    /// The configured network for this database.
    network: Network,

    /// The configured temporary database setting.
    ///
    /// If true, the database files are deleted on drop.
    ephemeral: bool,

    // Owned State
    //
    // Everything contained in this state must be shared by all clones, or read-only.
    //
    /// The shared inner RocksDB database.
    ///
    /// RocksDB allows reads and writes via a shared reference.
    ///
    /// In [`SingleThreaded`](rocksdb::SingleThreaded) mode,
    /// column family changes and [`Drop`] require exclusive access.
    ///
    /// In [`MultiThreaded`](rocksdb::MultiThreaded) mode,
    /// only [`Drop`] requires exclusive access.
    db: Arc<DB>,
}
108
/// Wrapper struct to ensure low-level database writes go through the correct API.
///
/// [`rocksdb::WriteBatch`] is a batched set of database updates,
/// which must be written to the database using `DiskDb::write(batch)`.
///
/// Batching writes keeps each block commit atomic: concurrent readers never
/// see half-written state (see [`DiskWriteBatch::new`] for details).
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct DiskWriteBatch {
    /// The inner RocksDB write batch.
    batch: rocksdb::WriteBatch,
}
119
120impl Debug for DiskWriteBatch {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("DiskWriteBatch")
123            .field("batch", &format!("{} bytes", self.batch.size_in_bytes()))
124            .finish()
125    }
126}
127
128impl PartialEq for DiskWriteBatch {
129    fn eq(&self, other: &Self) -> bool {
130        self.batch.data() == other.batch.data()
131    }
132}
133
134impl Eq for DiskWriteBatch {}
135
/// Helper trait for inserting serialized typed (Key, Value) pairs into rocksdb.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`WriteTypedBatch`] instead.
//
// TODO: replace uses of this trait with WriteTypedBatch,
//       implement these methods directly on WriteTypedBatch, and delete the trait.
pub trait WriteDisk {
    /// Serialize and insert the given key and value into a rocksdb column family,
    /// overwriting any existing `value` for `key`.
    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Serialize and merge the given key and value into a rocksdb column family,
    /// merging with any existing `value` for `key`.
    ///
    /// The merge result is produced by the merge operator configured on `cf`;
    /// in this file, only the transparent balance column family configures one
    /// (see `DiskDb::construct_column_families`).
    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug,
        V: IntoDisk;

    /// Remove the given key from a rocksdb column family, if it exists.
    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;

    /// Delete the given key range from a rocksdb column family, if it exists, including `from`
    /// and excluding `until_strictly_before` (a half-open range in serialized key order).
    //
    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    //       see zs_range_iter() for an example of the edge cases
    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + Debug;
}
177
178/// # Deprecation
179///
180/// These impls should not be used in new code, use [`WriteTypedBatch`] instead.
181//
182// TODO: replace uses of these impls with WriteTypedBatch,
183//       implement these methods directly on WriteTypedBatch, and delete the trait.
184impl WriteDisk for DiskWriteBatch {
185    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
186    where
187        C: rocksdb::AsColumnFamilyRef,
188        K: IntoDisk + Debug,
189        V: IntoDisk,
190    {
191        let key_bytes = key.as_bytes();
192        let value_bytes = value.as_bytes();
193        self.batch.put_cf(cf, key_bytes, value_bytes);
194    }
195
196    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
197    where
198        C: rocksdb::AsColumnFamilyRef,
199        K: IntoDisk + Debug,
200        V: IntoDisk,
201    {
202        let key_bytes = key.as_bytes();
203        let value_bytes = value.as_bytes();
204        self.batch.merge_cf(cf, key_bytes, value_bytes);
205    }
206
207    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
208    where
209        C: rocksdb::AsColumnFamilyRef,
210        K: IntoDisk + Debug,
211    {
212        let key_bytes = key.as_bytes();
213        self.batch.delete_cf(cf, key_bytes);
214    }
215
216    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
217    //       see zs_range_iter() for an example of the edge cases
218    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
219    where
220        C: rocksdb::AsColumnFamilyRef,
221        K: IntoDisk + Debug,
222    {
223        let from_bytes = from.as_bytes();
224        let until_strictly_before_bytes = until_strictly_before.as_bytes();
225        self.batch
226            .delete_range_cf(cf, from_bytes, until_strictly_before_bytes);
227    }
228}
229
230// Allow &mut DiskWriteBatch as well as owned DiskWriteBatch
231impl<T> WriteDisk for &mut T
232where
233    T: WriteDisk,
234{
235    fn zs_insert<C, K, V>(&mut self, cf: &C, key: K, value: V)
236    where
237        C: rocksdb::AsColumnFamilyRef,
238        K: IntoDisk + Debug,
239        V: IntoDisk,
240    {
241        (*self).zs_insert(cf, key, value)
242    }
243
244    fn zs_merge<C, K, V>(&mut self, cf: &C, key: K, value: V)
245    where
246        C: rocksdb::AsColumnFamilyRef,
247        K: IntoDisk + Debug,
248        V: IntoDisk,
249    {
250        (*self).zs_merge(cf, key, value)
251    }
252
253    fn zs_delete<C, K>(&mut self, cf: &C, key: K)
254    where
255        C: rocksdb::AsColumnFamilyRef,
256        K: IntoDisk + Debug,
257    {
258        (*self).zs_delete(cf, key)
259    }
260
261    fn zs_delete_range<C, K>(&mut self, cf: &C, from: K, until_strictly_before: K)
262    where
263        C: rocksdb::AsColumnFamilyRef,
264        K: IntoDisk + Debug,
265    {
266        (*self).zs_delete_range(cf, from, until_strictly_before)
267    }
268}
269
/// Helper trait for retrieving and deserializing values from rocksdb column families.
///
/// All keys and values are serialized with [`IntoDisk`] and deserialized with
/// [`FromDisk`]; key ordering is the lexicographic order of the serialized bytes.
///
/// # Deprecation
///
/// This trait should not be used in new code, use [`TypedColumnFamily`] instead.
//
// TODO: replace uses of this trait with TypedColumnFamily,
//       implement these methods directly on DiskDb, and delete the trait.
pub trait ReadDisk {
    /// Returns true if a rocksdb column family `cf` does not contain any entries.
    fn zs_is_empty<C>(&self, cf: &C) -> bool
    where
        C: rocksdb::AsColumnFamilyRef;

    /// Returns the value for `key` in the rocksdb column family `cf`, if present.
    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk,
        V: FromDisk;

    /// Check if a rocksdb column family `cf` contains the serialized form of `key`.
    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk;

    /// Returns the lowest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the highest key in `cf`, and the corresponding value.
    ///
    /// Returns `None` if the column family is empty.
    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key greater than or equal to `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly greater than `lower_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys greater than `lower_bound`.
    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key less than or equal to `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the first key strictly less than `upper_bound` in `cf`,
    /// and the corresponding value.
    ///
    /// Returns `None` if there are no keys less than `upper_bound`.
    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk;

    /// Returns the keys and values in `cf` in `range`, in an ordered `BTreeMap`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Ord,
        V: FromDisk,
        R: RangeBounds<K>;

    /// Returns the keys and values in `cf` in `range`, in an unordered `HashMap`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
        V: FromDisk,
        R: RangeBounds<K>;
}
375
376impl PartialEq for DiskDb {
377    fn eq(&self, other: &Self) -> bool {
378        if self.db.path() == other.db.path() {
379            assert_eq!(
380                self.network, other.network,
381                "database with same path but different network configs",
382            );
383            assert_eq!(
384                self.ephemeral, other.ephemeral,
385                "database with same path but different ephemeral configs",
386            );
387
388            return true;
389        }
390
391        false
392    }
393}
394
395impl Eq for DiskDb {}
396
397/// # Deprecation
398///
399/// These impls should not be used in new code, use [`TypedColumnFamily`] instead.
400//
401// TODO: replace uses of these impls with TypedColumnFamily,
402//       implement these methods directly on DiskDb, and delete the trait.
403impl ReadDisk for DiskDb {
404    fn zs_is_empty<C>(&self, cf: &C) -> bool
405    where
406        C: rocksdb::AsColumnFamilyRef,
407    {
408        // Empty column families return invalid forward iterators.
409        //
410        // Checking iterator validity does not seem to cause database hangs.
411        let iterator = self.db.iterator_cf(cf, rocksdb::IteratorMode::Start);
412        let raw_iterator: rocksdb::DBRawIteratorWithThreadMode<DB> = iterator.into();
413
414        !raw_iterator.valid()
415    }
416
417    #[allow(clippy::unwrap_in_result)]
418    fn zs_get<C, K, V>(&self, cf: &C, key: &K) -> Option<V>
419    where
420        C: rocksdb::AsColumnFamilyRef,
421        K: IntoDisk,
422        V: FromDisk,
423    {
424        let key_bytes = key.as_bytes();
425
426        // We use `get_pinned_cf` to avoid taking ownership of the serialized
427        // value, because we're going to deserialize it anyways, which avoids an
428        // extra copy
429        let value_bytes = self
430            .db
431            .get_pinned_cf(cf, key_bytes)
432            .expect("unexpected database failure");
433
434        value_bytes.map(V::from_bytes)
435    }
436
437    fn zs_contains<C, K>(&self, cf: &C, key: &K) -> bool
438    where
439        C: rocksdb::AsColumnFamilyRef,
440        K: IntoDisk,
441    {
442        let key_bytes = key.as_bytes();
443
444        // We use `get_pinned_cf` to avoid taking ownership of the serialized
445        // value, because we don't use the value at all. This avoids an extra copy.
446        self.db
447            .get_pinned_cf(cf, key_bytes)
448            .expect("unexpected database failure")
449            .is_some()
450    }
451
452    fn zs_first_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
453    where
454        C: rocksdb::AsColumnFamilyRef,
455        K: IntoDisk + FromDisk,
456        V: FromDisk,
457    {
458        // Reading individual values from iterators does not seem to cause database hangs.
459        self.zs_forward_range_iter(cf, ..).next()
460    }
461
462    fn zs_last_key_value<C, K, V>(&self, cf: &C) -> Option<(K, V)>
463    where
464        C: rocksdb::AsColumnFamilyRef,
465        K: IntoDisk + FromDisk,
466        V: FromDisk,
467    {
468        // Reading individual values from iterators does not seem to cause database hangs.
469        self.zs_reverse_range_iter(cf, ..).next()
470    }
471
472    fn zs_next_key_value_from<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
473    where
474        C: rocksdb::AsColumnFamilyRef,
475        K: IntoDisk + FromDisk,
476        V: FromDisk,
477    {
478        self.zs_forward_range_iter(cf, lower_bound..).next()
479    }
480
481    fn zs_next_key_value_strictly_after<C, K, V>(&self, cf: &C, lower_bound: &K) -> Option<(K, V)>
482    where
483        C: rocksdb::AsColumnFamilyRef,
484        K: IntoDisk + FromDisk,
485        V: FromDisk,
486    {
487        use std::ops::Bound::*;
488
489        // There is no standard syntax for an excluded start bound.
490        self.zs_forward_range_iter(cf, (Excluded(lower_bound), Unbounded))
491            .next()
492    }
493
494    fn zs_prev_key_value_back_from<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
495    where
496        C: rocksdb::AsColumnFamilyRef,
497        K: IntoDisk + FromDisk,
498        V: FromDisk,
499    {
500        self.zs_reverse_range_iter(cf, ..=upper_bound).next()
501    }
502
503    fn zs_prev_key_value_strictly_before<C, K, V>(&self, cf: &C, upper_bound: &K) -> Option<(K, V)>
504    where
505        C: rocksdb::AsColumnFamilyRef,
506        K: IntoDisk + FromDisk,
507        V: FromDisk,
508    {
509        self.zs_reverse_range_iter(cf, ..upper_bound).next()
510    }
511
512    fn zs_items_in_range_ordered<C, K, V, R>(&self, cf: &C, range: R) -> BTreeMap<K, V>
513    where
514        C: rocksdb::AsColumnFamilyRef,
515        K: IntoDisk + FromDisk + Ord,
516        V: FromDisk,
517        R: RangeBounds<K>,
518    {
519        self.zs_forward_range_iter(cf, range).collect()
520    }
521
522    fn zs_items_in_range_unordered<C, K, V, R>(&self, cf: &C, range: R) -> HashMap<K, V>
523    where
524        C: rocksdb::AsColumnFamilyRef,
525        K: IntoDisk + FromDisk + Eq + std::hash::Hash,
526        V: FromDisk,
527        R: RangeBounds<K>,
528    {
529        self.zs_forward_range_iter(cf, range).collect()
530    }
531}
532
533impl DiskWriteBatch {
534    /// Creates and returns a new transactional batch write.
535    ///
536    /// # Correctness
537    ///
538    /// Each block must be written to the state inside a batch, so that:
539    /// - concurrent `ReadStateService` queries don't see half-written blocks, and
540    /// - if Zebra calls `exit`, panics, or crashes, half-written blocks are rolled back.
541    pub fn new() -> Self {
542        DiskWriteBatch {
543            batch: rocksdb::WriteBatch::default(),
544        }
545    }
546}
547
548impl DiskDb {
549    /// Prints rocksdb metrics for each column family along with total database disk size, live data disk size and database memory size.
550    pub fn print_db_metrics(&self) {
551        let mut total_size_on_disk = 0;
552        let mut total_live_size_on_disk = 0;
553        let mut total_size_in_mem = 0;
554        let db: &Arc<DB> = &self.db;
555        let db_options = DiskDb::options();
556        let column_families = DiskDb::construct_column_families(db_options, db.path(), []);
557        let mut column_families_log_string = String::from("");
558
559        write!(column_families_log_string, "Column families and sizes: ").unwrap();
560
561        for cf_descriptor in column_families {
562            let cf_name = &cf_descriptor.name();
563            let cf_handle = db
564                .cf_handle(cf_name)
565                .expect("Column family handle must exist");
566            let live_data_size = db
567                .property_int_value_cf(cf_handle, "rocksdb.estimate-live-data-size")
568                .unwrap_or(Some(0));
569            let total_sst_files_size = db
570                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
571                .unwrap_or(Some(0));
572            let cf_disk_size = total_sst_files_size.unwrap_or(0);
573            total_size_on_disk += cf_disk_size;
574            total_live_size_on_disk += live_data_size.unwrap_or(0);
575            let mem_table_size = db
576                .property_int_value_cf(cf_handle, "rocksdb.size-all-mem-tables")
577                .unwrap_or(Some(0));
578            total_size_in_mem += mem_table_size.unwrap_or(0);
579
580            write!(
581                column_families_log_string,
582                "{} (Disk: {}, Memory: {})",
583                cf_name,
584                human_bytes::human_bytes(cf_disk_size as f64),
585                human_bytes::human_bytes(mem_table_size.unwrap_or(0) as f64)
586            )
587            .unwrap();
588        }
589
590        debug!("{}", column_families_log_string);
591        info!(
592            "Total Database Disk Size: {}",
593            human_bytes::human_bytes(total_size_on_disk as f64)
594        );
595        info!(
596            "Total Live Data Disk Size: {}",
597            human_bytes::human_bytes(total_live_size_on_disk as f64)
598        );
599        info!(
600            "Total Database Memory Size: {}",
601            human_bytes::human_bytes(total_size_in_mem as f64)
602        );
603    }
604
605    /// Returns the estimated total disk space usage of the database.
606    pub fn size(&self) -> u64 {
607        let db: &Arc<DB> = &self.db;
608        let db_options = DiskDb::options();
609        let mut total_size_on_disk = 0;
610        for cf_descriptor in DiskDb::construct_column_families(db_options, db.path(), []) {
611            let cf_name = &cf_descriptor.name();
612            let cf_handle = db
613                .cf_handle(cf_name)
614                .expect("Column family handle must exist");
615
616            total_size_on_disk += db
617                .property_int_value_cf(cf_handle, "rocksdb.total-sst-files-size")
618                .ok()
619                .flatten()
620                .unwrap_or(0);
621        }
622
623        total_size_on_disk
624    }
625
    /// When called with a secondary DB instance, tries to catch up with the primary DB instance
    ///
    /// Delegates directly to the inner RocksDB handle, and returns any
    /// [`rocksdb::Error`] it reports while catching up.
    pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
        self.db.try_catch_up_with_primary()
    }
630
    /// Returns a forward iterator over the items in `cf` in `range`.
    ///
    /// Items are yielded in increasing serialized key order.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_forward_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        // `reverse: false` selects the forward direction.
        self.zs_range_iter_with_direction(cf, range, false)
    }
647
    /// Returns a reverse iterator over the items in `cf` in `range`.
    ///
    /// Items are yielded in decreasing serialized key order.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_reverse_range_iter<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        // `reverse: true` selects the reverse direction.
        self.zs_range_iter_with_direction(cf, range, true)
    }
664
    /// Returns an iterator over the items in `cf` in `range`.
    ///
    /// RocksDB iterators are ordered by increasing key bytes by default.
    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
    ///
    /// Holding this iterator open might delay block commit transactions.
    fn zs_range_iter_with_direction<C, K, V, R>(
        &self,
        cf: &C,
        range: R,
        reverse: bool,
    ) -> impl Iterator<Item = (K, V)> + '_
    where
        C: rocksdb::AsColumnFamilyRef,
        K: IntoDisk + FromDisk,
        V: FromDisk,
        R: RangeBounds<K>,
    {
        use std::ops::Bound::{self, *};

        // Serialize the typed bounds to raw key bytes, because RocksDB
        // compares and iterates over keys in lexicographic byte order.
        //
        // Replace with map() when it stabilises:
        // https://github.com/rust-lang/rust/issues/86026
        let map_to_vec = |bound: Bound<&K>| -> Bound<Vec<u8>> {
            match bound {
                Unbounded => Unbounded,
                Included(x) => Included(x.as_bytes().as_ref().to_vec()),
                Excluded(x) => Excluded(x.as_bytes().as_ref().to_vec()),
            }
        };

        let start_bound = map_to_vec(range.start_bound());
        let end_bound = map_to_vec(range.end_bound());
        let range = (start_bound, end_bound);

        // The mode picks the starting key and direction; the read options set
        // lower/upper iterate bounds from the same serialized range.
        let mode = Self::zs_iter_mode(&range, reverse);
        let opts = Self::zs_iter_opts(&range);

        // Reading multiple items from iterators has caused database hangs,
        // in previous RocksDB versions
        self.db
            .iterator_cf_opt(cf, opts, mode)
            .map(|result| result.expect("unexpected database failure"))
            .map(|(key, value)| (key.to_vec(), value))
            // Skip excluded "from" bound and empty ranges. The `mode` already skips keys
            // strictly before the "from" bound.
            .skip_while({
                let range = range.clone();
                move |(key, _value)| !range.contains(key)
            })
            // Take until the excluded "to" bound is reached,
            // or we're after the included "to" bound.
            .take_while(move |(key, _value)| range.contains(key))
            // Deserialize the surviving raw entries back into typed keys and values.
            .map(|(key, value)| (K::from_bytes(key), V::from_bytes(value)))
    }
719
720    /// Returns the RocksDB ReadOptions with a lower and upper bound for a range.
721    fn zs_iter_opts<R>(range: &R) -> ReadOptions
722    where
723        R: RangeBounds<Vec<u8>>,
724    {
725        let mut opts = ReadOptions::default();
726        let (lower_bound, upper_bound) = Self::zs_iter_bounds(range);
727
728        if let Some(bound) = lower_bound {
729            opts.set_iterate_lower_bound(bound);
730        };
731
732        if let Some(bound) = upper_bound {
733            opts.set_iterate_upper_bound(bound);
734        };
735
736        opts
737    }
738
739    /// Returns a lower and upper iterate bounds for a range.
740    ///
741    /// Note: Since upper iterate bounds are always exclusive in RocksDB, this method
742    ///       will increment the upper bound by 1 if the end bound of the provided range
743    ///       is inclusive.
744    fn zs_iter_bounds<R>(range: &R) -> (Option<Vec<u8>>, Option<Vec<u8>>)
745    where
746        R: RangeBounds<Vec<u8>>,
747    {
748        use std::ops::Bound::*;
749
750        let lower_bound = match range.start_bound() {
751            Included(bound) | Excluded(bound) => Some(bound.clone()),
752            Unbounded => None,
753        };
754
755        let upper_bound = match range.end_bound().cloned() {
756            Included(mut bound) => {
757                // Increment the last byte in the upper bound that is less than u8::MAX, and
758                // clear any bytes after it to increment the next key in lexicographic order
759                // (next big-endian number). RocksDB uses lexicographic order for keys.
760                let is_wrapped_overflow = increment_big_endian(&mut bound);
761
762                if is_wrapped_overflow {
763                    bound.insert(0, 0x01)
764                }
765
766                Some(bound)
767            }
768            Excluded(bound) => Some(bound),
769            Unbounded => None,
770        };
771
772        (lower_bound, upper_bound)
773    }
774
775    /// Returns the RocksDB iterator "from" mode for `range`.
776    ///
777    /// RocksDB iterators are ordered by increasing key bytes by default.
778    /// Otherwise, if `reverse` is `true`, the iterator is ordered by decreasing key bytes.
779    fn zs_iter_mode<R>(range: &R, reverse: bool) -> rocksdb::IteratorMode
780    where
781        R: RangeBounds<Vec<u8>>,
782    {
783        use std::ops::Bound::*;
784
785        let from_bound = if reverse {
786            range.end_bound()
787        } else {
788            range.start_bound()
789        };
790
791        match from_bound {
792            Unbounded => {
793                if reverse {
794                    // Reversed unbounded iterators start from the last item
795                    rocksdb::IteratorMode::End
796                } else {
797                    // Unbounded iterators start from the first item
798                    rocksdb::IteratorMode::Start
799                }
800            }
801
802            Included(bound) | Excluded(bound) => {
803                let direction = if reverse {
804                    rocksdb::Direction::Reverse
805                } else {
806                    rocksdb::Direction::Forward
807                };
808
809                rocksdb::IteratorMode::From(bound.as_slice(), direction)
810            }
811        }
812    }
813
    /// The ideal open file limit for Zebra
    //
    // NOTE(review): presumably applied via `rlimit::increase_nofile_limit`
    // (imported at the top of this file) when opening the database —
    // confirm in `DiskDb::new`, which is outside this view.
    const IDEAL_OPEN_FILE_LIMIT: u64 = 1024;

    /// The minimum number of open files for Zebra to operate normally. Also used
    /// as the default open file limit, when the OS doesn't tell us how many
    /// files we can use.
    ///
    /// We want 100+ file descriptors for peers, and 100+ for the database.
    ///
    /// On Windows, the default limit is 512 high-level I/O files, and 8192
    /// low-level I/O files:
    /// <https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks>
    const MIN_OPEN_FILE_LIMIT: u64 = 512;

    /// The number of files used internally by Zebra.
    ///
    /// Zebra uses file descriptors for OS libraries (10+), polling APIs (10+),
    /// stdio (3), and other OS facilities (2+).
    const RESERVED_FILE_COUNT: u64 = 48;

    /// The size of the database memtable RAM cache in megabytes.
    ///
    /// <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#configuration-and-tuning>
    const MEMTABLE_RAM_CACHE_MEGABYTES: usize = 128;
838
839    /// Build a vector of current column families on the disk and optionally any new column families.
840    /// Returns an iterable collection of all column families.
841    fn construct_column_families(
842        db_options: Options,
843        path: &Path,
844        column_families_in_code: impl IntoIterator<Item = String>,
845    ) -> impl Iterator<Item = ColumnFamilyDescriptor> {
846        // When opening the database in read/write mode, all column families must be opened.
847        //
848        // To make Zebra forward-compatible with databases updated by later versions,
849        // we read any existing column families off the disk, then add any new column families
850        // from the current implementation.
851        //
852        // <https://github.com/facebook/rocksdb/wiki/Column-Families#reference>
853        let column_families_on_disk = DB::list_cf(&db_options, path).unwrap_or_default();
854        let column_families_in_code = column_families_in_code.into_iter();
855
856        column_families_on_disk
857            .into_iter()
858            .chain(column_families_in_code)
859            .unique()
860            .map(move |cf_name: String| {
861                let mut cf_options = db_options.clone();
862
863                if cf_name == BALANCE_BY_TRANSPARENT_ADDR {
864                    cf_options.set_merge_operator_associative(
865                        BALANCE_BY_TRANSPARENT_ADDR_MERGE_OP,
866                        fetch_add_balance_and_received,
867                    );
868                }
869
870                rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_options.clone())
871            })
872    }
873
874    /// Opens or creates the database at a path based on the kind, major version and network,
875    /// with the supplied column families, preserving any existing column families,
876    /// and returns a shared low-level database wrapper.
877    ///
878    /// # Panics
879    ///
880    /// - If the cache directory does not exist and can't be created.
881    /// - If the database cannot be opened for whatever reason.
882    pub fn new(
883        config: &Config,
884        db_kind: impl AsRef<str>,
885        format_version_in_code: &Version,
886        network: &Network,
887        column_families_in_code: impl IntoIterator<Item = String>,
888        read_only: bool,
889    ) -> DiskDb {
890        // If the database is ephemeral, we don't need to check the cache directory.
891        if !config.ephemeral {
892            DiskDb::validate_cache_dir(&config.cache_dir);
893        }
894
895        let db_kind = db_kind.as_ref();
896        let path = config.db_path(db_kind, format_version_in_code.major, network);
897
898        let db_options = DiskDb::options();
899
900        let column_families =
901            DiskDb::construct_column_families(db_options.clone(), &path, column_families_in_code);
902
903        let db_result = if read_only {
904            // Use a tempfile for the secondary instance cache directory
905            let secondary_config = Config {
906                ephemeral: true,
907                ..config.clone()
908            };
909            let secondary_path =
910                secondary_config.db_path("secondary_state", format_version_in_code.major, network);
911            let create_dir_result = std::fs::create_dir_all(&secondary_path);
912
913            info!(?create_dir_result, "creating secondary db directory");
914
915            DB::open_cf_descriptors_as_secondary(
916                &db_options,
917                &path,
918                &secondary_path,
919                column_families,
920            )
921        } else {
922            DB::open_cf_descriptors(&db_options, &path, column_families)
923        };
924
925        match db_result {
926            Ok(db) => {
927                info!("Opened Zebra state cache at {}", path.display());
928
929                let db = DiskDb {
930                    db_kind: db_kind.to_string(),
931                    format_version_in_code: format_version_in_code.clone(),
932                    network: network.clone(),
933                    ephemeral: config.ephemeral,
934                    db: Arc::new(db),
935                };
936
937                db.assert_default_cf_is_empty();
938
939                db
940            }
941
942            Err(e) if matches!(e.kind(), ErrorKind::Busy | ErrorKind::IOError) => panic!(
943                "Database likely already open {path:?} \
944                         Hint: Check if another zebrad process is running."
945            ),
946
947            Err(e) => panic!(
948                "Opening database {path:?} failed. \
949                        Hint: Try changing the state cache_dir in the Zebra config. \
950                        Error: {e}",
951            ),
952        }
953    }
954
955    // Accessor methods
956
957    /// Returns the configured database kind for this database.
958    pub fn db_kind(&self) -> String {
959        self.db_kind.clone()
960    }
961
962    /// Returns the format version of the running code that created this `DiskDb` instance in memory.
963    pub fn format_version_in_code(&self) -> Version {
964        self.format_version_in_code.clone()
965    }
966
967    /// Returns the fixed major version for this database.
968    pub fn major_version(&self) -> u64 {
969        self.format_version_in_code().major
970    }
971
972    /// Returns the configured network for this database.
973    pub fn network(&self) -> Network {
974        self.network.clone()
975    }
976
    /// Returns the `Path` where the files used by this database are located.
    ///
    /// The path is borrowed from the underlying RocksDB instance.
    pub fn path(&self) -> &Path {
        self.db.path()
    }
981
    /// Returns the low-level rocksdb inner database.
    ///
    /// Currently unused, as indicated by `#[allow(dead_code)]`; kept so direct
    /// low-level access remains available inside this module.
    #[allow(dead_code)]
    fn inner(&self) -> &Arc<DB> {
        &self.db
    }
987
    /// Returns the column family handle for `cf_name`,
    /// or `None` if the open database has no column family with that name.
    pub fn cf_handle(&self, cf_name: &str) -> Option<rocksdb::ColumnFamilyRef<'_>> {
        // Note: the lifetime returned by this method is subtly wrong. As of December 2023 it is
        // the shorter of &self and &str, but RocksDB clones column family names internally, so it
        // should just be &self. To avoid this restriction, clone the string before passing it to
        // this method. Currently Zebra uses static strings, so this doesn't matter.
        self.db.cf_handle(cf_name)
    }
996
997    // Read methods are located in the ReadDisk trait
998
999    // Write methods
1000    // Low-level write methods are located in the WriteDisk trait
1001
    /// Writes `batch` to the database.
    ///
    /// The whole `WriteBatch` is applied as a single RocksDB transaction
    /// (see the module docs): either all of its operations are persisted, or none are.
    pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
        self.db.write(batch.batch)
    }
1006
1007    // Private methods
1008
    /// Tries to reuse an existing db after a major upgrade.
    ///
    /// If the current db version belongs to `restorable_db_versions`, the function moves a previous
    /// db to a new path so it can be used again. It does so by merely trying to rename the path
    /// corresponding to the db version directly preceding the current version to the path that is
    /// used by the current db. If successful, it also deletes the db version file.
    ///
    /// Returns the old disk version if one existed and the db directory was renamed, or None otherwise.
    // TODO: Update this function to rename older major db format version to the current version (#9565).
    #[allow(clippy::unwrap_in_result)]
    pub(crate) fn try_reusing_previous_db_after_major_upgrade(
        restorable_db_versions: &[u64],
        format_version_in_code: &Version,
        config: &Config,
        db_kind: impl AsRef<str>,
        network: &Network,
    ) -> Option<Version> {
        // Only restore if the current major version is one of the restorable ones.
        if let Some(&major_db_ver) = restorable_db_versions
            .iter()
            .find(|v| **v == format_version_in_code.major)
        {
            let db_kind = db_kind.as_ref();

            // The major version directly preceding the current one,
            // e.g. v25 when the current version is v26.
            let old_major_db_ver = major_db_ver - 1;
            let old_path = config.db_path(db_kind, old_major_db_ver, network);
            // Exit early if the path doesn't exist or there's an error checking it.
            if !fs::exists(&old_path).unwrap_or(false) {
                return None;
            }

            let new_path = config.db_path(db_kind, major_db_ver, network);

            // Canonicalize both paths so the containment check below compares real locations,
            // resolving any symlinks.
            let old_path = match fs::canonicalize(&old_path) {
                Ok(canonicalized_old_path) => canonicalized_old_path,
                Err(e) => {
                    warn!("could not canonicalize {old_path:?}: {e}");
                    return None;
                }
            };

            let cache_path = match fs::canonicalize(&config.cache_dir) {
                Ok(canonicalized_cache_path) => canonicalized_cache_path,
                Err(e) => {
                    warn!("could not canonicalize {:?}: {e}", config.cache_dir);
                    return None;
                }
            };

            // # Correctness
            //
            // Check that the path we're about to move is inside the cache directory.
            //
            // If the user has symlinked the state directory to a non-cache directory, we don't want
            // to move it, because it might contain other files.
            //
            // We don't attempt to guard against malicious symlinks created by attackers
            // (TOCTOU attacks). Zebra should not be run with elevated privileges.
            if !old_path.starts_with(&cache_path) {
                info!("skipped reusing previous state cache: state is outside cache directory");
                return None;
            }

            // A directory only counts as a real database here if listing it
            // yields at least one column family.
            let opts = DiskDb::options();
            let old_db_exists = DB::list_cf(&opts, &old_path).is_ok_and(|cf| !cf.is_empty());
            let new_db_exists = DB::list_cf(&opts, &new_path).is_ok_and(|cf| !cf.is_empty());

            if old_db_exists && !new_db_exists {
                // Create the parent directory for the new db. This is because we can't directly
                // rename e.g. `state/v25/mainnet/` to `state/v26/mainnet/` with `fs::rename()` if
                // `state/v26/` does not exist.
                match fs::create_dir_all(
                    new_path
                        .parent()
                        .expect("new state cache must have a parent path"),
                ) {
                    Ok(()) => info!("created new directory for state cache at {new_path:?}"),
                    Err(e) => {
                        warn!(
                            "could not create new directory for state cache at {new_path:?}: {e}"
                        );
                        return None;
                    }
                };

                match fs::rename(&old_path, &new_path) {
                    Ok(()) => {
                        info!("moved state cache from {old_path:?} to {new_path:?}");

                        let mut disk_version =
                            database_format_version_on_disk(config, db_kind, major_db_ver, network)
                                .expect("unable to read database format version file")
                                .expect("unable to parse database format version");

                        // The directory was only renamed, not upgraded, so the moved files
                        // still use the previous major format: record that in the version file.
                        disk_version.major = old_major_db_ver;

                        write_database_format_version_to_disk(
                            config,
                            db_kind,
                            major_db_ver,
                            &disk_version,
                            network,
                        )
                        .expect("unable to write database format version file to disk");

                        // Get the parent of the old path, e.g. `state/v25/` and delete it if it is
                        // empty.
                        let old_path = old_path
                            .parent()
                            .expect("old state cache must have parent path");

                        if fs::read_dir(old_path)
                            .expect("cached state dir needs to be readable")
                            .next()
                            .is_none()
                        {
                            match fs::remove_dir_all(old_path) {
                                Ok(()) => {
                                    info!("removed empty old state cache directory at {old_path:?}")
                                }
                                Err(e) => {
                                    warn!(
                                        "could not remove empty old state cache directory \
                                           at {old_path:?}: {e}"
                                    )
                                }
                            }
                        }

                        return Some(disk_version);
                    }
                    Err(e) => {
                        warn!("could not move state cache from {old_path:?} to {new_path:?}: {e}");
                    }
                };
            }
        };

        None
    }
1148
1149    /// Returns the database options for the finalized state database.
1150    fn options() -> rocksdb::Options {
1151        let mut opts = rocksdb::Options::default();
1152        let mut block_based_opts = rocksdb::BlockBasedOptions::default();
1153
1154        const ONE_MEGABYTE: usize = 1024 * 1024;
1155
1156        opts.create_if_missing(true);
1157        opts.create_missing_column_families(true);
1158
1159        // Use the recommended Ribbon filter setting for all column families.
1160        //
1161        // Ribbon filters are faster than Bloom filters in Zebra, as of April 2022.
1162        // (They aren't needed for single-valued column families, but they don't hurt either.)
1163        block_based_opts.set_ribbon_filter(9.9);
1164
1165        // Use the recommended LZ4 compression type.
1166        //
1167        // https://github.com/facebook/rocksdb/wiki/Compression#configuration
1168        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
1169
1170        // Tune level-style database file compaction.
1171        //
1172        // This improves Zebra's initial sync speed slightly, as of April 2022.
1173        opts.optimize_level_style_compaction(Self::MEMTABLE_RAM_CACHE_MEGABYTES * ONE_MEGABYTE);
1174
1175        // Increase the process open file limit if needed,
1176        // then use it to set RocksDB's limit.
1177        let open_file_limit = DiskDb::increase_open_file_limit();
1178        let db_file_limit = DiskDb::get_db_open_file_limit(open_file_limit);
1179
1180        // If the current limit is very large, set the DB limit using the ideal limit
1181        let ideal_limit = DiskDb::get_db_open_file_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT)
1182            .try_into()
1183            .expect("ideal open file limit fits in a c_int");
1184        let db_file_limit = db_file_limit.try_into().unwrap_or(ideal_limit);
1185
1186        opts.set_max_open_files(db_file_limit);
1187
1188        // Set the block-based options
1189        opts.set_block_based_table_factory(&block_based_opts);
1190
1191        opts
1192    }
1193
1194    /// Calculate the database's share of `open_file_limit`
1195    fn get_db_open_file_limit(open_file_limit: u64) -> u64 {
1196        // Give the DB half the files, and reserve half the files for peers
1197        (open_file_limit - DiskDb::RESERVED_FILE_COUNT) / 2
1198    }
1199
1200    /// Increase the open file limit for this process to `IDEAL_OPEN_FILE_LIMIT`.
1201    /// If that fails, try `MIN_OPEN_FILE_LIMIT`.
1202    ///
1203    /// If the current limit is above `IDEAL_OPEN_FILE_LIMIT`, leaves it
1204    /// unchanged.
1205    ///
1206    /// Returns the current limit, after any successful increases.
1207    ///
1208    /// # Panics
1209    ///
1210    /// If the open file limit can not be increased to `MIN_OPEN_FILE_LIMIT`.
1211    fn increase_open_file_limit() -> u64 {
1212        // Zebra mainly uses TCP sockets (`zebra-network`) and low-level files
1213        // (`zebra-state` database).
1214        //
1215        // On Unix-based platforms, `increase_nofile_limit` changes the limit for
1216        // both database files and TCP connections.
1217        //
1218        // But it doesn't do anything on Windows in rlimit 0.7.0.
1219        //
1220        // On Windows, the default limits are:
1221        // - 512 high-level stream I/O files (via the C standard functions),
1222        // - 8192 low-level I/O files (via the Unix C functions), and
1223        // - 1000 TCP Control Block entries (network connections).
1224        //
1225        // https://docs.microsoft.com/en-us/cpp/c-runtime-library/reference/setmaxstdio?view=msvc-160#remarks
1226        // http://smallvoid.com/article/winnt-tcpip-max-limit.html
1227        //
1228        // `zebra-state`'s `IDEAL_OPEN_FILE_LIMIT` is much less than
1229        // the Windows low-level I/O file limit.
1230        //
1231        // The [`setmaxstdio` and `getmaxstdio`](https://docs.rs/rlimit/latest/rlimit/#windows)
1232        // functions from the `rlimit` crate only change the high-level I/O file limit.
1233        //
1234        // `zebra-network`'s default connection limit is much less than
1235        // the TCP Control Block limit on Windows.
1236
1237        // We try setting the ideal limit, then the minimum limit.
1238        let current_limit = match increase_nofile_limit(DiskDb::IDEAL_OPEN_FILE_LIMIT) {
1239            Ok(current_limit) => current_limit,
1240            Err(limit_error) => {
1241                // These errors can happen due to sandboxing or unsupported system calls,
1242                // even if the file limit is high enough.
1243                info!(
1244                    ?limit_error,
1245                    min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1246                    ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1247                    "unable to increase the open file limit, \
1248                     assuming Zebra can open a minimum number of files"
1249                );
1250
1251                return DiskDb::MIN_OPEN_FILE_LIMIT;
1252            }
1253        };
1254
1255        if current_limit < DiskDb::MIN_OPEN_FILE_LIMIT {
1256            panic!(
1257                "open file limit too low: \
1258                 unable to set the number of open files to {}, \
1259                 the minimum number of files required by Zebra. \
1260                 Current limit is {:?}. \
1261                 Hint: Increase the open file limit to {} before launching Zebra",
1262                DiskDb::MIN_OPEN_FILE_LIMIT,
1263                current_limit,
1264                DiskDb::IDEAL_OPEN_FILE_LIMIT
1265            );
1266        } else if current_limit < DiskDb::IDEAL_OPEN_FILE_LIMIT {
1267            warn!(
1268                ?current_limit,
1269                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1270                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1271                "the maximum number of open files is below Zebra's ideal limit. \
1272                 Hint: Increase the open file limit to {} before launching Zebra",
1273                DiskDb::IDEAL_OPEN_FILE_LIMIT
1274            );
1275        } else if cfg!(windows) {
1276            // This log is verbose during tests.
1277            #[cfg(not(test))]
1278            info!(
1279                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1280                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1281                "assuming the open file limit is high enough for Zebra",
1282            );
1283            #[cfg(test)]
1284            debug!(
1285                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1286                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1287                "assuming the open file limit is high enough for Zebra",
1288            );
1289        } else {
1290            #[cfg(not(test))]
1291            debug!(
1292                ?current_limit,
1293                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1294                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1295                "the open file limit is high enough for Zebra",
1296            );
1297            #[cfg(test)]
1298            debug!(
1299                ?current_limit,
1300                min_limit = ?DiskDb::MIN_OPEN_FILE_LIMIT,
1301                ideal_limit = ?DiskDb::IDEAL_OPEN_FILE_LIMIT,
1302                "the open file limit is high enough for Zebra",
1303            );
1304        }
1305
1306        current_limit
1307    }
1308
1309    // Cleanup methods
1310
1311    /// Returns the number of shared instances of this database.
1312    ///
1313    /// # Concurrency
1314    ///
1315    /// The actual number of owners can be higher or lower than the returned value,
1316    /// because databases can simultaneously be cloned or dropped in other threads.
1317    ///
1318    /// However, if the number of owners is 1, and the caller has exclusive access,
1319    /// the count can't increase unless that caller clones the database.
1320    pub(crate) fn shared_database_owners(&self) -> usize {
1321        Arc::strong_count(&self.db) + Arc::weak_count(&self.db)
1322    }
1323
    /// Shut down the database, cleaning up background tasks and ephemeral data.
    ///
    /// If `force` is true, clean up regardless of any shared references.
    /// `force` can cause errors accessing the database from other shared references.
    /// It should only be used in debugging or test code, immediately before a manual shutdown.
    ///
    /// # Panics
    ///
    /// If Zebra incorrectly stored data in the "default" column family
    /// (checked by `assert_default_cf_is_empty`).
    ///
    /// TODO: make private after the stop height check has moved to the syncer (#3442)
    ///       move shutting down the database to a blocking thread (#2188)
    pub(crate) fn shutdown(&mut self, force: bool) {
        // # Correctness
        //
        // If we're the only owner of the shared database instance,
        // then there are no other threads that can increase the strong or weak count.
        //
        // ## Implementation Requirements
        //
        // This function and all functions that it calls should avoid cloning the shared database
        // instance. If they do, they must drop it before:
        // - shutting down database threads, or
        // - deleting database files.

        // If other instances still exist, a non-forced shutdown just logs and returns.
        if self.shared_database_owners() > 1 {
            let path = self.path();

            let mut ephemeral_note = "";

            if force {
                if self.ephemeral {
                    ephemeral_note = " and removing ephemeral files";
                }

                // This log is verbose during tests.
                #[cfg(not(test))]
                info!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
                #[cfg(test)]
                debug!(
                    ?path,
                    "forcing shutdown{} of a state database with multiple active instances",
                    ephemeral_note,
                );
            } else {
                if self.ephemeral {
                    ephemeral_note = " and files";
                }

                debug!(
                    ?path,
                    "dropping DiskDb clone, \
                     but keeping shared database instance{} until the last reference is dropped",
                    ephemeral_note,
                );
                return;
            }
        }

        // Check the format invariant before cleaning up.
        self.assert_default_cf_is_empty();

        // Drop isn't guaranteed to run, such as when we panic, or if the tokio shutdown times out.
        //
        // Zebra's data should be fine if we don't clean up, because:
        // - the database flushes regularly anyway
        // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back
        // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually
        let path = self.path();
        debug!(?path, "flushing database to disk");

        // These flushes can fail during forced shutdown or during Drop after a shutdown,
        // particularly in tests. If they fail, there's nothing we can do about it anyway.
        //
        // First flush the SST files, then the write-ahead log (below).
        if let Err(error) = self.db.flush() {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database SST files to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database SST files to disk during shutdown"
                );
            }
        }

        // `flush_wal(true)` also syncs the WAL to disk before returning.
        if let Err(error) = self.db.flush_wal(true) {
            let error = format!("{error:?}");
            if error.to_ascii_lowercase().contains("shutdown in progress") {
                debug!(
                    ?error,
                    ?path,
                    "expected shutdown error flushing database WAL buffer to disk"
                );
            } else {
                info!(
                    ?error,
                    ?path,
                    "unexpected error flushing database WAL buffer to disk during shutdown"
                );
            }
        }

        // # Memory Safety
        //
        // We'd like to call `cancel_all_background_work()` before Zebra exits,
        // but when we call it, we get memory, thread, or C++ errors when the process exits.
        // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until
        // all the threads have cleaned up.)
        //
        // # Change History
        //
        // We've changed this setting multiple times since 2021, in response to new RocksDB
        // and Rust compiler behaviour.
        //
        // We enabled cancel_all_background_work() due to failures on:
        // - Rust 1.57 on Linux
        //
        // We disabled cancel_all_background_work() due to failures on:
        // - Rust 1.64 on Linux
        //
        // We tried enabling cancel_all_background_work() due to failures on:
        // - Rust 1.70 on macOS 12.6.5 on x86_64
        // but it didn't stop the aborts happening (PR #6820).
        //
        // There weren't any failures with cancel_all_background_work() disabled on:
        // - Rust 1.69 or earlier
        // - Linux with Rust 1.70
        // And with cancel_all_background_work() enabled or disabled on:
        // - macOS 13.2 on aarch64 (M1), native and emulated x86_64, with Rust 1.70
        //
        // # Detailed Description
        //
        // We see these kinds of errors:
        // ```
        // pthread lock: Invalid argument
        // pure virtual method called
        // terminate called without an active exception
        // pthread destroy mutex: Device or resource busy
        // Aborted (core dumped)
        // signal: 6, SIGABRT: process abort signal
        // signal: 11, SIGSEGV: invalid memory reference
        // ```
        //
        // # Reference
        //
        // The RocksDB wiki says:
        // > Q: Is it safe to close RocksDB while another thread is issuing read, write or manual compaction requests?
        // >
        // > A: No. The users of RocksDB need to make sure all functions have finished before they close RocksDB.
        // > You can speed up the waiting by calling CancelAllBackgroundWork().
        //
        // <https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ>
        //
        // > rocksdb::DB instances need to be destroyed before your main function exits.
        // > RocksDB instances usually depend on some internal static variables.
        // > Users need to make sure rocksdb::DB instances are destroyed before those static variables.
        //
        // <https://github.com/facebook/rocksdb/wiki/Known-Issues>
        //
        // # TODO
        //
        // Try re-enabling this code and fixing the underlying concurrency bug.
        //
        //info!(?path, "stopping background database tasks");
        //self.db.cancel_all_background_work(true);

        // We'd like to drop the database before deleting its files,
        // because that closes the column families and the database correctly.
        // But Rust's ownership rules make that difficult,
        // so we just flush and delete ephemeral data instead.
        //
        // This implementation doesn't seem to cause any issues,
        // and the RocksDB Drop implementation handles any cleanup.
        self.delete_ephemeral();
    }
1503
1504    /// If the database is `ephemeral`, delete its files.
1505    fn delete_ephemeral(&mut self) {
1506        // # Correctness
1507        //
1508        // This function and all functions that it calls should avoid cloning the shared database
1509        // instance. See `shutdown()` for details.
1510
1511        if !self.ephemeral {
1512            return;
1513        }
1514
1515        let path = self.path();
1516
1517        // This log is verbose during tests.
1518        #[cfg(not(test))]
1519        info!(?path, "removing temporary database files");
1520        #[cfg(test)]
1521        debug!(?path, "removing temporary database files");
1522
1523        // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases,
1524        // but the Zcash blockchain might not fit in memory. So we just
1525        // delete the database files instead.
1526        //
1527        // We'd also like to call `DB::destroy` here, but calling destroy on a
1528        // live DB is undefined behaviour:
1529        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite
1530        //
1531        // So we assume that all the database files are under `path`, and
1532        // delete them using standard filesystem APIs. Deleting open files
1533        // might cause errors on non-Unix platforms, so we ignore the result.
1534        // (The OS will delete them eventually anyway, if they are in a temporary directory.)
1535        let result = std::fs::remove_dir_all(path);
1536
1537        if result.is_err() {
1538            // This log is verbose during tests.
1539            #[cfg(not(test))]
1540            info!(
1541                ?result,
1542                ?path,
1543                "removing temporary database files caused an error",
1544            );
1545            #[cfg(test)]
1546            debug!(
1547                ?result,
1548                ?path,
1549                "removing temporary database files caused an error",
1550            );
1551        } else {
1552            debug!(
1553                ?result,
1554                ?path,
1555                "successfully removed temporary database files",
1556            );
1557        }
1558    }
1559
1560    /// Check that the "default" column family is empty.
1561    ///
1562    /// # Panics
1563    ///
1564    /// If Zebra has a bug where it is storing data in the wrong column family.
1565    fn assert_default_cf_is_empty(&self) {
1566        // # Correctness
1567        //
1568        // This function and all functions that it calls should avoid cloning the shared database
1569        // instance. See `shutdown()` for details.
1570
1571        if let Some(default_cf) = self.cf_handle("default") {
1572            assert!(
1573                self.zs_is_empty(&default_cf),
1574                "Zebra should not store data in the 'default' column family"
1575            );
1576        }
1577    }
1578
1579    // Validates a cache directory and creates it if it doesn't exist.
1580    // If the directory cannot be created, it panics with a specific error message.
1581    fn validate_cache_dir(cache_dir: &std::path::PathBuf) {
1582        if let Err(e) = fs::create_dir_all(cache_dir) {
1583            match e.kind() {
1584                std::io::ErrorKind::PermissionDenied => panic!(
1585                    "Permission denied creating {cache_dir:?}. \
1586                     Hint: check if cache directory exist and has write permissions."
1587                ),
1588                std::io::ErrorKind::StorageFull => panic!(
1589                    "No space left on device creating {cache_dir:?}. \
1590                     Hint: check if the disk is full."
1591                ),
1592                _ => panic!("Could not create cache dir {:?}: {}", cache_dir, e),
1593            }
1594        }
1595    }
1596}
1597
1598impl Drop for DiskDb {
1599    fn drop(&mut self) {
1600        let path = self.path();
1601        debug!(?path, "dropping DiskDb instance");
1602
1603        self.shutdown(false);
1604    }
1605}