zebra_state/service/finalized_state/disk_format/
upgrade.rs

1//! In-place format upgrades and format validity checks for the Zebra state database.
2
3use std::{
4    cmp::Ordering,
5    sync::Arc,
6    thread::{self, JoinHandle},
7};
8
9use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
10use semver::Version;
11use tracing::Span;
12
13use zebra_chain::{
14    block::Height,
15    diagnostic::{
16        task::{CheckForPanics, WaitForPanics},
17        CodeTimer,
18    },
19};
20
21use DbFormatChange::*;
22
23use crate::service::finalized_state::ZebraDb;
24
25pub(crate) mod add_subtrees;
26pub(crate) mod cache_genesis_roots;
27pub(crate) mod fix_tree_key_type;
28pub(crate) mod no_migration;
29pub(crate) mod prune_trees;
30pub(crate) mod tree_keys_and_caches_upgrade;
31
32#[cfg(not(feature = "indexer"))]
33pub(crate) mod drop_tx_locs_by_spends;
34
35#[cfg(feature = "indexer")]
36pub(crate) mod track_tx_locs_by_spends;
37
38/// Defines method signature for running disk format upgrades.
39pub trait DiskFormatUpgrade {
40    /// Returns the version at which this upgrade is applied.
41    fn version(&self) -> Version;
42
43    /// Returns the description of this upgrade.
44    fn description(&self) -> &'static str;
45
46    /// Runs disk format upgrade.
47    fn run(
48        &self,
49        initial_tip_height: Height,
50        db: &ZebraDb,
51        cancel_receiver: &Receiver<CancelFormatChange>,
52    ) -> Result<(), CancelFormatChange>;
53
54    /// Check that state has been upgraded to this format correctly.
55    ///
56    /// The outer `Result` indicates whether the validation was cancelled (due to e.g. node shutdown).
57    /// The inner `Result` indicates whether the validation itself failed or not.
58    fn validate(
59        &self,
60        _db: &ZebraDb,
61        _cancel_receiver: &Receiver<CancelFormatChange>,
62    ) -> Result<Result<(), String>, CancelFormatChange> {
63        Ok(Ok(()))
64    }
65
66    /// Prepare for disk format upgrade.
67    fn prepare(
68        &self,
69        _initial_tip_height: Height,
70        _upgrade_db: &ZebraDb,
71        _cancel_receiver: &Receiver<CancelFormatChange>,
72        _older_disk_version: &Version,
73    ) -> Result<(), CancelFormatChange> {
74        Ok(())
75    }
76
77    /// Returns true if the [`DiskFormatUpgrade`] needs to run a migration on existing data in the db.
78    fn needs_migration(&self) -> bool {
79        true
80    }
81}
82
83fn format_upgrades(
84    min_version: Option<Version>,
85) -> impl Iterator<Item = Box<dyn DiskFormatUpgrade>> {
86    let min_version = move || min_version.clone().unwrap_or(Version::new(0, 0, 0));
87
88    // Note: Disk format upgrades must be run in order of database version.
89    ([
90        Box::new(prune_trees::PruneTrees),
91        Box::new(add_subtrees::AddSubtrees),
92        Box::new(tree_keys_and_caches_upgrade::FixTreeKeyTypeAndCacheGenesisRoots),
93        // Value balance upgrade
94        Box::new(no_migration::NoMigration::new(26, 0, 0)),
95    ] as [Box<dyn DiskFormatUpgrade>; 4])
96        .into_iter()
97        .filter(move |upgrade| upgrade.version() > min_version())
98}
99
100/// The kind of database format change or validity check we're performing.
101#[derive(Clone, Debug, Eq, PartialEq)]
102pub enum DbFormatChange {
103    // Data Format Changes
104    //
105    /// Upgrade the format from `older_disk_version` to `newer_running_version`.
106    ///
107    /// Until this upgrade is complete, the format is a mixture of both versions.
108    Upgrade {
109        older_disk_version: Version,
110        newer_running_version: Version,
111    },
112
113    // Format Version File Changes
114    //
115    /// Mark the format as newly created by `running_version`.
116    ///
117    /// Newly created databases are opened with no disk version.
118    /// It is set to the running version by the format change code.
119    NewlyCreated { running_version: Version },
120
121    /// Mark the format as downgraded from `newer_disk_version` to `older_running_version`.
122    ///
123    /// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state
124    /// version (or greater), the format will be a mixture of both versions.
125    Downgrade {
126        newer_disk_version: Version,
127        older_running_version: Version,
128    },
129
130    // Data Format Checks
131    //
132    /// Check that the database from a previous instance has the current `running_version` format.
133    ///
134    /// Current version databases have a disk version that matches the running version.
135    /// No upgrades are needed, so we just run a format check on the database.
136    /// The data in that database was created or updated by a previous Zebra instance.
137    CheckOpenCurrent { running_version: Version },
138
139    /// Check that the database from this instance has the current `running_version` format.
140    ///
141    /// The data in that database was created or updated by the currently running Zebra instance.
142    /// So we periodically check for data bugs, which can happen if the upgrade and new block
143    /// code produce different data. (They can also be caused by disk corruption.)
144    CheckNewBlocksCurrent { running_version: Version },
145}
146
147/// A handle to a spawned format change thread.
148///
149/// Cloning this struct creates an additional handle to the same thread.
150///
151/// # Concurrency
152///
153/// Cancelling the thread on drop has a race condition, because two handles can be dropped at
154/// the same time.
155///
156/// If cancelling the thread is required for correct operation or usability, the owner of the
157/// handle must call force_cancel().
158#[derive(Clone, Debug)]
159pub struct DbFormatChangeThreadHandle {
160    /// A handle to the format change/check thread.
161    /// If configured, this thread continues running so it can perform periodic format checks.
162    ///
163    /// Panics from this thread are propagated into Zebra's state service.
164    /// The task returns an error if the upgrade was cancelled by a shutdown.
165    update_task: Option<Arc<JoinHandle<Result<(), CancelFormatChange>>>>,
166
167    /// A channel that tells the running format thread to finish early.
168    cancel_handle: Sender<CancelFormatChange>,
169}
170
/// A unit marker with two roles: it is the message sent to cancel a format upgrade,
/// and the error value returned when a format change was cancelled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CancelFormatChange;
174
175impl DbFormatChange {
176    /// Returns the format change for `running_version` code loading a `disk_version` database.
177    ///
178    /// Also logs that change at info level.
179    ///
180    /// If `disk_version` is `None`, Zebra is creating a new database.
181    pub fn open_database(running_version: &Version, disk_version: Option<Version>) -> Self {
182        let running_version = running_version.clone();
183
184        let Some(disk_version) = disk_version else {
185            info!(
186                %running_version,
187                "creating new database with the current format"
188            );
189
190            return NewlyCreated { running_version };
191        };
192
193        match disk_version.cmp_precedence(&running_version) {
194            Ordering::Less => {
195                info!(
196                    %running_version,
197                    %disk_version,
198                    "trying to open older database format: launching upgrade task"
199                );
200
201                Upgrade {
202                    older_disk_version: disk_version,
203                    newer_running_version: running_version,
204                }
205            }
206            Ordering::Greater => {
207                info!(
208                    %running_version,
209                    %disk_version,
210                    "trying to open newer database format: data should be compatible"
211                );
212
213                Downgrade {
214                    newer_disk_version: disk_version,
215                    older_running_version: running_version,
216                }
217            }
218            Ordering::Equal => {
219                info!(%running_version, "trying to open current database format");
220
221                CheckOpenCurrent { running_version }
222            }
223        }
224    }
225
226    /// Returns a format check for newly added blocks in the currently running Zebra version.
227    /// This check makes sure the upgrade and new block code produce the same data.
228    ///
229    /// Also logs the check at info level.
230    pub fn check_new_blocks(db: &ZebraDb) -> Self {
231        let running_version = db.format_version_in_code();
232
233        info!(%running_version, "checking new blocks were written in current database format");
234        CheckNewBlocksCurrent { running_version }
235    }
236
237    /// Returns true if this format change/check is an upgrade.
238    #[allow(dead_code)]
239    pub fn is_upgrade(&self) -> bool {
240        matches!(self, Upgrade { .. })
241    }
242
243    /// Returns true if this format change/check happens at startup.
244    #[allow(dead_code)]
245    pub fn is_run_at_startup(&self) -> bool {
246        !matches!(self, CheckNewBlocksCurrent { .. })
247    }
248
249    /// Returns the running version in this format change.
250    pub fn running_version(&self) -> Version {
251        match self {
252            Upgrade {
253                newer_running_version,
254                ..
255            } => newer_running_version,
256            Downgrade {
257                older_running_version,
258                ..
259            } => older_running_version,
260            NewlyCreated { running_version }
261            | CheckOpenCurrent { running_version }
262            | CheckNewBlocksCurrent { running_version } => running_version,
263        }
264        .clone()
265    }
266
267    /// Returns the initial database version before this format change.
268    ///
269    /// Returns `None` if the database was newly created.
270    pub fn initial_disk_version(&self) -> Option<Version> {
271        match self {
272            Upgrade {
273                older_disk_version, ..
274            } => Some(older_disk_version),
275            Downgrade {
276                newer_disk_version, ..
277            } => Some(newer_disk_version),
278            CheckOpenCurrent { running_version } | CheckNewBlocksCurrent { running_version } => {
279                Some(running_version)
280            }
281            NewlyCreated { .. } => None,
282        }
283        .cloned()
284    }
285
286    /// Launch a `std::thread` that applies this format change to the database,
287    /// then continues running to perform periodic format checks.
288    ///
289    /// `initial_tip_height` is the database height when it was opened, and `db` is the
290    /// database instance to upgrade or check.
291    pub fn spawn_format_change(
292        self,
293        db: ZebraDb,
294        initial_tip_height: Option<Height>,
295    ) -> DbFormatChangeThreadHandle {
296        // # Correctness
297        //
298        // Cancel handles must use try_send() to avoid blocking waiting for the format change
299        // thread to shut down.
300        let (cancel_handle, cancel_receiver) = bounded(1);
301
302        let span = Span::current();
303        let update_task = thread::spawn(move || {
304            span.in_scope(move || {
305                self.format_change_run_loop(db, initial_tip_height, cancel_receiver)
306            })
307        });
308
309        let mut handle = DbFormatChangeThreadHandle {
310            update_task: Some(Arc::new(update_task)),
311            cancel_handle,
312        };
313
314        handle.check_for_panics();
315
316        handle
317    }
318
319    /// Run the initial format change or check to the database. Under the default runtime config,
320    /// this method returns after the format change or check.
321    ///
322    /// But if runtime validity checks are enabled, this method periodically checks the format of
323    /// newly added blocks matches the current format. It will run until it is cancelled or panics.
324    fn format_change_run_loop(
325        self,
326        db: ZebraDb,
327        initial_tip_height: Option<Height>,
328        cancel_receiver: Receiver<CancelFormatChange>,
329    ) -> Result<(), CancelFormatChange> {
330        self.run_format_change_or_check(&db, initial_tip_height, &cancel_receiver)?;
331
332        let Some(debug_validity_check_interval) = db.config().debug_validity_check_interval else {
333            return Ok(());
334        };
335
336        loop {
337            // We've just run a format check, so sleep first, then run another one.
338            // But return early if there is a cancel signal.
339            if !matches!(
340                cancel_receiver.recv_timeout(debug_validity_check_interval),
341                Err(RecvTimeoutError::Timeout)
342            ) {
343                return Err(CancelFormatChange);
344            }
345
346            Self::check_new_blocks(&db).run_format_change_or_check(
347                &db,
348                initial_tip_height,
349                &cancel_receiver,
350            )?;
351        }
352    }
353
354    /// Run a format change in the database, or check the format of the database once.
355    #[allow(clippy::unwrap_in_result)]
356    pub(crate) fn run_format_change_or_check(
357        &self,
358        db: &ZebraDb,
359        initial_tip_height: Option<Height>,
360        cancel_receiver: &Receiver<CancelFormatChange>,
361    ) -> Result<(), CancelFormatChange> {
362        match self {
363            // Perform any required upgrades, then mark the state as upgraded.
364            Upgrade { .. } => self.apply_format_upgrade(db, initial_tip_height, cancel_receiver)?,
365
366            NewlyCreated { .. } => {
367                Self::mark_as_newly_created(db);
368            }
369
370            Downgrade { .. } => {
371                // # Correctness
372                //
373                // At the start of a format downgrade, the database must be marked as partially or
374                // fully downgraded. This lets newer Zebra versions know that some blocks with older
375                // formats have been added to the database.
376                Self::mark_as_downgraded(db);
377
378                // Older supported versions just assume they can read newer formats,
379                // because they can't predict all changes a newer Zebra version could make.
380                //
381                // The responsibility of staying backwards-compatible is on the newer version.
382                // We do this on a best-effort basis for versions that are still supported.
383            }
384
385            CheckOpenCurrent { running_version } => {
386                // If we're re-opening a previously upgraded or newly created database,
387                // the database format should be valid. This check is done below.
388                info!(
389                    %running_version,
390                    "checking database format produced by a previous zebra instance \
391                     is current and valid"
392                );
393            }
394
395            CheckNewBlocksCurrent { running_version } => {
396                // If we've added new blocks using the non-upgrade code,
397                // the database format should be valid. This check is done below.
398                //
399                // TODO: should this check panic or just log an error?
400                //       Currently, we panic to avoid consensus bugs, but this could cause a denial
401                //       of service. We can make errors fail in CI using ZEBRA_FAILURE_MESSAGES.
402                info!(
403                    %running_version,
404                    "checking database format produced by new blocks in this instance is valid"
405                );
406            }
407        }
408
409        #[cfg(feature = "indexer")]
410        if let (
411            Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
412            Some(initial_tip_height),
413        ) = (self, initial_tip_height)
414        {
415            // Indexing transaction locations by their spent outpoints and revealed nullifiers.
416            let timer = CodeTimer::start();
417
418            // Add build metadata to on-disk version file just before starting to add indexes
419            let mut version = db
420                .format_version_on_disk()
421                .expect("unable to read database format version file")
422                .expect("should write database format version file above");
423            version.build = db.format_version_in_code().build;
424
425            db.update_format_version_on_disk(&version)
426                .expect("unable to write database format version file to disk");
427
428            info!("started checking/adding indexes for spending tx ids");
429            track_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
430            info!("finished checking/adding indexes for spending tx ids");
431
432            timer.finish(module_path!(), line!(), "indexing spending transaction ids");
433        };
434
435        #[cfg(not(feature = "indexer"))]
436        if let (
437            Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
438            Some(initial_tip_height),
439        ) = (self, initial_tip_height)
440        {
441            let mut version = db
442                .format_version_on_disk()
443                .expect("unable to read database format version file")
444                .expect("should write database format version file above");
445
446            if version.build.contains("indexer") {
447                // Indexing transaction locations by their spent outpoints and revealed nullifiers.
448                let timer = CodeTimer::start();
449
450                info!("started removing indexes for spending tx ids");
451                drop_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
452                info!("finished removing indexes for spending tx ids");
453
454                // Remove build metadata to on-disk version file after indexes have been dropped.
455                version.build = db.format_version_in_code().build;
456                db.update_format_version_on_disk(&version)
457                    .expect("unable to write database format version file to disk");
458
459                timer.finish(module_path!(), line!(), "removing spending transaction ids");
460            }
461        };
462
463        // These checks should pass for all format changes:
464        // - upgrades should produce a valid format (and they already do that check)
465        // - an empty state should pass all the format checks
466        // - since the running Zebra code knows how to upgrade the database to this format,
467        //   downgrades using this running code still know how to create a valid database
468        //   (unless a future upgrade breaks these format checks)
469        // - re-opening the current version should be valid, regardless of whether the upgrade
470        //   or new block code created the format (or any combination).
471        Self::format_validity_checks_detailed(db, cancel_receiver)?.unwrap_or_else(|_| {
472            panic!(
473                "unexpected invalid database format: delete and re-sync the database at '{:?}'",
474                db.path()
475            )
476        });
477
478        let inital_disk_version = self
479            .initial_disk_version()
480            .map_or_else(|| "None".to_string(), |version| version.to_string());
481        info!(
482            running_version = %self.running_version(),
483            %inital_disk_version,
484            "database format is valid"
485        );
486
487        Ok(())
488    }
489
490    // TODO: Move state-specific upgrade code to a finalized_state/* module.
491
492    /// Apply any required format updates to the database.
493    /// Format changes should be launched in an independent `std::thread`.
494    ///
495    /// If `cancel_receiver` gets a message, or its sender is dropped,
496    /// the format change stops running early, and returns an error.
497    ///
498    /// See the format upgrade design docs for more details:
499    /// <https://github.com/ZcashFoundation/zebra/blob/main/book/src/dev/state-db-upgrades.md#design>
500    //
501    // New format upgrades must be added to the *end* of this method.
502    #[allow(clippy::unwrap_in_result)]
503    fn apply_format_upgrade(
504        &self,
505        db: &ZebraDb,
506        initial_tip_height: Option<Height>,
507        cancel_receiver: &Receiver<CancelFormatChange>,
508    ) -> Result<(), CancelFormatChange> {
509        let Upgrade {
510            newer_running_version,
511            older_disk_version,
512        } = self
513        else {
514            unreachable!("already checked for Upgrade")
515        };
516
517        // # New Upgrades Sometimes Go Here
518        //
519        // If the format change is outside RocksDb, put new code above this comment!
520        let Some(initial_tip_height) = initial_tip_height else {
521            // If the database is empty, then the RocksDb format doesn't need any changes.
522            info!(
523                %newer_running_version,
524                %older_disk_version,
525                "marking empty database as upgraded"
526            );
527
528            Self::mark_as_upgraded_to(db, newer_running_version);
529
530            info!(
531                %newer_running_version,
532                %older_disk_version,
533                "empty database is fully upgraded"
534            );
535
536            return Ok(());
537        };
538
539        // Apply or validate format upgrades
540        for upgrade in format_upgrades(Some(older_disk_version.clone())) {
541            if upgrade.needs_migration() {
542                let timer = CodeTimer::start();
543
544                upgrade.prepare(initial_tip_height, db, cancel_receiver, older_disk_version)?;
545                upgrade.run(initial_tip_height, db, cancel_receiver)?;
546
547                // Before marking the state as upgraded, check that the upgrade completed successfully.
548                upgrade
549                    .validate(db, cancel_receiver)?
550                    .expect("db should be valid after upgrade");
551
552                timer.finish(module_path!(), line!(), upgrade.description());
553            }
554
555            // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
556            // database is marked, so the upgrade MUST be complete at this point.
557            info!(
558                newer_running_version = ?upgrade.version(),
559                "Zebra automatically upgraded the database format"
560            );
561            Self::mark_as_upgraded_to(db, &upgrade.version());
562        }
563
564        Ok(())
565    }
566
567    /// Run quick checks that the current database format is valid.
568    #[allow(clippy::vec_init_then_push)]
569    pub fn format_validity_checks_quick(db: &ZebraDb) -> Result<(), String> {
570        let timer = CodeTimer::start();
571        let mut results = Vec::new();
572
573        // Check the entire format before returning any errors.
574        results.push(db.check_max_on_disk_tip_height());
575
576        // This check can be run before the upgrade, but the upgrade code is finished, so we don't
577        // run it early any more. (If future code changes accidentally make it depend on the
578        // upgrade, they would accidentally break compatibility with older Zebra cached states.)
579        results.push(add_subtrees::subtree_format_calculation_pre_checks(db));
580
581        results.push(cache_genesis_roots::quick_check(db));
582        results.push(fix_tree_key_type::quick_check(db));
583
584        // The work is done in the functions we just called.
585        timer.finish(module_path!(), line!(), "format_validity_checks_quick()");
586
587        if results.iter().any(Result::is_err) {
588            let err = Err(format!("invalid quick check: {results:?}"));
589            error!(?err);
590            return err;
591        }
592
593        Ok(())
594    }
595
596    /// Run detailed checks that the current database format is valid.
597    #[allow(clippy::vec_init_then_push)]
598    pub fn format_validity_checks_detailed(
599        db: &ZebraDb,
600        cancel_receiver: &Receiver<CancelFormatChange>,
601    ) -> Result<Result<(), String>, CancelFormatChange> {
602        let timer = CodeTimer::start();
603        let mut results = Vec::new();
604
605        // Check the entire format before returning any errors.
606        //
607        // Do the quick checks first, so we don't have to do this in every detailed check.
608        results.push(Self::format_validity_checks_quick(db));
609
610        for upgrade in format_upgrades(None) {
611            results.push(upgrade.validate(db, cancel_receiver)?);
612        }
613
614        // The work is done in the functions we just called.
615        timer.finish(module_path!(), line!(), "format_validity_checks_detailed()");
616
617        if results.iter().any(Result::is_err) {
618            let err = Err(format!("invalid detailed check: {results:?}"));
619            error!(?err);
620            return Ok(err);
621        }
622
623        Ok(Ok(()))
624    }
625
626    /// Mark a newly created database with the current format version.
627    ///
628    /// This should be called when a newly created database is opened.
629    ///
630    /// # Concurrency
631    ///
632    /// The version must only be updated while RocksDB is holding the database
633    /// directory lock. This prevents multiple Zebra instances corrupting the version
634    /// file.
635    ///
636    /// # Panics
637    ///
638    /// If the format should not have been upgraded, because the database is not newly created.
639    fn mark_as_newly_created(db: &ZebraDb) {
640        let running_version = db.format_version_in_code();
641        let disk_version = db
642            .format_version_on_disk()
643            .expect("unable to read database format version file path");
644
645        let default_new_version = Some(Version::new(running_version.major, 0, 0));
646
647        // The database version isn't empty any more, because we've created the RocksDB database
648        // and acquired its lock. (If it is empty, we have a database locking bug.)
649        assert_eq!(
650            disk_version, default_new_version,
651            "can't overwrite the format version in an existing database:\n\
652             disk: {disk_version:?}\n\
653             running: {running_version}"
654        );
655
656        db.update_format_version_on_disk(&running_version)
657            .expect("unable to write database format version file to disk");
658
659        info!(
660            %running_version,
661            disk_version = %disk_version.map_or("None".to_string(), |version| version.to_string()),
662            "marked database format as newly created"
663        );
664    }
665
666    /// Mark the database as upgraded to `format_upgrade_version`.
667    ///
668    /// This should be called when an older database is opened by an older Zebra version,
669    /// after each version upgrade is complete.
670    ///
671    /// # Concurrency
672    ///
673    /// The version must only be updated while RocksDB is holding the database
674    /// directory lock. This prevents multiple Zebra instances corrupting the version
675    /// file.
676    ///
677    /// # Panics
678    ///
679    /// If the format should not have been upgraded, because the running version is:
680    /// - older than the disk version (that's a downgrade)
681    /// - the same as to the disk version (no upgrade needed)
682    ///
683    /// If the format should not have been upgraded, because the format upgrade version is:
684    /// - older or the same as the disk version
685    ///   (multiple upgrades to the same version are not allowed)
686    /// - greater than the running version (that's a logic bug)
687    fn mark_as_upgraded_to(db: &ZebraDb, format_upgrade_version: &Version) {
688        let running_version = db.format_version_in_code();
689        let disk_version = db
690            .format_version_on_disk()
691            .expect("unable to read database format version file")
692            .expect("tried to upgrade a newly created database");
693
694        assert!(
695            running_version > disk_version,
696            "can't upgrade a database that is being opened by an older or the same Zebra version:\n\
697             disk: {disk_version}\n\
698             upgrade: {format_upgrade_version}\n\
699             running: {running_version}"
700        );
701
702        assert!(
703            format_upgrade_version > &disk_version,
704            "can't upgrade a database that has already been upgraded, or is newer:\n\
705             disk: {disk_version}\n\
706             upgrade: {format_upgrade_version}\n\
707             running: {running_version}"
708        );
709
710        assert!(
711            format_upgrade_version <= &running_version,
712            "can't upgrade to a newer version than the running Zebra version:\n\
713             disk: {disk_version}\n\
714             upgrade: {format_upgrade_version}\n\
715             running: {running_version}"
716        );
717
718        db.update_format_version_on_disk(format_upgrade_version)
719            .expect("unable to write database format version file to disk");
720
721        info!(
722            %running_version,
723            %disk_version,
724            // wait_for_state_version_upgrade() needs this to be the last field,
725            // so the regex matches correctly
726            %format_upgrade_version,
727            "marked database format as upgraded"
728        );
729    }
730
731    /// Mark the database as downgraded to the running database version.
732    /// This should be called after a newer database is opened by an older Zebra version.
733    ///
734    /// # Concurrency
735    ///
736    /// The version must only be updated while RocksDB is holding the database
737    /// directory lock. This prevents multiple Zebra instances corrupting the version
738    /// file.
739    ///
740    /// # Panics
741    ///
742    /// If the format should have been upgraded, because the running version is newer.
743    /// If the state is newly created, because the running version should be the same.
744    ///
745    /// Multiple downgrades are allowed, because they all downgrade to the same running version.
746    fn mark_as_downgraded(db: &ZebraDb) {
747        let running_version = db.format_version_in_code();
748        let disk_version = db
749            .format_version_on_disk()
750            .expect("unable to read database format version file")
751            .expect("can't downgrade a newly created database");
752
753        assert!(
754            disk_version >= running_version,
755            "can't downgrade a database that is being opened by a newer Zebra version:\n\
756             disk: {disk_version}\n\
757             running: {running_version}"
758        );
759
760        db.update_format_version_on_disk(&running_version)
761            .expect("unable to write database format version file to disk");
762
763        info!(
764            %running_version,
765            %disk_version,
766            "marked database format as downgraded"
767        );
768    }
769}
770
771impl DbFormatChangeThreadHandle {
772    /// Cancel the running format change thread, if this is the last handle.
773    /// Returns true if it was actually cancelled.
774    pub fn cancel_if_needed(&self) -> bool {
775        // # Correctness
776        //
777        // Checking the strong count has a race condition, because two handles can be dropped at
778        // the same time.
779        //
780        // If cancelling the thread is important, the owner of the handle must call force_cancel().
781        if let Some(update_task) = self.update_task.as_ref() {
782            if Arc::strong_count(update_task) <= 1 {
783                self.force_cancel();
784                return true;
785            }
786        }
787
788        false
789    }
790
791    /// Force the running format change thread to cancel, even if there are other handles.
792    pub fn force_cancel(&self) {
793        // There's nothing we can do about errors here.
794        // If the channel is disconnected, the task has exited.
795        // If it's full, it's already been cancelled.
796        let _ = self.cancel_handle.try_send(CancelFormatChange);
797    }
798
799    /// Check for panics in the code running in the spawned thread.
800    /// If the thread exited with a panic, resume that panic.
801    ///
802    /// This method should be called regularly, so that panics are detected as soon as possible.
803    pub fn check_for_panics(&mut self) {
804        self.update_task.panic_if_task_has_panicked();
805    }
806
807    /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
808    ///
809    /// Exits early if the thread has other outstanding handles.
810    ///
811    /// This method should be called during shutdown.
812    pub fn wait_for_panics(&mut self) {
813        self.update_task.wait_for_panics();
814    }
815}
816
817impl Drop for DbFormatChangeThreadHandle {
818    fn drop(&mut self) {
819        // Only cancel the format change if the state service is shutting down.
820        if self.cancel_if_needed() {
821            self.wait_for_panics();
822        } else {
823            self.check_for_panics();
824        }
825    }
826}
827
/// Checks that every upgrade returned by `format_upgrades()` has a strictly greater version
/// than the one before it, starting from `0.0.0`.
#[test]
fn format_upgrades_are_in_version_order() {
    // Thread the previous version through the fold; only the assertions matter.
    let _newest_version =
        format_upgrades(None).fold(Version::new(0, 0, 0), |last_version, upgrade| {
            assert!(upgrade.version() > last_version);
            upgrade.version()
        });
}