zebra_scan/service/scan_task/scan/
scan_range.rs
1use std::{collections::HashMap, sync::Arc};
4
5use crate::{
6 scan::{get_min_height, scan_height_and_store_results, wait_for_height, State, CHECK_INTERVAL},
7 storage::Storage,
8};
9use color_eyre::eyre::Report;
10use sapling_crypto::zip32::DiversifiableFullViewingKey;
11use tokio::{
12 sync::{mpsc::Sender, watch},
13 task::JoinHandle,
14};
15use tracing::Instrument;
16use zebra_chain::block::Height;
17use zebra_node_services::scan_service::response::ScanResult;
18use zebra_state::SaplingScanningKey;
19
20pub struct ScanRangeTaskBuilder {
22 height_range: std::ops::Range<Height>,
25
26 keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
28
29 state: State,
31
32 storage: Storage,
34}
35
36impl ScanRangeTaskBuilder {
37 pub fn new(
39 stop_height: Height,
40 keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
41 state: State,
42 storage: Storage,
43 ) -> Self {
44 Self {
45 height_range: Height::MIN..stop_height,
46 keys,
47 state,
48 storage,
49 }
50 }
51
52 pub fn spawn(
55 self,
56 subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
57 ) -> JoinHandle<Result<(), Report>> {
58 let Self {
59 height_range,
60 keys,
61 state,
62 storage,
63 } = self;
64
65 tokio::spawn(
66 scan_range(
67 height_range.end,
68 keys,
69 state,
70 storage,
71 subscribed_keys_receiver,
72 )
73 .in_current_span(),
74 )
75 }
76}
77
78pub async fn scan_range(
82 stop_before_height: Height,
83 keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
84 state: State,
85 storage: Storage,
86 subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
87) -> Result<(), Report> {
88 let sapling_activation_height = storage.network().sapling_activation_height();
89 wait_for_height(
91 sapling_activation_height,
92 "Sapling activation",
93 state.clone(),
94 )
95 .await?;
96
97 let key_heights: HashMap<String, Height> = keys
98 .iter()
99 .map(|(key, (_, height))| (key.clone(), *height))
100 .collect();
101
102 let mut height = get_min_height(&key_heights).unwrap_or(sapling_activation_height);
103
104 let key_heights = Arc::new(key_heights);
105
106 let parsed_keys: HashMap<SaplingScanningKey, DiversifiableFullViewingKey> = keys
108 .into_iter()
109 .map(|(key, (dfvk, _))| (key, dfvk))
110 .collect();
111
112 while height < stop_before_height {
113 let scanned_height = scan_height_and_store_results(
114 height,
115 state.clone(),
116 None,
117 storage.clone(),
118 key_heights.clone(),
119 parsed_keys.clone(),
120 subscribed_keys_receiver.clone(),
121 )
122 .await?;
123
124 if scanned_height.is_none() {
126 tokio::time::sleep(CHECK_INTERVAL).await;
127 continue;
128 }
129
130 height = height
131 .next()
132 .expect("a valid blockchain never reaches the max height");
133 }
134
135 info!(
136 start_height = ?height,
137 ?stop_before_height,
138 "finished scanning range"
139 );
140
141 Ok(())
142}