1use std::{cmp::min, path::PathBuf};
37
38use abscissa_core::{config, Command, FrameworkError};
39use color_eyre::eyre::{eyre, Report};
40use tokio::time::Instant;
41use tower::{Service, ServiceExt};
42
43use zebra_chain::{block::Height, parameters::Network};
44use zebra_state as old_zs;
45use zebra_state as new_zs;
46
47use crate::{
48 components::tokio::{RuntimeRun, TokioComponent},
49 config::ZebradConfig,
50 prelude::*,
51 BoxError,
52};
53
/// How often copy progress is logged, in blocks.
///
/// The copy loop emits an `info!` progress message whenever the height it has
/// just copied is an exact multiple of this value.
const PROGRESS_HEIGHT_INTERVAL: u32 = 5_000;
56
// Command-line arguments for the command that copies blocks from a source
// state into a target state.
//
// NOTE: plain `//` comments are used on purpose. `clap`'s derive macros turn
// `///` doc comments into help/about text, so adding doc comments here could
// change the generated `--help` output.
#[derive(Command, Debug, clap::Parser)]
pub struct CopyStateCmd {
    // Optional upper bound on the copied height. The copy stops at the lower of
    // this value and the source tip height; when unset it stops at the source tip.
    #[clap(long, short, help = "stop copying at this source height")]
    max_source_height: Option<u32>,

