zebra_state/service/finalized_state/disk_format/upgrade.rs
1//! In-place format upgrades and format validity checks for the Zebra state database.
2
3use std::{
4 cmp::Ordering,
5 sync::Arc,
6 thread::{self, JoinHandle},
7};
8
9use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
10use semver::Version;
11use tracing::Span;
12
13use zebra_chain::{
14 block::Height,
15 diagnostic::{
16 task::{CheckForPanics, WaitForPanics},
17 CodeTimer,
18 },
19};
20
21use DbFormatChange::*;
22
23use crate::service::finalized_state::ZebraDb;
24
25pub(crate) mod add_subtrees;
26pub(crate) mod cache_genesis_roots;
27pub(crate) mod fix_tree_key_type;
28pub(crate) mod no_migration;
29pub(crate) mod prune_trees;
30pub(crate) mod tree_keys_and_caches_upgrade;
31
32#[cfg(not(feature = "indexer"))]
33pub(crate) mod drop_tx_locs_by_spends;
34
35#[cfg(feature = "indexer")]
36pub(crate) mod track_tx_locs_by_spends;
37
/// Defines method signature for running disk format upgrades.
///
/// Each implementation describes one step in the database format history. During an upgrade,
/// steps whose [`version`](Self::version) is greater than the on-disk version are applied in
/// order: `prepare()`, then `run()`, then `validate()`. After each step the database is marked
/// as upgraded to that step's version.
pub trait DiskFormatUpgrade {
    /// Returns the version at which this upgrade is applied.
    ///
    /// The database is marked with this version once the upgrade step has finished.
    fn version(&self) -> Version;

    /// Returns the description of this upgrade.
    ///
    /// Used as the label when the duration of the upgrade is reported.
    fn description(&self) -> &'static str;

    /// Runs disk format upgrade.
    ///
    /// `initial_tip_height` is the tip height of `db` when it was opened.
    ///
    /// Returns `Err(CancelFormatChange)` if the upgrade was cancelled before it finished.
    fn run(
        &self,
        initial_tip_height: Height,
        db: &ZebraDb,
        cancel_receiver: &Receiver<CancelFormatChange>,
    ) -> Result<(), CancelFormatChange>;

    /// Check that state has been upgraded to this format correctly.
    ///
    /// The outer `Result` indicates whether the validation was cancelled (due to e.g. node shutdown).
    /// The inner `Result` indicates whether the validation itself failed or not.
    ///
    /// The default implementation performs no checks and reports success.
    fn validate(
        &self,
        _db: &ZebraDb,
        _cancel_receiver: &Receiver<CancelFormatChange>,
    ) -> Result<Result<(), String>, CancelFormatChange> {
        Ok(Ok(()))
    }

    /// Prepare for disk format upgrade.
    ///
    /// Called immediately before `run()`, with the disk version the database is being upgraded
    /// from. The default implementation does nothing.
    fn prepare(
        &self,
        _initial_tip_height: Height,
        _upgrade_db: &ZebraDb,
        _cancel_receiver: &Receiver<CancelFormatChange>,
        _older_disk_version: &Version,
    ) -> Result<(), CancelFormatChange> {
        Ok(())
    }

