1use std::{cmp::min, path::PathBuf};
37
38use abscissa_core::{config, Command, FrameworkError};
39use color_eyre::eyre::{eyre, Report};
40use tokio::time::Instant;
41use tower::{Service, ServiceExt};
42
43use zebra_chain::{block::Height, parameters::Network};
44use zebra_state as old_zs;
45use zebra_state as new_zs;
46
47use crate::{
48 application::ZebradApp,
49 components::tokio::{RuntimeRun, TokioComponent},
50 config::ZebradConfig,
51 prelude::*,
52 BoxError,
53};
54
/// Height interval between progress logs.
///
/// While copying, a progress message is logged for every copied height that is an exact
/// multiple of this value.
const PROGRESS_HEIGHT_INTERVAL: u32 = 5_000;
57
// Command-line arguments for the cached chain state copy command.
//
// NOTE(review): these are deliberately plain `//` comments. The clap derive turns `///` doc
// comments into help/about text, so doc comments here would change the `--help` output.
#[derive(Command, Debug, clap::Parser)]
pub struct CopyStateCmd {
    // Optional upper bound for the copy: blocks are copied up to the lower of this height and
    // the source tip height. When unset, the copy runs up to the source tip.
    #[clap(long, short, help = "stop copying at this source height")]
    max_source_height: Option<u32>,

