zebrad/commands/
copy_state.rs

//! `copy-state` subcommand - copies state from one directory to another (debug only)
//!
//! Copying state helps Zebra developers modify and debug cached state formats.
//!
//! In order to test a new state format, blocks must be identical when they are:
//! - read from the old format,
//! - written to the new format, and
//! - read from the new format.
//!
//! The "old" and "new" states can also use the same format,
//! which tests the low-level state API's performance.
//!
//! ## Command Structure
//!
//! Copying cached state uses the following services and tasks:
//!
//! Tasks:
//!  * Old to New Copy Task
//!    * queries the source state for blocks,
//!      copies those blocks to the target state, then
//!      reads the copied blocks from the target state.
//!
//! Services:
//!  * Source Old State Service
//!    * fetches blocks from the best finalized chain from permanent storage,
//!      in the old format
//!  * Target New State Service
//!    * writes best finalized chain blocks to permanent storage,
//!      in the new format
//!    * only performs essential contextual verification of blocks,
//!      to make sure that block data hasn't been corrupted by
//!      receiving blocks in the new format
//!    * fetches blocks from the best finalized chain from permanent storage,
//!      in the new format

36use std::{cmp::min, path::PathBuf};
37
38use abscissa_core::{config, Command, FrameworkError};
39use color_eyre::eyre::{eyre, Report};
40use tokio::time::Instant;
41use tower::{Service, ServiceExt};
42
43use zebra_chain::{block::Height, parameters::Network};
44use zebra_state as old_zs;
45use zebra_state as new_zs;
46
47use crate::{
48    components::tokio::{RuntimeRun, TokioComponent},
49    config::ZebradConfig,
50    prelude::*,
51    BoxError,
52};
53
/// Number of copied heights between info-level progress log messages.
const PROGRESS_HEIGHT_INTERVAL: u32 = 5000;
56
/// copy cached chain state (expert users only)
// Command-line arguments for the `copy-state` subcommand.
//
// NOTE(review): clap's derive may use the `///` doc comments in this struct as
// command/argument help text, so explanatory notes here are plain `//` comments.
#[derive(Command, Debug, clap::Parser)]
pub struct CopyStateCmd {
    /// Source height that the copy finishes at.
    // When set, copying stops at the lower of the source tip height and this height;
    // otherwise copying stops at the source tip height.
    #[clap(long, short, help = "stop copying at this source height")]
    max_source_height: Option<u32>,

