zebra_state/service/non_finalized_state/
backup.rs1use std::{
2 collections::{BTreeMap, HashMap},
3 fs::DirEntry,
4 io::{self, ErrorKind},
5 path::{Path, PathBuf},
6 sync::Arc,
7 time::Duration,
8};
9
10use hex::ToHex;
11use zebra_chain::{
12 amount::{Amount, DeferredPoolBalanceChange},
13 block::{self, Block, Height},
14 serialization::{ZcashDeserializeInto, ZcashSerialize},
15};
16
17use crate::{
18 ContextuallyVerifiedBlock, IntoDisk, NonFinalizedState, SemanticallyVerifiedBlock,
19 WatchReceiver, ZebraDb,
20};
21
22#[cfg(not(test))]
23use crate::service::write::validate_and_commit_non_finalized;
24
/// The minimum duration between updates to the non-finalized state backup directory.
///
/// The backup task waits for a sleep of this duration in addition to a change in the
/// non-finalized state before it writes or deletes any backup files.
pub(crate) const MIN_DURATION_BETWEEN_BACKUP_UPDATES: Duration = Duration::from_secs(5);
27
28pub(super) fn restore_backup(
35 mut non_finalized_state: NonFinalizedState,
36 backup_dir_path: &PathBuf,
37 finalized_state: &ZebraDb,
38) -> NonFinalizedState {
39 let mut store: BTreeMap<Height, Vec<SemanticallyVerifiedBlock>> = BTreeMap::new();
40
41 for block in read_non_finalized_blocks_from_backup(backup_dir_path, finalized_state) {
42 store.entry(block.height).or_default().push(block);
43 }
44
45 for (height, blocks) in store {
46 for block in blocks {
47 #[cfg(test)]
48 let commit_result = if non_finalized_state
49 .any_chain_contains(&block.block.header.previous_block_hash)
50 {
51 non_finalized_state.commit_block(block, finalized_state)
52 } else {
53 non_finalized_state.commit_new_chain(block, finalized_state)
54 };
55
56 #[cfg(not(test))]
57 let commit_result =
58 validate_and_commit_non_finalized(finalized_state, &mut non_finalized_state, block);
59
60 if let Err(commit_error) = commit_result {
62 tracing::warn!(
63 ?commit_error,
64 ?height,
65 "failed to commit non-finalized block from backup directory"
66 );
67 }
68 }
69 }
70
71 non_finalized_state
72}
73
74pub(super) async fn run_backup_task(
78 mut non_finalized_state_receiver: WatchReceiver<NonFinalizedState>,
79 backup_dir_path: PathBuf,
80) {
81 let err = loop {
82 let rate_limit = tokio::time::sleep(MIN_DURATION_BETWEEN_BACKUP_UPDATES);
83 let mut backup_blocks: HashMap<block::Hash, PathBuf> = {
84 let backup_dir_path = backup_dir_path.clone();
85 tokio::task::spawn_blocking(move || list_backup_dir_entries(&backup_dir_path))
86 .await
87 .expect("failed to join blocking task when reading in backup task")
88 .collect()
89 };
90
91 if let (Err(err), _) = tokio::join!(non_finalized_state_receiver.changed(), rate_limit) {
92 break err;
93 };
94
95 let latest_non_finalized_state = non_finalized_state_receiver.cloned_watch_data();
96
97 let backup_dir_path = backup_dir_path.clone();
98 tokio::task::spawn_blocking(move || {
99 for block in latest_non_finalized_state
100 .chain_iter()
101 .flat_map(|chain| chain.blocks.values())
102 .filter(|block| backup_blocks.remove(&block.hash).is_none())
104 {
105 write_backup_block(&backup_dir_path, block);
108 }
109
110 for (_, outdated_backup_block_path) in backup_blocks {
112 if let Err(delete_error) = std::fs::remove_file(outdated_backup_block_path) {
113 tracing::warn!(?delete_error, "failed to delete backup block file");
114 }
115 }
116 })
117 .await
118 .expect("failed to join blocking task when writing in backup task");
119 };
120
121 tracing::warn!(
122 ?err,
123 "got recv error waiting on non-finalized state change, is Zebra shutting down?"
124 )
125}
126
/// The data stored in a single backup file for one non-finalized block.
#[derive(Clone, Debug, PartialEq, Eq)]
struct NonFinalizedBlockBackup {
    /// The backed up block.
    block: Arc<Block>,
    /// The deferred amount of the block's chain value pool change.
    deferred_pool_balance_change: Amount,
}
132
133impl From<&ContextuallyVerifiedBlock> for NonFinalizedBlockBackup {
134 fn from(cv_block: &ContextuallyVerifiedBlock) -> Self {
135 Self {
136 block: cv_block.block.clone(),
137 deferred_pool_balance_change: cv_block.chain_value_pool_change.deferred_amount(),
138 }
139 }
140}
141
142impl NonFinalizedBlockBackup {
143 fn as_bytes(&self) -> Vec<u8> {
145 let block_bytes = self
146 .block
147 .zcash_serialize_to_vec()
148 .expect("verified block header version should be valid");
149
150 let deferred_pool_balance_change_bytes =
151 self.deferred_pool_balance_change.as_bytes().to_vec();
152
153 [deferred_pool_balance_change_bytes, block_bytes].concat()
154 }
155
156 #[allow(clippy::unwrap_in_result)]
158 fn from_bytes(bytes: Vec<u8>) -> Result<Self, io::Error> {
159 let (deferred_pool_balance_change_bytes, block_bytes) = bytes
160 .split_at_checked(size_of::<Amount>())
161 .ok_or(io::Error::new(
162 ErrorKind::InvalidInput,
163 "input is too short",
164 ))?;
165
166 Ok(Self {
167 block: Arc::new(
168 block_bytes
169 .zcash_deserialize_into()
170 .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
171 ),
172 deferred_pool_balance_change: Amount::from_bytes(
173 deferred_pool_balance_change_bytes
174 .try_into()
175 .expect("slice from `split_at_checked()` should fit in [u8; 8]"),
176 )
177 .map_err(|err| io::Error::new(ErrorKind::InvalidData, err))?,
178 })
179 }
180}
181
182fn write_backup_block(backup_dir_path: &Path, block: &ContextuallyVerifiedBlock) {
184 let backup_block_file_name: String = block.hash.encode_hex();
185 let backup_block_file_path = backup_dir_path.join(backup_block_file_name);
186 let non_finalized_block_backup: NonFinalizedBlockBackup = block.into();
187
188 if let Err(err) = std::fs::write(
189 backup_block_file_path,
190 non_finalized_block_backup.as_bytes(),
191 ) {
192 tracing::warn!(?err, "failed to write non-finalized state backup block");
193 }
194}
195
196fn read_non_finalized_blocks_from_backup<'a>(
200 backup_dir_path: &PathBuf,
201 finalized_state: &'a ZebraDb,
202) -> impl Iterator<Item = SemanticallyVerifiedBlock> + 'a {
203 list_backup_dir_entries(backup_dir_path)
204 .filter(|&(block_hash, _)| !finalized_state.contains_hash(block_hash))
207 .filter_map(|(block_hash, file_path)| match std::fs::read(file_path) {
208 Ok(block_bytes) => Some((block_hash, block_bytes)),
209 Err(err) => {
210 tracing::warn!(?err, "failed to open non-finalized state backup block file");
211 None
212 }
213 })
214 .filter_map(|(expected_block_hash, backup_block_file_contents)| {
215 match NonFinalizedBlockBackup::from_bytes(backup_block_file_contents) {
216 Ok(NonFinalizedBlockBackup {
217 block,
218 deferred_pool_balance_change,
219 }) if block.coinbase_height().is_some() => {
220 let block = SemanticallyVerifiedBlock::from(block)
221 .with_deferred_pool_balance_change(Some(DeferredPoolBalanceChange::new(
222 deferred_pool_balance_change,
223 )));
224 if block.hash != expected_block_hash {
225 tracing::warn!(
226 block_hash = ?block.hash,
227 ?expected_block_hash,
228 "wrong block hash in file name"
229 );
230 }
231 Some(block)
232 }
233 Ok(block) => {
234 tracing::warn!(
235 ?block,
236 "invalid non-finalized backup block, missing coinbase height"
237 );
238 None
239 }
240 Err(err) => {
241 tracing::warn!(
242 ?err,
243 "failed to deserialize non-finalized backup data into block"
244 );
245 None
246 }
247 }
248 })
249}
250
251fn list_backup_dir_entries(
259 backup_dir_path: &PathBuf,
260) -> impl Iterator<Item = (block::Hash, PathBuf)> {
261 read_backup_dir(backup_dir_path).filter_map(process_backup_dir_entry)
262}
263
264fn read_backup_dir(backup_dir_path: &PathBuf) -> impl Iterator<Item = DirEntry> {
272 std::fs::read_dir(backup_dir_path)
273 .expect("failed to read non-finalized state backup directory")
274 .filter_map(|entry| match entry {
275 Ok(entry) => Some(entry),
276 Err(io_err) => {
277 tracing::warn!(
278 ?io_err,
279 "failed to read DirEntry in non-finalized state backup dir"
280 );
281
282 None
283 }
284 })
285}
286
287fn process_backup_dir_entry(entry: DirEntry) -> Option<(block::Hash, PathBuf)> {
293 let delete_file = || {
294 if let Err(delete_error) = std::fs::remove_file(entry.path()) {
295 tracing::warn!(?delete_error, "failed to delete backup block file");
296 }
297 };
298
299 let block_file_name = match entry.file_name().into_string() {
300 Ok(block_hash) => block_hash,
301 Err(err) => {
302 tracing::warn!(
303 ?err,
304 "failed to convert OsString to String, attempting to delete file"
305 );
306
307 delete_file();
308 return None;
309 }
310 };
311
312 let block_hash: block::Hash = match block_file_name.parse() {
313 Ok(block_hash) => block_hash,
314 Err(err) => {
315 tracing::warn!(
316 ?err,
317 "failed to parse hex-encoded block hash from file name, attempting to delete file"
318 );
319
320 delete_file();
321 return None;
322 }
323 };
324
325 Some((block_hash, entry.path()))
326}