    // Optional path of the config file whose `state` section configures the target state.
    // When unset, an ephemeral target state config is used.
    #[clap(
        long,
        short,
        help = "config file path for the target state (default: ephemeral), \
                the source state uses the main zebrad config"
    )]
    target_config_path: Option<PathBuf>,

    // Positional tracing filters. When non-empty, they are joined with "," and replace the
    // tracing filter in the loaded config.
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
82
83impl CopyStateCmd {
84 async fn start(&self) -> Result<(), Report> {
86 let base_config = APPLICATION.config();
87 let source_config = base_config.state.clone();
88
89 let target_config = self
91 .target_config_path
92 .as_ref()
93 .map(|path| ZebradApp::default().load_config(path))
94 .transpose()?
95 .map(|app_config| app_config.state)
96 .unwrap_or_else(new_zs::Config::ephemeral);
97
98 info!(?base_config, "state copy base config");
99
100 self.copy(&base_config.network.network, source_config, target_config)
101 .await
102 .map_err(|e| eyre!(e))
103 }
104
105 async fn copy(
108 &self,
109 network: &Network,
110 source_config: old_zs::Config,
111 target_config: new_zs::Config,
112 ) -> Result<(), BoxError> {
113 info!(
114 ?source_config,
115 "initializing source state service (old format)"
116 );
117
118 let source_start_time = Instant::now();
119
120 let (mut source_read_only_state_service, _source_db, _source_latest_non_finalized_state) =
122 old_zs::spawn_init_read_only(source_config.clone(), network).await?;
123
124 let elapsed = source_start_time.elapsed();
125 info!(?elapsed, "finished initializing source state service");
126
127 info!(
128 ?target_config, target_config_path = ?self.target_config_path,
129 "initializing target state service (new format)"
130 );
131
132 let target_start_time = Instant::now();
133 let (
139 mut target_state,
140 _target_read_only_state_service,
141 _target_latest_chain_tip,
142 _target_chain_tip_change,
143 ) = new_zs::spawn_init(target_config.clone(), network, Height::MAX, 0).await?;
144
145 let elapsed = target_start_time.elapsed();
146 info!(?elapsed, "finished initializing target state service");
147
148 info!("fetching source and target tip heights");
149
150 let source_tip = source_read_only_state_service
151 .ready()
152 .await?
153 .call(old_zs::ReadRequest::Tip)
154 .await?;
155 let source_tip = match source_tip {
156 old_zs::ReadResponse::Tip(Some(source_tip)) => source_tip,
157 old_zs::ReadResponse::Tip(None) => Err("empty source state: no blocks to copy")?,
158
159 response => Err(format!("unexpected response to Tip request: {response:?}",))?,
160 };
161 let source_tip_height = source_tip.0 .0;
162
163 let initial_target_tip = target_state
164 .ready()
165 .await?
166 .call(new_zs::Request::Tip)
167 .await?;
168 let initial_target_tip = match initial_target_tip {
169 new_zs::Response::Tip(target_tip) => target_tip,
170
171 response => Err(format!("unexpected response to Tip request: {response:?}",))?,
172 };
173 let min_target_height = initial_target_tip
174 .map(|target_tip| target_tip.0 .0 + 1)
175 .unwrap_or(0);
176
177 let max_copy_height = self
178 .max_source_height
179 .map(|max_source_height| min(source_tip_height, max_source_height))
180 .unwrap_or(source_tip_height);
181
182 if min_target_height >= max_copy_height {
183 info!(
184 ?min_target_height,
185 ?max_copy_height,
186 max_source_height = ?self.max_source_height,
187 ?source_tip,
188 ?initial_target_tip,
189 "target is already at or after max copy height"
190 );
191
192 return Ok(());
193 }
194
195 info!(
196 ?min_target_height,
197 ?max_copy_height,
198 max_source_height = ?self.max_source_height,
199 ?source_tip,
200 ?initial_target_tip,
201 "starting copy from source to target"
202 );
203
204 let copy_start_time = Instant::now();
205 for height in min_target_height..=max_copy_height {
206 let source_block = source_read_only_state_service
208 .ready()
209 .await?
210 .call(old_zs::ReadRequest::Block(Height(height).into()))
211 .await?;
212 let source_block = match source_block {
213 old_zs::ReadResponse::Block(Some(source_block)) => {
214 trace!(?height, %source_block, "read source block");
215 source_block
216 }
217 old_zs::ReadResponse::Block(None) => {
218 Err(format!("unexpected missing source block, height: {height}",))?
219 }
220
221 response => Err(format!(
222 "unexpected response to Block request, height: {height}, \n \
223 response: {response:?}",
224 ))?,
225 };
226 let source_block_hash = source_block.hash();
227
228 let target_block_commit_hash = target_state
230 .ready()
231 .await?
232 .call(new_zs::Request::CommitCheckpointVerifiedBlock(
233 source_block.clone().into(),
234 ))
235 .await?;
236 let target_block_commit_hash = match target_block_commit_hash {
237 new_zs::Response::Committed(target_block_commit_hash) => {
238 trace!(?target_block_commit_hash, "wrote target block");
239 target_block_commit_hash
240 }
241 response => Err(format!(
242 "unexpected response to CommitCheckpointVerifiedBlock request, height: {height}\n \
243 response: {response:?}",
244 ))?,
245 };
246
247 let target_block = target_state
249 .ready()
250 .await?
251 .call(new_zs::Request::Block(Height(height).into()))
252 .await?;
253 let target_block = match target_block {
254 new_zs::Response::Block(Some(target_block)) => {
255 trace!(?height, %target_block, "read target block");
256 target_block
257 }
258 new_zs::Response::Block(None) => {
259 Err(format!("unexpected missing target block, height: {height}",))?
260 }
261
262 response => Err(format!(
263 "unexpected response to Block request, height: {height},\n \
264 response: {response:?}",
265 ))?,
266 };
267 let target_block_data_hash = target_block.hash();
268
269 if source_block_hash != target_block_commit_hash
280 || source_block_hash != target_block_data_hash
281 || source_block != target_block
282 {
283 Err(format!(
284 "unexpected mismatch between source and target blocks,\n \
285 max copy height: {max_copy_height:?},\n \
286 source hash: {source_block_hash:?},\n \
287 target commit hash: {target_block_commit_hash:?},\n \
288 target data hash: {target_block_data_hash:?},\n \
289 source block: {source_block:?},\n \
290 target block: {target_block:?}",
291 ))?;
292 }
293
294 if height % PROGRESS_HEIGHT_INTERVAL == 0 {
296 let elapsed = copy_start_time.elapsed();
297 info!(
298 ?height,
299 ?max_copy_height,
300 ?elapsed,
301 "copied block from source to target"
302 );
303 }
304 }
305
306 let elapsed = copy_start_time.elapsed();
307 info!(?max_copy_height, ?elapsed, "finished copying blocks");
308
309 info!(?max_copy_height, "fetching final target tip");
310
311 let final_target_tip = target_state
312 .ready()
313 .await?
314 .call(new_zs::Request::Tip)
315 .await?;
316 let final_target_tip = match final_target_tip {
317 new_zs::Response::Tip(Some(target_tip)) => target_tip,
318 new_zs::Response::Tip(None) => Err("empty target state: expected written blocks")?,
319
320 response => Err(format!("unexpected response to Tip request: {response:?}",))?,
321 };
322 let final_target_tip_height = final_target_tip.0 .0;
323 let final_target_tip_hash = final_target_tip.1;
324
325 let target_tip_source_depth = source_read_only_state_service
326 .ready()
327 .await?
328 .call(old_zs::ReadRequest::Depth(final_target_tip_hash))
329 .await?;
330 let target_tip_source_depth = match target_tip_source_depth {
331 old_zs::ReadResponse::Depth(source_depth) => source_depth,
332
333 response => Err(format!(
334 "unexpected response to Depth request: {response:?}",
335 ))?,
336 };
337
338 if max_copy_height == source_tip_height {
343 let expected_target_depth = Some(0);
344 if source_tip != final_target_tip || target_tip_source_depth != expected_target_depth {
345 Err(format!(
346 "unexpected mismatch between source and target tips,\n \
347 max copy height: {max_copy_height:?},\n \
348 source tip: {source_tip:?},\n \
349 target tip: {final_target_tip:?},\n \
350 actual target tip depth in source: {target_tip_source_depth:?},\n \
351 expect target tip depth in source: {expected_target_depth:?}",
352 ))?;
353 } else {
354 info!(
355 ?max_copy_height,
356 ?source_tip,
357 ?final_target_tip,
358 ?target_tip_source_depth,
359 "source and target states contain the same blocks"
360 );
361 }
362 } else {
363 let expected_target_depth = source_tip_height.checked_sub(final_target_tip_height);
364 if target_tip_source_depth != expected_target_depth {
365 Err(format!(
366 "unexpected mismatch between source and target tips,\n \
367 max copy height: {max_copy_height:?},\n \
368 source tip: {source_tip:?},\n \
369 target tip: {final_target_tip:?},\n \
370 actual target tip depth in source: {target_tip_source_depth:?},\n \
371 expect target tip depth in source: {expected_target_depth:?}",
372 ))?;
373 } else {
374 info!(
375 ?max_copy_height,
376 ?source_tip,
377 ?final_target_tip,
378 ?target_tip_source_depth,
379 "target state reached the max copy height"
380 );
381 }
382 }
383
384 Ok(())
385 }
386}
387
388impl Runnable for CopyStateCmd {
389 fn run(&self) {
391 info!(
392 max_source_height = ?self.max_source_height,
393 target_config_path = ?self.target_config_path,
394 "starting cached chain state copy"
395 );
396 let rt = APPLICATION
397 .state()
398 .components_mut()
399 .get_downcast_mut::<TokioComponent>()
400 .expect("TokioComponent should be available")
401 .rt
402 .take();
403
404 rt.expect("runtime should not already be taken")
405 .run(self.start());
406
407 info!("finished cached chain state copy");
408 }
409}
410
411impl config::Override<ZebradConfig> for CopyStateCmd {
412 fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
416 if !self.filters.is_empty() {
417 config.tracing.filter = Some(self.filters.join(","));
418 }
419
420 Ok(config)
421 }
422}