    // Optional path to a config file for the target state. Only the `state`
    // section of the loaded config is used by this command. When unset, an
    // ephemeral target state config is used instead.
    #[clap(
        long,
        short,
        help = "config file path for the target state (default: ephemeral), \
                the source state uses the main zebrad config"
    )]
    target_config_path: Option<PathBuf>,

    // Tracing filters given as positional arguments. When non-empty they are
    // joined with commas and replace the tracing filter from the config.
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
81
82impl CopyStateCmd {
83 async fn start(&self) -> Result<(), Report> {
85 let base_config = APPLICATION.config();
86 let source_config = base_config.state.clone();
87
88 let target_config = self
94 .target_config_path
95 .as_ref()
96 .map(|path| {
97 crate::config::ZebradConfig::load_with_env(Some(path.clone()), "ZEBRA_TARGET")
98 })
99 .transpose()?
100 .map(|app_config| app_config.state)
101 .unwrap_or_else(new_zs::Config::ephemeral);
102
103 info!(?base_config, "state copy base config");
104
105 self.copy(&base_config.network.network, source_config, target_config)
106 .await
107 .map_err(|e| eyre!(e))
108 }
109
110 async fn copy(
113 &self,
114 network: &Network,
115 source_config: old_zs::Config,
116 target_config: new_zs::Config,
117 ) -> Result<(), BoxError> {
118 info!(
119 ?source_config,
120 "initializing source state service (old format)"
121 );
122
123 let source_start_time = Instant::now();
124
125 let (mut source_read_only_state_service, _source_db, _source_latest_non_finalized_state) =
127 old_zs::spawn_init_read_only(source_config.clone(), network).await?;
128
129 let elapsed = source_start_time.elapsed();
130 info!(?elapsed, "finished initializing source state service");
131
132 info!(
133 ?target_config, target_config_path = ?self.target_config_path,
134 "initializing target state service (new format)"
135 );
136
137 let target_start_time = Instant::now();
138 let (
144 mut target_state,
145 _target_read_only_state_service,
146 _target_latest_chain_tip,
147 _target_chain_tip_change,
148 ) = new_zs::spawn_init(target_config.clone(), network, Height::MAX, 0).await?;
149
150 let elapsed = target_start_time.elapsed();
151 info!(?elapsed, "finished initializing target state service");
152
153 info!("fetching source and target tip heights");
154
155 let source_tip = source_read_only_state_service
156 .ready()
157 .await?
158 .call(old_zs::ReadRequest::Tip)
159 .await?;
160 let source_tip = match source_tip {
161 old_zs::ReadResponse::Tip(Some(source_tip)) => source_tip,
162 old_zs::ReadResponse::Tip(None) => Err("empty source state: no blocks to copy")?,
163
164 response => Err(format!("unexpected response to Tip request: {response:?}",))?,
165 };
166 let source_tip_height = source_tip.0 .0;
167
168 let initial_target_tip = target_state
169 .ready()
170 .await?
171 .call(new_zs::Request::Tip)
172 .await?;
173 let initial_target_tip = match initial_target_tip {
174 new_zs::Response::Tip(target_tip) => target_tip,
175
176 response => Err(format!("unexpected response to Tip request: {response:?}",))?,
177 };
178 let min_target_height = initial_target_tip
179 .map(|target_tip| target_tip.0 .0 + 1)
180 .unwrap_or(0);
181
182 let max_copy_height = self
183 .max_source_height
184 .map(|max_source_height| min(source_tip_height, max_source_height))
185 .unwrap_or(source_tip_height);
186
187 if min_target_height >= max_copy_height {
188 info!(
189 ?min_target_height,
190 ?max_copy_height,
191 max_source_height = ?self.max_source_height,
192 ?source_tip,
193 ?initial_target_tip,
194 "target is already at or after max copy height"
195 );
196
197 return Ok(());
198 }
199
200 info!(
201 ?min_target_height,
202 ?max_copy_height,
203 max_source_height = ?self.max_source_height,
204 ?source_tip,
205 ?initial_target_tip,
206 "starting copy from source to target"
207 );
208
209 let copy_start_time = Instant::now();
210 for height in min_target_height..=max_copy_height {
211 let source_block = source_read_only_state_service
213 .ready()
214 .await?
215 .call(old_zs::ReadRequest::Block(Height(height).into()))
216 .await?;
217 let source_block = match source_block {
218 old_zs::ReadResponse::Block(Some(source_block)) => {
219 trace!(?height, %source_block, "read source block");
220 source_block
221 }
222 old_zs::ReadResponse::Block(None) => {
223 Err(format!("unexpected missing source block, height: {height}",))?
224 }
225
226 response => Err(format!(
227 "unexpected response to Block request, height: {height}, \n \
228 response: {response:?}",
229 ))?,
230 };
231 let source_block_hash = source_block.hash();
232
233 let target_block_commit_hash = target_state
235 .ready()
236 .await?
237 .call(new_zs::Request::CommitCheckpointVerifiedBlock(
238 source_block.clone().into(),
239 ))
240 .await?;
241 let target_block_commit_hash = match target_block_commit_hash {
242 new_zs::Response::Committed(target_block_commit_hash) => {
243 trace!(?target_block_commit_hash, "wrote target block");
244 target_block_commit_hash
245 }
246 response => Err(format!(
247 "unexpected response to CommitCheckpointVerifiedBlock request, height: {height}\n \
248 response: {response:?}",
249 ))?,
250 };
251
252 let target_block = target_state
254 .ready()
255 .await?
256 .call(new_zs::Request::Block(Height(height).into()))
257 .await?;
258 let target_block = match target_block {
259 new_zs::Response::Block(Some(target_block)) => {
260 trace!(?height, %target_block, "read target block");
261 target_block
262 }
263 new_zs::Response::Block(None) => {
264 Err(format!("unexpected missing target block, height: {height}",))?
265 }
266
267 response => Err(format!(
268 "unexpected response to Block request, height: {height},\n \
269 response: {response:?}",
270 ))?,
271 };
272 let target_block_data_hash = target_block.hash();
273
274 if source_block_hash != target_block_commit_hash
285 || source_block_hash != target_block_data_hash
286 || source_block != target_block
287 {
288 Err(format!(
289 "unexpected mismatch between source and target blocks,\n \
290 max copy height: {max_copy_height:?},\n \
291 source hash: {source_block_hash:?},\n \
292 target commit hash: {target_block_commit_hash:?},\n \
293 target data hash: {target_block_data_hash:?},\n \
294 source block: {source_block:?},\n \
295 target block: {target_block:?}",
296 ))?;
297 }
298
299 if height % PROGRESS_HEIGHT_INTERVAL == 0 {
301 let elapsed = copy_start_time.elapsed();
302 info!(
303 ?height,
304 ?max_copy_height,
305 ?elapsed,
306 "copied block from source to target"
307 );
308 }
309 }
310
311 let elapsed = copy_start_time.elapsed();
312 info!(?max_copy_height, ?elapsed, "finished copying blocks");
313
314 info!(?max_copy_height, "fetching final target tip");
315
316 let final_target_tip = target_state
317 .ready()
318 .await?
319 .call(new_zs::Request::Tip)
320 .await?;
321 let final_target_tip = match final_target_tip {
322 new_zs::Response::Tip(Some(target_tip)) => target_tip,
323 new_zs::Response::Tip(None) => Err("empty target state: expected written blocks")?,
324
325 response => Err(format!("unexpected response to Tip request: {response:?}",))?,
326 };
327 let final_target_tip_height = final_target_tip.0 .0;
328 let final_target_tip_hash = final_target_tip.1;
329
330 let target_tip_source_depth = source_read_only_state_service
331 .ready()
332 .await?
333 .call(old_zs::ReadRequest::Depth(final_target_tip_hash))
334 .await?;
335 let target_tip_source_depth = match target_tip_source_depth {
336 old_zs::ReadResponse::Depth(source_depth) => source_depth,
337
338 response => Err(format!(
339 "unexpected response to Depth request: {response:?}",
340 ))?,
341 };
342
343 if max_copy_height == source_tip_height {
348 let expected_target_depth = Some(0);
349 if source_tip != final_target_tip || target_tip_source_depth != expected_target_depth {
350 Err(format!(
351 "unexpected mismatch between source and target tips,\n \
352 max copy height: {max_copy_height:?},\n \
353 source tip: {source_tip:?},\n \
354 target tip: {final_target_tip:?},\n \
355 actual target tip depth in source: {target_tip_source_depth:?},\n \
356 expect target tip depth in source: {expected_target_depth:?}",
357 ))?;
358 } else {
359 info!(
360 ?max_copy_height,
361 ?source_tip,
362 ?final_target_tip,
363 ?target_tip_source_depth,
364 "source and target states contain the same blocks"
365 );
366 }
367 } else {
368 let expected_target_depth = source_tip_height.checked_sub(final_target_tip_height);
369 if target_tip_source_depth != expected_target_depth {
370 Err(format!(
371 "unexpected mismatch between source and target tips,\n \
372 max copy height: {max_copy_height:?},\n \
373 source tip: {source_tip:?},\n \
374 target tip: {final_target_tip:?},\n \
375 actual target tip depth in source: {target_tip_source_depth:?},\n \
376 expect target tip depth in source: {expected_target_depth:?}",
377 ))?;
378 } else {
379 info!(
380 ?max_copy_height,
381 ?source_tip,
382 ?final_target_tip,
383 ?target_tip_source_depth,
384 "target state reached the max copy height"
385 );
386 }
387 }
388
389 Ok(())
390 }
391}
392
393impl Runnable for CopyStateCmd {
394 fn run(&self) {
396 info!(
397 max_source_height = ?self.max_source_height,
398 target_config_path = ?self.target_config_path,
399 "starting cached chain state copy"
400 );
401 let rt = APPLICATION
402 .state()
403 .components_mut()
404 .get_downcast_mut::<TokioComponent>()
405 .expect("TokioComponent should be available")
406 .rt
407 .take();
408
409 rt.expect("runtime should not already be taken")
410 .run(self.start());
411
412 info!("finished cached chain state copy");
413 }
414}
415
416impl config::Override<ZebradConfig> for CopyStateCmd {
417 fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
421 if !self.filters.is_empty() {
422 config.tracing.filter = Some(self.filters.join(","));
423 }
424
425 Ok(config)
426 }
427}