    /// Returns true if the [`DiskFormatUpgrade`] needs to run a migration on existing data in the db.
    ///
    /// When this returns `false`, `prepare()`, `run()` and `validate()` are skipped during an
    /// upgrade, and the database is only marked with this upgrade's version.
    /// The default is `true`.
    fn needs_migration(&self) -> bool {
        true
    }
}
82
83fn format_upgrades(
84 min_version: Option<Version>,
85) -> impl Iterator<Item = Box<dyn DiskFormatUpgrade>> {
86 let min_version = move || min_version.clone().unwrap_or(Version::new(0, 0, 0));
87
88 // Note: Disk format upgrades must be run in order of database version.
89 ([
90 Box::new(prune_trees::PruneTrees),
91 Box::new(add_subtrees::AddSubtrees),
92 Box::new(tree_keys_and_caches_upgrade::FixTreeKeyTypeAndCacheGenesisRoots),
93 // Value balance upgrade
94 Box::new(no_migration::NoMigration::new(26, 0, 0)),
95 ] as [Box<dyn DiskFormatUpgrade>; 4])
96 .into_iter()
97 .filter(move |upgrade| upgrade.version() > min_version())
98}
99
/// The kind of database format change or validity check we're performing.
///
/// Values are created by [`DbFormatChange::open_database`] when a database is opened, or by
/// [`DbFormatChange::check_new_blocks`] for periodic checks of a running instance.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DbFormatChange {
    // Data Format Changes
    //
    /// Upgrade the format from `older_disk_version` to `newer_running_version`.
    ///
    /// Until this upgrade is complete, the format is a mixture of both versions.
    Upgrade {
        older_disk_version: Version,
        newer_running_version: Version,
    },

    // Format Version File Changes
    //
    /// Mark the format as newly created by `running_version`.
    ///
    /// Newly created databases are opened with no disk version.
    /// It is set to the running version by the format change code.
    NewlyCreated { running_version: Version },

    /// Mark the format as downgraded from `newer_disk_version` to `older_running_version`.
    ///
    /// Until the state is upgraded to `newer_disk_version` by a Zebra version with that state
    /// version (or greater), the format will be a mixture of both versions.
    Downgrade {
        newer_disk_version: Version,
        older_running_version: Version,
    },

    // Data Format Checks
    //
    /// Check that the database from a previous instance has the current `running_version` format.
    ///
    /// Current version databases have a disk version that matches the running version.
    /// No upgrades are needed, so we just run a format check on the database.
    /// The data in that database was created or updated by a previous Zebra instance.
    CheckOpenCurrent { running_version: Version },

    /// Check that the database from this instance has the current `running_version` format.
    ///
    /// The data in that database was created or updated by the currently running Zebra instance.
    /// So we periodically check for data bugs, which can happen if the upgrade and new block
    /// code produce different data. (They can also be caused by disk corruption.)
    CheckNewBlocksCurrent { running_version: Version },
}
146
/// A handle to a spawned format change thread.
///
/// Cloning this struct creates an additional handle to the same thread.
///
/// # Concurrency
///
/// Cancelling the thread on drop has a race condition, because two handles can be dropped at
/// the same time.
///
/// If cancelling the thread is required for correct operation or usability, the owner of the
/// handle must call `force_cancel()`.
#[derive(Clone, Debug)]
pub struct DbFormatChangeThreadHandle {
    /// A handle to the format change/check thread.
    /// If configured, this thread continues running so it can perform periodic format checks.
    ///
    /// Panics from this thread are propagated into Zebra's state service.
    /// The task returns an error if the upgrade was cancelled by a shutdown.
    ///
    /// The join handle is shared between clones through the `Arc`, whose strong count is used
    /// to decide whether a dropped handle is the last one.
    update_task: Option<Arc<JoinHandle<Result<(), CancelFormatChange>>>>,