    /// Path to a Zebra config.toml for the target state.
    /// Uses an ephemeral config by default.
    ///
    /// Zebra only uses the state options from this config.
    /// All other options are ignored.
    // The file is loaded with the `ZEBRA_TARGET` environment variable prefix,
    // so target overrides don't conflict with the source config's `ZEBRA_` overrides.
    #[clap(
        long,
        short,
        help = "config file path for the target state (default: ephemeral), \
                      the source state uses the main zebrad config"
    )]
    target_config_path: Option<PathBuf>,

    /// Filter strings which override the config file and defaults
    // Positional arguments: when any are given, they are joined with "," and
    // replace `tracing.filter` in the loaded config.
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
81
82impl CopyStateCmd {
83    /// Configure and launch the copy command
84    async fn start(&self) -> Result<(), Report> {
85        let base_config = APPLICATION.config();
86        let source_config = base_config.state.clone();
87
88        // Load the target config if a target config path was provided, or use an ephemeral config otherwise.
89        //
90        // Use the `ZEBRA_TARGET_` environment prefix for target overrides to avoid
91        // conflicting with the source/base config (`ZEBRA_...`).
92        // Example: `ZEBRA_TARGET_STATE__CACHE_DIR=/dst/cache`.
93        let target_config = self
94            .target_config_path
95            .as_ref()
96            .map(|path| {
97                crate::config::ZebradConfig::load_with_env(Some(path.clone()), "ZEBRA_TARGET")
98            })
99            .transpose()?
100            .map(|app_config| app_config.state)
101            .unwrap_or_else(new_zs::Config::ephemeral);
102
103        info!(?base_config, "state copy base config");
104
105        self.copy(&base_config.network.network, source_config, target_config)
106            .await
107            .map_err(|e| eyre!(e))
108    }
109
110    /// Initialize the source and target states,
111    /// then copy from the source to the target state.
112    async fn copy(
113        &self,
114        network: &Network,
115        source_config: old_zs::Config,
116        target_config: new_zs::Config,
117    ) -> Result<(), BoxError> {
118        info!(
119            ?source_config,
120            "initializing source state service (old format)"
121        );
122
123        let source_start_time = Instant::now();
124
125        // We're not verifying UTXOs here, so we don't need the maximum checkpoint height.
126        let (mut source_read_only_state_service, _source_db, _source_latest_non_finalized_state) =
127            old_zs::spawn_init_read_only(source_config.clone(), network).await?;
128
129        let elapsed = source_start_time.elapsed();
130        info!(?elapsed, "finished initializing source state service");
131
132        info!(
133            ?target_config, target_config_path = ?self.target_config_path,
134            "initializing target state service (new format)"
135        );
136
137        let target_start_time = Instant::now();
138        // We're not verifying UTXOs here, so we don't need the maximum checkpoint height.
139        //
140        // TODO: call Options::PrepareForBulkLoad()
141        // See "What's the fastest way to load data into RocksDB?" in
142        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
143        let (
144            mut target_state,
145            _target_read_only_state_service,
146            _target_latest_chain_tip,
147            _target_chain_tip_change,
148        ) = new_zs::spawn_init(target_config.clone(), network, Height::MAX, 0).await?;
149
150        let elapsed = target_start_time.elapsed();
151        info!(?elapsed, "finished initializing target state service");
152
153        info!("fetching source and target tip heights");
154
155        let source_tip = source_read_only_state_service
156            .ready()
157            .await?
158            .call(old_zs::ReadRequest::Tip)
159            .await?;
160        let source_tip = match source_tip {
161            old_zs::ReadResponse::Tip(Some(source_tip)) => source_tip,
162            old_zs::ReadResponse::Tip(None) => Err("empty source state: no blocks to copy")?,
163
164            response => Err(format!("unexpected response to Tip request: {response:?}",))?,
165        };
166        let source_tip_height = source_tip.0 .0;
167
168        let initial_target_tip = target_state
169            .ready()
170            .await?
171            .call(new_zs::Request::Tip)
172            .await?;
173        let initial_target_tip = match initial_target_tip {
174            new_zs::Response::Tip(target_tip) => target_tip,
175
176            response => Err(format!("unexpected response to Tip request: {response:?}",))?,
177        };
178        let min_target_height = initial_target_tip
179            .map(|target_tip| target_tip.0 .0 + 1)
180            .unwrap_or(0);
181
182        let max_copy_height = self
183            .max_source_height
184            .map(|max_source_height| min(source_tip_height, max_source_height))
185            .unwrap_or(source_tip_height);
186
187        if min_target_height >= max_copy_height {
188            info!(
189                ?min_target_height,
190                ?max_copy_height,
191                max_source_height = ?self.max_source_height,
192                ?source_tip,
193                ?initial_target_tip,
194                "target is already at or after max copy height"
195            );
196
197            return Ok(());
198        }
199
200        info!(
201            ?min_target_height,
202            ?max_copy_height,
203            max_source_height = ?self.max_source_height,
204            ?source_tip,
205            ?initial_target_tip,
206            "starting copy from source to target"
207        );
208
209        let copy_start_time = Instant::now();
210        for height in min_target_height..=max_copy_height {
211            // Read block from source
212            let source_block = source_read_only_state_service
213                .ready()
214                .await?
215                .call(old_zs::ReadRequest::Block(Height(height).into()))
216                .await?;
217            let source_block = match source_block {
218                old_zs::ReadResponse::Block(Some(source_block)) => {
219                    trace!(?height, %source_block, "read source block");
220                    source_block
221                }
222                old_zs::ReadResponse::Block(None) => {
223                    Err(format!("unexpected missing source block, height: {height}",))?
224                }
225
226                response => Err(format!(
227                    "unexpected response to Block request, height: {height}, \n \
228                     response: {response:?}",
229                ))?,
230            };
231            let source_block_hash = source_block.hash();
232
233            // Write block to target
234            let target_block_commit_hash = target_state
235                .ready()
236                .await?
237                .call(new_zs::Request::CommitCheckpointVerifiedBlock(
238                    source_block.clone().into(),
239                ))
240                .await?;
241            let target_block_commit_hash = match target_block_commit_hash {
242                new_zs::Response::Committed(target_block_commit_hash) => {
243                    trace!(?target_block_commit_hash, "wrote target block");
244                    target_block_commit_hash
245                }
246                response => Err(format!(
247                    "unexpected response to CommitCheckpointVerifiedBlock request, height: {height}\n \
248                     response: {response:?}",
249                ))?,
250            };
251
252            // Read written block from target
253            let target_block = target_state
254                .ready()
255                .await?
256                .call(new_zs::Request::Block(Height(height).into()))
257                .await?;
258            let target_block = match target_block {
259                new_zs::Response::Block(Some(target_block)) => {
260                    trace!(?height, %target_block, "read target block");
261                    target_block
262                }
263                new_zs::Response::Block(None) => {
264                    Err(format!("unexpected missing target block, height: {height}",))?
265                }
266
267                response => Err(format!(
268                    "unexpected response to Block request, height: {height},\n \
269                     response: {response:?}",
270                ))?,
271            };
272            let target_block_data_hash = target_block.hash();
273
274            // Check for data errors
275            //
276            // These checks make sure that Zebra doesn't corrupt the block data
277            // when serializing it in the new format.
278            // Zebra currently serializes `Block` structs into bytes while writing,
279            // then deserializes bytes into new `Block` structs when reading.
280            // So these checks are sufficient to detect block data corruption.
281            //
282            // If Zebra starts reusing cached `Block` structs after writing them,
283            // we'll also need to check `Block` structs created from the actual database bytes.
284            if source_block_hash != target_block_commit_hash
285                || source_block_hash != target_block_data_hash
286                || source_block != target_block
287            {
288                Err(format!(
289                    "unexpected mismatch between source and target blocks,\n \
290                     max copy height: {max_copy_height:?},\n \
291                     source hash: {source_block_hash:?},\n \
292                     target commit hash: {target_block_commit_hash:?},\n \
293                     target data hash: {target_block_data_hash:?},\n \
294                     source block: {source_block:?},\n \
295                     target block: {target_block:?}",
296                ))?;
297            }
298
299            // Log progress
300            if height % PROGRESS_HEIGHT_INTERVAL == 0 {
301                let elapsed = copy_start_time.elapsed();
302                info!(
303                    ?height,
304                    ?max_copy_height,
305                    ?elapsed,
306                    "copied block from source to target"
307                );
308            }
309        }
310
311        let elapsed = copy_start_time.elapsed();
312        info!(?max_copy_height, ?elapsed, "finished copying blocks");
313
314        info!(?max_copy_height, "fetching final target tip");
315
316        let final_target_tip = target_state
317            .ready()
318            .await?
319            .call(new_zs::Request::Tip)
320            .await?;
321        let final_target_tip = match final_target_tip {
322            new_zs::Response::Tip(Some(target_tip)) => target_tip,
323            new_zs::Response::Tip(None) => Err("empty target state: expected written blocks")?,
324
325            response => Err(format!("unexpected response to Tip request: {response:?}",))?,
326        };
327        let final_target_tip_height = final_target_tip.0 .0;
328        let final_target_tip_hash = final_target_tip.1;
329
330        let target_tip_source_depth = source_read_only_state_service
331            .ready()
332            .await?
333            .call(old_zs::ReadRequest::Depth(final_target_tip_hash))
334            .await?;
335        let target_tip_source_depth = match target_tip_source_depth {
336            old_zs::ReadResponse::Depth(source_depth) => source_depth,
337
338            response => Err(format!(
339                "unexpected response to Depth request: {response:?}",
340            ))?,
341        };
342
343        // Check the tips match
344        //
345        // This check works because Zebra doesn't cache tip structs.
346        // (See details above.)
347        if max_copy_height == source_tip_height {
348            let expected_target_depth = Some(0);
349            if source_tip != final_target_tip || target_tip_source_depth != expected_target_depth {
350                Err(format!(
351                    "unexpected mismatch between source and target tips,\n \
352                     max copy height: {max_copy_height:?},\n \
353                     source tip: {source_tip:?},\n \
354                     target tip: {final_target_tip:?},\n \
355                     actual target tip depth in source: {target_tip_source_depth:?},\n \
356                     expect target tip depth in source: {expected_target_depth:?}",
357                ))?;
358            } else {
359                info!(
360                    ?max_copy_height,
361                    ?source_tip,
362                    ?final_target_tip,
363                    ?target_tip_source_depth,
364                    "source and target states contain the same blocks"
365                );
366            }
367        } else {
368            let expected_target_depth = source_tip_height.checked_sub(final_target_tip_height);
369            if target_tip_source_depth != expected_target_depth {
370                Err(format!(
371                    "unexpected mismatch between source and target tips,\n \
372                     max copy height: {max_copy_height:?},\n \
373                     source tip: {source_tip:?},\n \
374                     target tip: {final_target_tip:?},\n \
375                     actual target tip depth in source: {target_tip_source_depth:?},\n \
376                     expect target tip depth in source: {expected_target_depth:?}",
377                ))?;
378            } else {
379                info!(
380                    ?max_copy_height,
381                    ?source_tip,
382                    ?final_target_tip,
383                    ?target_tip_source_depth,
384                    "target state reached the max copy height"
385                );
386            }
387        }
388
389        Ok(())
390    }
391}
392
393impl Runnable for CopyStateCmd {
394    /// Start the application.
395    fn run(&self) {
396        info!(
397            max_source_height = ?self.max_source_height,
398            target_config_path = ?self.target_config_path,
399            "starting cached chain state copy"
400        );
401        let rt = APPLICATION
402            .state()
403            .components_mut()
404            .get_downcast_mut::<TokioComponent>()
405            .expect("TokioComponent should be available")
406            .rt
407            .take();
408
409        rt.expect("runtime should not already be taken")
410            .run(self.start());
411
412        info!("finished cached chain state copy");
413    }
414}
415
416impl config::Override<ZebradConfig> for CopyStateCmd {
417    // Process the given command line options, overriding settings from
418    // a configuration file using explicit flags taken from command-line
419    // arguments.
420    fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
421        if !self.filters.is_empty() {
422            config.tracing.filter = Some(self.filters.join(","));
423        }
424
425        Ok(config)
426    }
427}