zebra_state/service/non_finalized_state/
backup.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    fs::DirEntry,
4    io::{self, ErrorKind},
5    path::{Path, PathBuf},
6    sync::Arc,
7    time::Duration,
8};
9
10use hex::ToHex;
11use zebra_chain::{
12    amount::{Amount, DeferredPoolBalanceChange},
13    block::{self, Block, Height},
14    serialization::{ZcashDeserializeInto, ZcashSerialize},
15};
16
17use crate::{
18    ContextuallyVerifiedBlock, IntoDisk, NonFinalizedState, SemanticallyVerifiedBlock,
19    WatchReceiver, ZebraDb,
20};
21
22#[cfg(not(test))]
23use crate::service::write::validate_and_commit_non_finalized;
24
/// The minimum duration that Zebra will wait between updates to the non-finalized state backup cache.
///
/// `run_backup_task` creates a sleep of this length on every loop iteration and waits for it
/// together with the next non-finalized state change before touching the backup directory.
pub(crate) const MIN_DURATION_BETWEEN_BACKUP_UPDATES: Duration = Duration::from_secs(5);
27
28/// Accepts an optional path to the non-finalized state backup directory and a handle to the database.
29///
30/// Looks for blocks above the finalized tip height in the backup directory (if a path was provided) and
31/// attempts to commit them to the non-finalized state.
32///
33/// Returns the resulting non-finalized state.
34pub(super) fn restore_backup(
35    mut non_finalized_state: NonFinalizedState,
36    backup_dir_path: &PathBuf,
37    finalized_state: &ZebraDb,
38) -> NonFinalizedState {
39    let mut store: BTreeMap<Height, Vec<SemanticallyVerifiedBlock>> = BTreeMap::new();
40
41    for block in read_non_finalized_blocks_from_backup(backup_dir_path, finalized_state) {
42        store.entry(block.height).or_default().push(block);
43    }
44
45    for (height, blocks) in store {
46        for block in blocks {
47            #[cfg(test)]
48            let commit_result = if non_finalized_state
49                .any_chain_contains(&block.block.header.previous_block_hash)
50            {
51                non_finalized_state.commit_block(block, finalized_state)
52            } else {
53                non_finalized_state.commit_new_chain(block, finalized_state)
54            };
55
56            #[cfg(not(test))]
57            let commit_result =
58                validate_and_commit_non_finalized(finalized_state, &mut non_finalized_state, block);
59
60            // Re-computes the block hash in case the hash from the filename is wrong.
61            if let Err(commit_error) = commit_result {
62                tracing::warn!(
63                    ?commit_error,
64                    ?height,
65                    "failed to commit non-finalized block from backup directory"
66                );
67            }
68        }
69    }
70
71    non_finalized_state
72}
73
74/// Updates the non-finalized state backup cache whenever the non-finalized state changes,
75/// deleting any outdated backup files and writing any blocks that are in the non-finalized
76/// state but missing in the backup cache.
77pub(super) async fn run_backup_task(
78    mut non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
79    backup_dir_path: PathBuf,
80) {
81    let err = loop {
82        let rate_limit = tokio::time::sleep(MIN_DURATION_BETWEEN_BACKUP_UPDATES);
83        let mut backup_blocks: HashMap<block::Hash, PathBuf> = {
84            let backup_dir_path = backup_dir_path.clone();
85            tokio::task::spawn_blocking(move || list_backup_dir_entries(&backup_dir_path))
86                .await
87                .expect("failed to join blocking task when reading in backup task")
88                .collect()
89        };
90
91        if let (Err(err), _) = tokio::join!(non_finalized_state_receiver.changed(), rate_limit) {
92            break err;
93        };
94
95        let latest_non_finalized_state = non_finalized_state_receiver.cloned_watch_data();
96
97        let backup_dir_path = backup_dir_path.clone();
98        tokio::task::spawn_blocking(move || {
99            for block in latest_non_finalized_state
100                .chain_iter()
101                .flat_map(|chain| chain.blocks.values())
102                // Remove blocks from `backup_blocks` that are present in the non-finalized state
103                .filter(|block| backup_blocks.remove(&block.hash).is_none())
104            {
105                // This loop will typically iterate only once, but may write multiple blocks if it misses
106                // some non-finalized state changes while waiting for I/O ops.
107                write_backup_block(&backup_dir_path, block);
108            }
109
110            // Remove any backup blocks that are not present in the non-finalized state
111            for (_, outdated_backup_block_path) in backup_blocks {
112                if let Err(delete_error) = std::fs::remove_file(outdated_backup_block_path) {
113                    tracing::warn!(?delete_error, "failed to delete backup block file");
114                }
115            }
116        })
117        .await
118        .expect("failed to join blocking task when writing in backup task");
119    };
120
121    tracing::warn!(
122        ?err,
123        "got recv error waiting on non-finalized state change, is Zebra shutting down?"
124    )
125}
126
/// The data stored in a single non-finalized state backup file.
///
/// Built from a [`ContextuallyVerifiedBlock`] when writing a backup, and turned back into a
/// [`SemanticallyVerifiedBlock`] when the backup is restored.
#[derive(Clone, Debug, PartialEq, Eq)]
struct NonFinalizedBlockBackup {
    /// The backed-up block.
    block: Arc<Block>,
    /// The deferred amount of the block's chain value pool change.
    deferred_pool_balance_change: Amount,
}
132
133impl From<&ContextuallyVerifiedBlock> for NonFinalizedBlockBackup {
134    fn from(cv_block: &ContextuallyVerifiedBlock) -> Self {
135        Self {
136            block: cv_block.block.clone(),
137            deferred_pool_balance_change: cv_block.chain_value_pool_change.deferred_amount(),
138        }
139    }
140}
141
142impl NonFinalizedBlockBackup {
143    /// Encodes a [`NonFinalizedBlockBackup`] as a vector of bytes.
144    fn as_bytes(&self) -> Vec<u8> {
145        let block_bytes = self
146            .block
147            .zcash_serialize_to_vec()
148            .expect("verified block header version should be valid");
149
150        let deferred_pool_balance_change_bytes =
151            self.deferred_pool_balance_change.as_bytes().to_vec();
152
153        [deferred_pool_balance_change_bytes, block_bytes].concat()
154    }
155
156    /// Constructs a new [`NonFinalizedBlockBackup`] from a vector of bytes.
157    #[allow(clippy::unwrap_in_result)]
158    fn from_bytes(bytes: Vec<u8>) -> Result<Self, io::Error> {
159        let (deferred_pool_balance_change_bytes, block_bytes) = bytes
160            .split_at_checked(size_of::<Amount>())
161            .ok_or(io::Error::new(
162                ErrorKind::InvalidInput,
163                "input is too short",
164            ))?;
165
166        Ok(Self {
167            block: Arc::new(
168                block_bytes
169                    .zcash_deserialize_into()
170                    .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
171            ),
172            deferred_pool_balance_change: Amount::from_bytes(
173                deferred_pool_balance_change_bytes
174                    .try_into()
175                    .expect("slice from `split_at_checked()` should fit in [u8; 8]"),
176            )
177            .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
178        })
179    }
180}
181
182/// Writes a block to a file in the provided non-finalized state backup cache directory path.
183fn write_backup_block(backup_dir_path: &Path, block: &ContextuallyVerifiedBlock) {
184    let backup_block_file_name: String = block.hash.encode_hex();
185    let backup_block_file_path = backup_dir_path.join(backup_block_file_name);
186    let non_finalized_block_backup: NonFinalizedBlockBackup = block.into();
187
188    if let Err(err) = std::fs::write(
189        backup_block_file_path,
190        non_finalized_block_backup.as_bytes(),
191    ) {
192        tracing::warn!(?err, "failed to write non-finalized state backup block");
193    }
194}
195
196/// Reads blocks from the provided non-finalized state backup directory path.
197///
198/// Returns any blocks that are valid and not present in the finalized state.
199fn read_non_finalized_blocks_from_backup<'a>(
200    backup_dir_path: &PathBuf,
201    finalized_state: &'a ZebraDb,
202) -> impl Iterator<Item = SemanticallyVerifiedBlock> + 'a {
203    list_backup_dir_entries(backup_dir_path)
204        // It's okay to leave the file here, the backup task will delete it as long as
205        // the block is not added to the non-finalized state.
206        .filter(|&(block_hash, _)| !finalized_state.contains_hash(block_hash))
207        .filter_map(|(block_hash, file_path)| match std::fs::read(file_path) {
208            Ok(block_bytes) => Some((block_hash, block_bytes)),
209            Err(err) => {
210                tracing::warn!(?err, "failed to open non-finalized state backup block file");
211                None
212            }
213        })
214        .filter_map(|(expected_block_hash, backup_block_file_contents)| {
215            match NonFinalizedBlockBackup::from_bytes(backup_block_file_contents) {
216                Ok(NonFinalizedBlockBackup {
217                    block,
218                    deferred_pool_balance_change,
219                }) if block.coinbase_height().is_some() => {
220                    let block = SemanticallyVerifiedBlock::from(block)
221                        .with_deferred_pool_balance_change(Some(DeferredPoolBalanceChange::new(
222                            deferred_pool_balance_change,
223                        )));
224                    if block.hash != expected_block_hash {
225                        tracing::warn!(
226                            block_hash = ?block.hash,
227                            ?expected_block_hash,
228                            "wrong block hash in file name"
229                        );
230                    }
231                    Some(block)
232                }
233                Ok(block) => {
234                    tracing::warn!(
235                        ?block,
236                        "invalid non-finalized backup block, missing coinbase height"
237                    );
238                    None
239                }
240                Err(err) => {
241                    tracing::warn!(
242                        ?err,
243                        "failed to deserialize non-finalized backup data into block"
244                    );
245                    None
246                }
247            }
248        })
249}
250
251/// Accepts a backup directory path, opens the directory, converts its entries
252/// filenames to block hashes, and deletes any entries with invalid file names.
253///
254/// # Panics
255///
256/// If the provided path cannot be opened as a directory.
257/// See [`read_backup_dir`] for more details.
258fn list_backup_dir_entries(
259    backup_dir_path: &PathBuf,
260) -> impl Iterator<Item = (block::Hash, PathBuf)> {
261    read_backup_dir(backup_dir_path).filter_map(process_backup_dir_entry)
262}
263
264/// Accepts a backup directory path and opens the directory.
265///
266/// Returns an iterator over all [`DirEntry`]s in the directory that are successfully read.
267///
268/// # Panics
269///
270/// If the provided path cannot be opened as a directory.
271fn read_backup_dir(backup_dir_path: &PathBuf) -> impl Iterator<Item = DirEntry> {
272    std::fs::read_dir(backup_dir_path)
273        .expect("failed to read non-finalized state backup directory")
274        .filter_map(|entry| match entry {
275            Ok(entry) => Some(entry),
276            Err(io_err) => {
277                tracing::warn!(
278                    ?io_err,
279                    "failed to read DirEntry in non-finalized state backup dir"
280                );
281
282                None
283            }
284        })
285}
286
287/// Accepts a [`DirEntry`] from the non-finalized state backup directory and
288/// parses the filename into a block hash.
289///
290/// Returns the block hash and the file path if successful, or
291/// returns None and deletes the file at the entry path otherwise.
292fn process_backup_dir_entry(entry: DirEntry) -> Option<(block::Hash, PathBuf)> {
293    let delete_file = || {
294        if let Err(delete_error) = std::fs::remove_file(entry.path()) {
295            tracing::warn!(?delete_error, "failed to delete backup block file");
296        }
297    };
298
299    let block_file_name = match entry.file_name().into_string() {
300        Ok(block_hash) => block_hash,
301        Err(err) => {
302            tracing::warn!(
303                ?err,
304                "failed to convert OsString to String, attempting to delete file"
305            );
306
307            delete_file();
308            return None;
309        }
310    };
311
312    let block_hash: block::Hash = match block_file_name.parse() {
313        Ok(block_hash) => block_hash,
314        Err(err) => {
315            tracing::warn!(
316                ?err,
317                "failed to parse hex-encoded block hash from file name, attempting to delete file"
318            );
319
320            delete_file();
321            return None;
322        }
323    };
324
325    Some((block_hash, entry.path()))
326}