    /// A channel that tells the running format thread to finish early.
    cancel_handle: Sender<CancelFormatChange>,
}
170
/// Marker type that is sent to cancel a format upgrade, and returned as an error on cancellation.
///
/// It carries no data: receiving it on the cancel channel, or getting it back as an `Err`,
/// only signals that the format change or check stopped early.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct CancelFormatChange;
174
175impl DbFormatChange {
176 /// Returns the format change for `running_version` code loading a `disk_version` database.
177 ///
178 /// Also logs that change at info level.
179 ///
180 /// If `disk_version` is `None`, Zebra is creating a new database.
181 pub fn open_database(running_version: &Version, disk_version: Option<Version>) -> Self {
182 let running_version = running_version.clone();
183
184 let Some(disk_version) = disk_version else {
185 info!(
186 %running_version,
187 "creating new database with the current format"
188 );
189
190 return NewlyCreated { running_version };
191 };
192
193 match disk_version.cmp_precedence(&running_version) {
194 Ordering::Less => {
195 info!(
196 %running_version,
197 %disk_version,
198 "trying to open older database format: launching upgrade task"
199 );
200
201 Upgrade {
202 older_disk_version: disk_version,
203 newer_running_version: running_version,
204 }
205 }
206 Ordering::Greater => {
207 info!(
208 %running_version,
209 %disk_version,
210 "trying to open newer database format: data should be compatible"
211 );
212
213 Downgrade {
214 newer_disk_version: disk_version,
215 older_running_version: running_version,
216 }
217 }
218 Ordering::Equal => {
219 info!(%running_version, "trying to open current database format");
220
221 CheckOpenCurrent { running_version }
222 }
223 }
224 }
225
    /// Returns a format check for newly added blocks in the currently running Zebra version.
    /// This check makes sure the upgrade and new block code produce the same data.
    ///
    /// The running version is taken from the format version compiled into `db`'s code.
    ///
    /// Also logs the check at info level.
    pub fn check_new_blocks(db: &ZebraDb) -> Self {
        let running_version = db.format_version_in_code();

        info!(%running_version, "checking new blocks were written in current database format");
        CheckNewBlocksCurrent { running_version }
    }
236
    /// Returns true if this format change/check is an upgrade.
    ///
    /// Only the `Upgrade` variant counts: downgrades, new databases, and format checks all
    /// return false.
    // NOTE(review): presumably unused in some build configurations, hence the lint allowance —
    // confirm before removing it.
    #[allow(dead_code)]
    pub fn is_upgrade(&self) -> bool {
        matches!(self, Upgrade { .. })
    }
242
    /// Returns true if this format change/check happens at startup.
    ///
    /// Every variant except `CheckNewBlocksCurrent` is treated as a startup change or check.
    // NOTE(review): presumably unused in some build configurations, hence the lint allowance —
    // confirm before removing it.
    #[allow(dead_code)]
    pub fn is_run_at_startup(&self) -> bool {
        !matches!(self, CheckNewBlocksCurrent { .. })
    }
248
249 /// Returns the running version in this format change.
250 pub fn running_version(&self) -> Version {
251 match self {
252 Upgrade {
253 newer_running_version,
254 ..
255 } => newer_running_version,
256 Downgrade {
257 older_running_version,
258 ..
259 } => older_running_version,
260 NewlyCreated { running_version }
261 | CheckOpenCurrent { running_version }
262 | CheckNewBlocksCurrent { running_version } => running_version,
263 }
264 .clone()
265 }
266
267 /// Returns the initial database version before this format change.
268 ///
269 /// Returns `None` if the database was newly created.
270 pub fn initial_disk_version(&self) -> Option<Version> {
271 match self {
272 Upgrade {
273 older_disk_version, ..
274 } => Some(older_disk_version),
275 Downgrade {
276 newer_disk_version, ..
277 } => Some(newer_disk_version),
278 CheckOpenCurrent { running_version } | CheckNewBlocksCurrent { running_version } => {
279 Some(running_version)
280 }
281 NewlyCreated { .. } => None,
282 }
283 .cloned()
284 }
285
286 /// Launch a `std::thread` that applies this format change to the database,
287 /// then continues running to perform periodic format checks.
288 ///
289 /// `initial_tip_height` is the database height when it was opened, and `db` is the
290 /// database instance to upgrade or check.
291 pub fn spawn_format_change(
292 self,
293 db: ZebraDb,
294 initial_tip_height: Option<Height>,
295 ) -> DbFormatChangeThreadHandle {
296 // # Correctness
297 //
298 // Cancel handles must use try_send() to avoid blocking waiting for the format change
299 // thread to shut down.
300 let (cancel_handle, cancel_receiver) = bounded(1);
301
302 let span = Span::current();
303 let update_task = thread::spawn(move || {
304 span.in_scope(move || {
305 self.format_change_run_loop(db, initial_tip_height, cancel_receiver)
306 })
307 });
308
309 let mut handle = DbFormatChangeThreadHandle {
310 update_task: Some(Arc::new(update_task)),
311 cancel_handle,
312 };
313
314 handle.check_for_panics();
315
316 handle
317 }
318
319 /// Run the initial format change or check to the database. Under the default runtime config,
320 /// this method returns after the format change or check.
321 ///
322 /// But if runtime validity checks are enabled, this method periodically checks the format of
323 /// newly added blocks matches the current format. It will run until it is cancelled or panics.
324 fn format_change_run_loop(
325 self,
326 db: ZebraDb,
327 initial_tip_height: Option<Height>,
328 cancel_receiver: Receiver<CancelFormatChange>,
329 ) -> Result<(), CancelFormatChange> {
330 self.run_format_change_or_check(&db, initial_tip_height, &cancel_receiver)?;
331
332 let Some(debug_validity_check_interval) = db.config().debug_validity_check_interval else {
333 return Ok(());
334 };
335
336 loop {
337 // We've just run a format check, so sleep first, then run another one.
338 // But return early if there is a cancel signal.
339 if !matches!(
340 cancel_receiver.recv_timeout(debug_validity_check_interval),
341 Err(RecvTimeoutError::Timeout)
342 ) {
343 return Err(CancelFormatChange);
344 }
345
346 Self::check_new_blocks(&db).run_format_change_or_check(
347 &db,
348 initial_tip_height,
349 &cancel_receiver,
350 )?;
351 }
352 }
353
354 /// Run a format change in the database, or check the format of the database once.
355 #[allow(clippy::unwrap_in_result)]
356 pub(crate) fn run_format_change_or_check(
357 &self,
358 db: &ZebraDb,
359 initial_tip_height: Option<Height>,
360 cancel_receiver: &Receiver<CancelFormatChange>,
361 ) -> Result<(), CancelFormatChange> {
362 match self {
363 // Perform any required upgrades, then mark the state as upgraded.
364 Upgrade { .. } => self.apply_format_upgrade(db, initial_tip_height, cancel_receiver)?,
365
366 NewlyCreated { .. } => {
367 Self::mark_as_newly_created(db);
368 }
369
370 Downgrade { .. } => {
371 // # Correctness
372 //
373 // At the start of a format downgrade, the database must be marked as partially or
374 // fully downgraded. This lets newer Zebra versions know that some blocks with older
375 // formats have been added to the database.
376 Self::mark_as_downgraded(db);
377
378 // Older supported versions just assume they can read newer formats,
379 // because they can't predict all changes a newer Zebra version could make.
380 //
381 // The responsibility of staying backwards-compatible is on the newer version.
382 // We do this on a best-effort basis for versions that are still supported.
383 }
384
385 CheckOpenCurrent { running_version } => {
386 // If we're re-opening a previously upgraded or newly created database,
387 // the database format should be valid. This check is done below.
388 info!(
389 %running_version,
390 "checking database format produced by a previous zebra instance \
391 is current and valid"
392 );
393 }
394
395 CheckNewBlocksCurrent { running_version } => {
396 // If we've added new blocks using the non-upgrade code,
397 // the database format should be valid. This check is done below.
398 //
399 // TODO: should this check panic or just log an error?
400 // Currently, we panic to avoid consensus bugs, but this could cause a denial
401 // of service. We can make errors fail in CI using ZEBRA_FAILURE_MESSAGES.
402 info!(
403 %running_version,
404 "checking database format produced by new blocks in this instance is valid"
405 );
406 }
407 }
408
409 #[cfg(feature = "indexer")]
410 if let (
411 Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
412 Some(initial_tip_height),
413 ) = (self, initial_tip_height)
414 {
415 // Indexing transaction locations by their spent outpoints and revealed nullifiers.
416 let timer = CodeTimer::start();
417
418 // Add build metadata to on-disk version file just before starting to add indexes
419 let mut version = db
420 .format_version_on_disk()
421 .expect("unable to read database format version file")
422 .expect("should write database format version file above");
423 version.build = db.format_version_in_code().build;
424
425 db.update_format_version_on_disk(&version)
426 .expect("unable to write database format version file to disk");
427
428 info!("started checking/adding indexes for spending tx ids");
429 track_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
430 info!("finished checking/adding indexes for spending tx ids");
431
432 timer.finish(module_path!(), line!(), "indexing spending transaction ids");
433 };
434
435 #[cfg(not(feature = "indexer"))]
436 if let (
437 Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. },
438 Some(initial_tip_height),
439 ) = (self, initial_tip_height)
440 {
441 let mut version = db
442 .format_version_on_disk()
443 .expect("unable to read database format version file")
444 .expect("should write database format version file above");
445
446 if version.build.contains("indexer") {
447 // Indexing transaction locations by their spent outpoints and revealed nullifiers.
448 let timer = CodeTimer::start();
449
450 info!("started removing indexes for spending tx ids");
451 drop_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?;
452 info!("finished removing indexes for spending tx ids");
453
454 // Remove build metadata to on-disk version file after indexes have been dropped.
455 version.build = db.format_version_in_code().build;
456 db.update_format_version_on_disk(&version)
457 .expect("unable to write database format version file to disk");
458
459 timer.finish(module_path!(), line!(), "removing spending transaction ids");
460 }
461 };
462
463 // These checks should pass for all format changes:
464 // - upgrades should produce a valid format (and they already do that check)
465 // - an empty state should pass all the format checks
466 // - since the running Zebra code knows how to upgrade the database to this format,
467 // downgrades using this running code still know how to create a valid database
468 // (unless a future upgrade breaks these format checks)
469 // - re-opening the current version should be valid, regardless of whether the upgrade
470 // or new block code created the format (or any combination).
471 Self::format_validity_checks_detailed(db, cancel_receiver)?.unwrap_or_else(|_| {
472 panic!(
473 "unexpected invalid database format: delete and re-sync the database at '{:?}'",
474 db.path()
475 )
476 });
477
478 let inital_disk_version = self
479 .initial_disk_version()
480 .map_or_else(|| "None".to_string(), |version| version.to_string());
481 info!(
482 running_version = %self.running_version(),
483 %inital_disk_version,
484 "database format is valid"
485 );
486
487 Ok(())
488 }
489
490 // TODO: Move state-specific upgrade code to a finalized_state/* module.
491
492 /// Apply any required format updates to the database.
493 /// Format changes should be launched in an independent `std::thread`.
494 ///
495 /// If `cancel_receiver` gets a message, or its sender is dropped,
496 /// the format change stops running early, and returns an error.
497 ///
498 /// See the format upgrade design docs for more details:
499 /// <https://github.com/ZcashFoundation/zebra/blob/main/book/src/dev/state-db-upgrades.md#design>
500 //
501 // New format upgrades must be added to the *end* of this method.
502 #[allow(clippy::unwrap_in_result)]
503 fn apply_format_upgrade(
504 &self,
505 db: &ZebraDb,
506 initial_tip_height: Option<Height>,
507 cancel_receiver: &Receiver<CancelFormatChange>,
508 ) -> Result<(), CancelFormatChange> {
509 let Upgrade {
510 newer_running_version,
511 older_disk_version,
512 } = self
513 else {
514 unreachable!("already checked for Upgrade")
515 };
516
517 // # New Upgrades Sometimes Go Here
518 //
519 // If the format change is outside RocksDb, put new code above this comment!
520 let Some(initial_tip_height) = initial_tip_height else {
521 // If the database is empty, then the RocksDb format doesn't need any changes.
522 info!(
523 %newer_running_version,
524 %older_disk_version,
525 "marking empty database as upgraded"
526 );
527
528 Self::mark_as_upgraded_to(db, newer_running_version);
529
530 info!(
531 %newer_running_version,
532 %older_disk_version,
533 "empty database is fully upgraded"
534 );
535
536 return Ok(());
537 };
538
539 // Apply or validate format upgrades
540 for upgrade in format_upgrades(Some(older_disk_version.clone())) {
541 if upgrade.needs_migration() {
542 let timer = CodeTimer::start();
543
544 upgrade.prepare(initial_tip_height, db, cancel_receiver, older_disk_version)?;
545 upgrade.run(initial_tip_height, db, cancel_receiver)?;
546
547 // Before marking the state as upgraded, check that the upgrade completed successfully.
548 upgrade
549 .validate(db, cancel_receiver)?
550 .expect("db should be valid after upgrade");
551
552 timer.finish(module_path!(), line!(), upgrade.description());
553 }
554
555 // Mark the database as upgraded. Zebra won't repeat the upgrade anymore once the
556 // database is marked, so the upgrade MUST be complete at this point.
557 info!(
558 newer_running_version = ?upgrade.version(),
559 "Zebra automatically upgraded the database format"
560 );
561 Self::mark_as_upgraded_to(db, &upgrade.version());
562 }
563
564 Ok(())
565 }
566
567 /// Run quick checks that the current database format is valid.
568 #[allow(clippy::vec_init_then_push)]
569 pub fn format_validity_checks_quick(db: &ZebraDb) -> Result<(), String> {
570 let timer = CodeTimer::start();
571 let mut results = Vec::new();
572
573 // Check the entire format before returning any errors.
574 results.push(db.check_max_on_disk_tip_height());
575
576 // This check can be run before the upgrade, but the upgrade code is finished, so we don't
577 // run it early any more. (If future code changes accidentally make it depend on the
578 // upgrade, they would accidentally break compatibility with older Zebra cached states.)
579 results.push(add_subtrees::subtree_format_calculation_pre_checks(db));
580
581 results.push(cache_genesis_roots::quick_check(db));
582 results.push(fix_tree_key_type::quick_check(db));
583
584 // The work is done in the functions we just called.
585 timer.finish(module_path!(), line!(), "format_validity_checks_quick()");
586
587 if results.iter().any(Result::is_err) {
588 let err = Err(format!("invalid quick check: {results:?}"));
589 error!(?err);
590 return err;
591 }
592
593 Ok(())
594 }
595
596 /// Run detailed checks that the current database format is valid.
597 #[allow(clippy::vec_init_then_push)]
598 pub fn format_validity_checks_detailed(
599 db: &ZebraDb,
600 cancel_receiver: &Receiver<CancelFormatChange>,
601 ) -> Result<Result<(), String>, CancelFormatChange> {
602 let timer = CodeTimer::start();
603 let mut results = Vec::new();
604
605 // Check the entire format before returning any errors.
606 //
607 // Do the quick checks first, so we don't have to do this in every detailed check.
608 results.push(Self::format_validity_checks_quick(db));
609
610 for upgrade in format_upgrades(None) {
611 results.push(upgrade.validate(db, cancel_receiver)?);
612 }
613
614 // The work is done in the functions we just called.
615 timer.finish(module_path!(), line!(), "format_validity_checks_detailed()");
616
617 if results.iter().any(Result::is_err) {
618 let err = Err(format!("invalid detailed check: {results:?}"));
619 error!(?err);
620 return Ok(err);
621 }
622
623 Ok(Ok(()))
624 }
625
626 /// Mark a newly created database with the current format version.
627 ///
628 /// This should be called when a newly created database is opened.
629 ///
630 /// # Concurrency
631 ///
632 /// The version must only be updated while RocksDB is holding the database
633 /// directory lock. This prevents multiple Zebra instances corrupting the version
634 /// file.
635 ///
636 /// # Panics
637 ///
638 /// If the format should not have been upgraded, because the database is not newly created.
639 fn mark_as_newly_created(db: &ZebraDb) {
640 let running_version = db.format_version_in_code();
641 let disk_version = db
642 .format_version_on_disk()
643 .expect("unable to read database format version file path");
644
645 let default_new_version = Some(Version::new(running_version.major, 0, 0));
646
647 // The database version isn't empty any more, because we've created the RocksDB database
648 // and acquired its lock. (If it is empty, we have a database locking bug.)
649 assert_eq!(
650 disk_version, default_new_version,
651 "can't overwrite the format version in an existing database:\n\
652 disk: {disk_version:?}\n\
653 running: {running_version}"
654 );
655
656 db.update_format_version_on_disk(&running_version)
657 .expect("unable to write database format version file to disk");
658
659 info!(
660 %running_version,
661 disk_version = %disk_version.map_or("None".to_string(), |version| version.to_string()),
662 "marked database format as newly created"
663 );
664 }
665
    /// Mark the database as upgraded to `format_upgrade_version`.
    ///
    /// This should be called when an older database is opened by a newer Zebra version,
    /// after each version upgrade is complete.
    ///
    /// # Concurrency
    ///
    /// The version must only be updated while RocksDB is holding the database
    /// directory lock. This prevents multiple Zebra instances corrupting the version
    /// file.
    ///
    /// # Panics
    ///
    /// If the format should not have been upgraded, because the running version is:
    /// - older than the disk version (that's a downgrade)
    /// - the same as the disk version (no upgrade needed)
    ///
    /// If the format should not have been upgraded, because the format upgrade version is:
    /// - older than or the same as the disk version
    ///   (multiple upgrades to the same version are not allowed)
    /// - greater than the running version (that's a logic bug)
    ///
    /// If the version file can't be read or written, or the database has no disk version.
    fn mark_as_upgraded_to(db: &ZebraDb, format_upgrade_version: &Version) {
        let running_version = db.format_version_in_code();
        let disk_version = db
            .format_version_on_disk()
            .expect("unable to read database format version file")
            .expect("tried to upgrade a newly created database");

        // The running code must be strictly newer than the data on disk.
        assert!(
            running_version > disk_version,
            "can't upgrade a database that is being opened by an older or the same Zebra version:\n\
             disk: {disk_version}\n\
             upgrade: {format_upgrade_version}\n\
             running: {running_version}"
        );

        // Each upgrade must move the disk version forward.
        assert!(
            format_upgrade_version > &disk_version,
            "can't upgrade a database that has already been upgraded, or is newer:\n\
             disk: {disk_version}\n\
             upgrade: {format_upgrade_version}\n\
             running: {running_version}"
        );

        // An upgrade can't go past the version of the running code.
        assert!(
            format_upgrade_version <= &running_version,
            "can't upgrade to a newer version than the running Zebra version:\n\
             disk: {disk_version}\n\
             upgrade: {format_upgrade_version}\n\
             running: {running_version}"
        );

        db.update_format_version_on_disk(format_upgrade_version)
            .expect("unable to write database format version file to disk");

        info!(
            %running_version,
            %disk_version,
            // wait_for_state_version_upgrade() needs this to be the last field,
            // so the regex matches correctly
            %format_upgrade_version,
            "marked database format as upgraded"
        );
    }
