zebrad/commands/
copy_state.rs

1//! `copy-state` subcommand - copies state from one directory to another (debug only)
2//!
3//! Copying state helps Zebra developers modify and debug cached state formats.
4//!
5//! In order to test a new state format, blocks must be identical when they are:
6//! - read from the old format,
7//! - written to the new format, and
8//! - read from the new format.
9//!
10//! The "old" and "new" states can also use the same format.
11//! This tests the low-level state API's performance.
12//!
13//! ## Command Structure
14//!
15//! Copying cached state uses the following services and tasks:
16//!
17//! Tasks:
18//!  * Old to New Copy Task
19//!    * queries the source state for blocks,
20//!      copies those blocks to the target state, then
21//!      reads the copied blocks from the target state.
22//!
23//! Services:
24//!  * Source Old State Service
25//!    * fetches blocks from the best finalized chain from permanent storage,
26//!      in the old format
27//!  * Target New State Service
28//!    * writes best finalized chain blocks to permanent storage,
29//!      in the new format
30//!    * only performs essential contextual verification of blocks,
31//!      to make sure that block data hasn't been corrupted by
32//!      receiving blocks in the new format
33//!    * fetches blocks from the best finalized chain from permanent storage,
34//!      in the new format
35
36use std::{cmp::min, path::PathBuf};
37
38use abscissa_core::{config, Command, FrameworkError};
39use color_eyre::eyre::{eyre, Report};
40use tokio::time::Instant;
41use tower::{Service, ServiceExt};
42
43use zebra_chain::{block::Height, parameters::Network};
44use zebra_state as old_zs;
45use zebra_state as new_zs;
46
47use crate::{
48    application::ZebradApp,
49    components::tokio::{RuntimeRun, TokioComponent},
50    config::ZebradConfig,
51    prelude::*,
52    BoxError,
53};
54
/// Height interval between info-level progress logs: the copy loop logs a
/// progress message whenever the copied height is an exact multiple of this value.
const PROGRESS_HEIGHT_INTERVAL: u32 = 5000;
57
58/// copy cached chain state (expert users only)
59#[derive(Command, Debug, clap::Parser)]
60pub struct CopyStateCmd {
61    /// Source height that the copy finishes at.
62    #[clap(long, short, help = "stop copying at this source height")]
63    max_source_height: Option<u32>,
64
65    /// Path to a Zebra config.toml for the target state.
66    /// Uses an ephemeral config by default.
67    ///
68    /// Zebra only uses the state options from this config.
69    /// All other options are ignored.
70    #[clap(
71        long,
72        short,
73        help = "config file path for the target state (default: ephemeral), \
74                      the source state uses the main zebrad config"
75    )]
76    target_config_path: Option<PathBuf>,
77
78    /// Filter strings which override the config file and defaults
79    #[clap(help = "tracing filters which override the zebrad.toml config")]
80    filters: Vec<String>,
81}
82
83impl CopyStateCmd {
84    /// Configure and launch the copy command
85    async fn start(&self) -> Result<(), Report> {
86        let base_config = APPLICATION.config();
87        let source_config = base_config.state.clone();
88
89        // The default load_config impl doesn't actually modify the app config.
90        let target_config = self
91            .target_config_path
92            .as_ref()
93            .map(|path| ZebradApp::default().load_config(path))
94            .transpose()?
95            .map(|app_config| app_config.state)
96            .unwrap_or_else(new_zs::Config::ephemeral);
97
98        info!(?base_config, "state copy base config");
99
100        self.copy(&base_config.network.network, source_config, target_config)
101            .await
102            .map_err(|e| eyre!(e))
103    }
104
105    /// Initialize the source and target states,
106    /// then copy from the source to the target state.
107    async fn copy(
108        &self,
109        network: &Network,
110        source_config: old_zs::Config,
111        target_config: new_zs::Config,
112    ) -> Result<(), BoxError> {
113        info!(
114            ?source_config,
115            "initializing source state service (old format)"
116        );
117
118        let source_start_time = Instant::now();
119
120        // We're not verifying UTXOs here, so we don't need the maximum checkpoint height.
121        let (mut source_read_only_state_service, _source_db, _source_latest_non_finalized_state) =
122            old_zs::spawn_init_read_only(source_config.clone(), network).await?;
123
124        let elapsed = source_start_time.elapsed();
125        info!(?elapsed, "finished initializing source state service");
126
127        info!(
128            ?target_config, target_config_path = ?self.target_config_path,
129            "initializing target state service (new format)"
130        );
131
132        let target_start_time = Instant::now();
133        // We're not verifying UTXOs here, so we don't need the maximum checkpoint height.
134        //
135        // TODO: call Options::PrepareForBulkLoad()
136        // See "What's the fastest way to load data into RocksDB?" in
137        // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
138        let (
139            mut target_state,
140            _target_read_only_state_service,
141            _target_latest_chain_tip,
142            _target_chain_tip_change,
143        ) = new_zs::spawn_init(target_config.clone(), network, Height::MAX, 0).await?;
144
145        let elapsed = target_start_time.elapsed();
146        info!(?elapsed, "finished initializing target state service");
147
148        info!("fetching source and target tip heights");
149
150        let source_tip = source_read_only_state_service
151            .ready()
152            .await?
153            .call(old_zs::ReadRequest::Tip)
154            .await?;
155        let source_tip = match source_tip {
156            old_zs::ReadResponse::Tip(Some(source_tip)) => source_tip,
157            old_zs::ReadResponse::Tip(None) => Err("empty source state: no blocks to copy")?,
158
159            response => Err(format!("unexpected response to Tip request: {response:?}",))?,
160        };
161        let source_tip_height = source_tip.0 .0;
162
163        let initial_target_tip = target_state
164            .ready()
165            .await?
166            .call(new_zs::Request::Tip)
167            .await?;
168        let initial_target_tip = match initial_target_tip {
169            new_zs::Response::Tip(target_tip) => target_tip,
170
171            response => Err(format!("unexpected response to Tip request: {response:?}",))?,
172        };
173        let min_target_height = initial_target_tip
174            .map(|target_tip| target_tip.0 .0 + 1)
175            .unwrap_or(0);
176
177        let max_copy_height = self
178            .max_source_height
179            .map(|max_source_height| min(source_tip_height, max_source_height))
180            .unwrap_or(source_tip_height);
181
182        if min_target_height >= max_copy_height {
183            info!(
184                ?min_target_height,
185                ?max_copy_height,
186                max_source_height = ?self.max_source_height,
187                ?source_tip,
188                ?initial_target_tip,
189                "target is already at or after max copy height"
190            );
191
192            return Ok(());
193        }
194
195        info!(
196            ?min_target_height,
197            ?max_copy_height,
198            max_source_height = ?self.max_source_height,
199            ?source_tip,
200            ?initial_target_tip,
201            "starting copy from source to target"
202        );
203
204        let copy_start_time = Instant::now();
205        for height in min_target_height..=max_copy_height {
206            // Read block from source
207            let source_block = source_read_only_state_service
208                .ready()
209                .await?
210                .call(old_zs::ReadRequest::Block(Height(height).into()))
211                .await?;
212            let source_block = match source_block {
213                old_zs::ReadResponse::Block(Some(source_block)) => {
214                    trace!(?height, %source_block, "read source block");
215                    source_block
216                }
217                old_zs::ReadResponse::Block(None) => {
218                    Err(format!("unexpected missing source block, height: {height}",))?
219                }
220
221                response => Err(format!(
222                    "unexpected response to Block request, height: {height}, \n \
223                     response: {response:?}",
224                ))?,
225            };
226            let source_block_hash = source_block.hash();
227
228            // Write block to target
229            let target_block_commit_hash = target_state
230                .ready()
231                .await?
232                .call(new_zs::Request::CommitCheckpointVerifiedBlock(
233                    source_block.clone().into(),
234                ))
235                .await?;
236            let target_block_commit_hash = match target_block_commit_hash {
237                new_zs::Response::Committed(target_block_commit_hash) => {
238                    trace!(?target_block_commit_hash, "wrote target block");
239                    target_block_commit_hash
240                }
241                response => Err(format!(
242                    "unexpected response to CommitCheckpointVerifiedBlock request, height: {height}\n \
243                     response: {response:?}",
244                ))?,
245            };
246
247            // Read written block from target
248            let target_block = target_state
249                .ready()
250                .await?
251                .call(new_zs::Request::Block(Height(height).into()))
252                .await?;
253            let target_block = match target_block {
254                new_zs::Response::Block(Some(target_block)) => {
255                    trace!(?height, %target_block, "read target block");
256                    target_block
257                }
258                new_zs::Response::Block(None) => {
259                    Err(format!("unexpected missing target block, height: {height}",))?
260                }
261
262                response => Err(format!(
263                    "unexpected response to Block request, height: {height},\n \
264                     response: {response:?}",
265                ))?,
266            };
267            let target_block_data_hash = target_block.hash();
268
269            // Check for data errors
270            //
271            // These checks make sure that Zebra doesn't corrupt the block data
272            // when serializing it in the new format.
273            // Zebra currently serializes `Block` structs into bytes while writing,
274            // then deserializes bytes into new `Block` structs when reading.
275            // So these checks are sufficient to detect block data corruption.
276            //
277            // If Zebra starts reusing cached `Block` structs after writing them,
278            // we'll also need to check `Block` structs created from the actual database bytes.
279            if source_block_hash != target_block_commit_hash
280                || source_block_hash != target_block_data_hash
281                || source_block != target_block
282            {
283                Err(format!(
284                    "unexpected mismatch between source and target blocks,\n \
285                     max copy height: {max_copy_height:?},\n \
286                     source hash: {source_block_hash:?},\n \
287                     target commit hash: {target_block_commit_hash:?},\n \
288                     target data hash: {target_block_data_hash:?},\n \
289                     source block: {source_block:?},\n \
290                     target block: {target_block:?}",
291                ))?;
292            }
293
294            // Log progress
295            if height % PROGRESS_HEIGHT_INTERVAL == 0 {
296                let elapsed = copy_start_time.elapsed();
297                info!(
298                    ?height,
299                    ?max_copy_height,
300                    ?elapsed,
301                    "copied block from source to target"
302                );
303            }
304        }
305
306        let elapsed = copy_start_time.elapsed();
307        info!(?max_copy_height, ?elapsed, "finished copying blocks");
308
309        info!(?max_copy_height, "fetching final target tip");
310
311        let final_target_tip = target_state
312            .ready()
313            .await?
314            .call(new_zs::Request::Tip)
315            .await?;
316        let final_target_tip = match final_target_tip {
317            new_zs::Response::Tip(Some(target_tip)) => target_tip,
318            new_zs::Response::Tip(None) => Err("empty target state: expected written blocks")?,
319
320            response => Err(format!("unexpected response to Tip request: {response:?}",))?,
321        };
322        let final_target_tip_height = final_target_tip.0 .0;
323        let final_target_tip_hash = final_target_tip.1;
324
325        let target_tip_source_depth = source_read_only_state_service
326            .ready()
327            .await?
328            .call(old_zs::ReadRequest::Depth(final_target_tip_hash))
329            .await?;
330        let target_tip_source_depth = match target_tip_source_depth {
331            old_zs::ReadResponse::Depth(source_depth) => source_depth,
332
333            response => Err(format!(
334                "unexpected response to Depth request: {response:?}",
335            ))?,
336        };
337
338        // Check the tips match
339        //
340        // This check works because Zebra doesn't cache tip structs.
341        // (See details above.)
342        if max_copy_height == source_tip_height {
343            let expected_target_depth = Some(0);
344            if source_tip != final_target_tip || target_tip_source_depth != expected_target_depth {
345                Err(format!(
346                    "unexpected mismatch between source and target tips,\n \
347                     max copy height: {max_copy_height:?},\n \
348                     source tip: {source_tip:?},\n \
349                     target tip: {final_target_tip:?},\n \
350                     actual target tip depth in source: {target_tip_source_depth:?},\n \
351                     expect target tip depth in source: {expected_target_depth:?}",
352                ))?;
353            } else {
354                info!(
355                    ?max_copy_height,
356                    ?source_tip,
357                    ?final_target_tip,
358                    ?target_tip_source_depth,
359                    "source and target states contain the same blocks"
360                );
361            }
362        } else {
363            let expected_target_depth = source_tip_height.checked_sub(final_target_tip_height);
364            if target_tip_source_depth != expected_target_depth {
365                Err(format!(
366                    "unexpected mismatch between source and target tips,\n \
367                     max copy height: {max_copy_height:?},\n \
368                     source tip: {source_tip:?},\n \
369                     target tip: {final_target_tip:?},\n \
370                     actual target tip depth in source: {target_tip_source_depth:?},\n \
371                     expect target tip depth in source: {expected_target_depth:?}",
372                ))?;
373            } else {
374                info!(
375                    ?max_copy_height,
376                    ?source_tip,
377                    ?final_target_tip,
378                    ?target_tip_source_depth,
379                    "target state reached the max copy height"
380                );
381            }
382        }
383
384        Ok(())
385    }
386}
387
388impl Runnable for CopyStateCmd {
389    /// Start the application.
390    fn run(&self) {
391        info!(
392            max_source_height = ?self.max_source_height,
393            target_config_path = ?self.target_config_path,
394            "starting cached chain state copy"
395        );
396        let rt = APPLICATION
397            .state()
398            .components_mut()
399            .get_downcast_mut::<TokioComponent>()
400            .expect("TokioComponent should be available")
401            .rt
402            .take();
403
404        rt.expect("runtime should not already be taken")
405            .run(self.start());
406
407        info!("finished cached chain state copy");
408    }
409}
410
411impl config::Override<ZebradConfig> for CopyStateCmd {
412    // Process the given command line options, overriding settings from
413    // a configuration file using explicit flags taken from command-line
414    // arguments.
415    fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
416        if !self.filters.is_empty() {
417            config.tracing.filter = Some(self.filters.join(","));
418        }
419
420        Ok(config)
421    }
422}