730
    /// Mark the database as downgraded to the running database version.
    /// This should be called after a newer database is opened by an older Zebra version.
    ///
    /// # Concurrency
    ///
    /// The version must only be updated while RocksDB is holding the database
    /// directory lock. This prevents multiple Zebra instances corrupting the version
    /// file.
    ///
    /// # Panics
    ///
    /// If the format should have been upgraded, because the running version is newer
    /// than the disk version.
    /// If the state is newly created (it has no disk version), because the running version
    /// should be the same.
    /// If the version file can't be read or written.
    ///
    /// Multiple downgrades are allowed, because they all downgrade to the same running version.
    fn mark_as_downgraded(db: &ZebraDb) {
        let running_version = db.format_version_in_code();
        let disk_version = db
            .format_version_on_disk()
            .expect("unable to read database format version file")
            .expect("can't downgrade a newly created database");

        // Equal versions are accepted, so repeating a downgrade to the same running version
        // does not panic.
        assert!(
            disk_version >= running_version,
            "can't downgrade a database that is being opened by a newer Zebra version:\n\
             disk: {disk_version}\n\
             running: {running_version}"
        );

        db.update_format_version_on_disk(&running_version)
            .expect("unable to write database format version file to disk");

        info!(
            %running_version,
            %disk_version,
            "marked database format as downgraded"
        );
    }
769}
770
771impl DbFormatChangeThreadHandle {
772 /// Cancel the running format change thread, if this is the last handle.
773 /// Returns true if it was actually cancelled.
774 pub fn cancel_if_needed(&self) -> bool {
775 // # Correctness
776 //
777 // Checking the strong count has a race condition, because two handles can be dropped at
778 // the same time.
779 //
780 // If cancelling the thread is important, the owner of the handle must call force_cancel().
781 if let Some(update_task) = self.update_task.as_ref() {
782 if Arc::strong_count(update_task) <= 1 {
783 self.force_cancel();
784 return true;
785 }
786 }
787
788 false
789 }
790
    /// Force the running format change thread to cancel, even if there are other handles.
    ///
    /// This never blocks: the cancel message is sent with `try_send()`, and a failed send is
    /// deliberately ignored.
    pub fn force_cancel(&self) {
        // There's nothing we can do about errors here.
        // If the channel is disconnected, the task has exited.
        // If it's full, it's already been cancelled.
        let _ = self.cancel_handle.try_send(CancelFormatChange);
    }
798
    /// Check for panics in the code running in the spawned thread.
    /// If the thread exited with a panic, resume that panic.
    ///
    /// This method should be called regularly, so that panics are detected as soon as possible.
    pub fn check_for_panics(&mut self) {
        // Delegates to the `CheckForPanics` implementation for the optional task handle.
        self.update_task.panic_if_task_has_panicked();
    }
806
    /// Wait for the spawned thread to finish. If it exited with a panic, resume that panic.
    ///
    /// Exits early if the thread has other outstanding handles.
    ///
    /// This method should be called during shutdown.
    pub fn wait_for_panics(&mut self) {
        // Delegates to the `WaitForPanics` implementation for the optional task handle.
        self.update_task.wait_for_panics();
    }
815}
816
817impl Drop for DbFormatChangeThreadHandle {
818 fn drop(&mut self) {
819 // Only cancel the format change if the state service is shutting down.
820 if self.cancel_if_needed() {
821 self.wait_for_panics();
822 } else {
823 self.check_for_panics();
824 }
825 }
826}
827
/// Checks that the registered format upgrades have strictly increasing versions,
/// starting above `0.0.0`.
#[test]
fn format_upgrades_are_in_version_order() {
    let mut previous_version = Version::new(0, 0, 0);

    for current_version in format_upgrades(None).map(|upgrade| upgrade.version()) {
        assert!(current_version > previous_version);
        previous_version = current_version;
    }
}
835}