// zebra_scan/service/scan_task/scan/scan_range.rs
use std::{collections::HashMap, sync::Arc};
use crate::{
scan::{get_min_height, scan_height_and_store_results, wait_for_height, State, CHECK_INTERVAL},
storage::Storage,
};
use color_eyre::eyre::Report;
use sapling_crypto::zip32::DiversifiableFullViewingKey;
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::Instrument;
use zebra_chain::block::Height;
use zebra_node_services::scan_service::response::ScanResult;
use zebra_state::SaplingScanningKey;
/// Holds everything needed to spawn a [`scan_range`] task.
///
/// Built with [`ScanRangeTaskBuilder::new`] and consumed by [`ScanRangeTaskBuilder::spawn`].
pub struct ScanRangeTaskBuilder {
/// Block height range for the task. `new` always sets the start to `Height::MIN`, and `spawn`
/// only reads the (exclusive) `end` bound.
height_range: std::ops::Range<Height>,
/// Scanning keys, each mapped to its full viewing key and a height. `scan_range` starts
/// scanning from the minimum of these heights.
keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
/// State handle forwarded to `wait_for_height` and `scan_height_and_store_results`.
state: State,
/// Scanner storage handle; `scan_range` reads the network from it and forwards it to
/// `scan_height_and_store_results`.
storage: Storage,
}
impl ScanRangeTaskBuilder {
    /// Creates a builder for a task that scans up to, but not including, `stop_height`.
    ///
    /// The stored range always begins at `Height::MIN`.
    pub fn new(
        stop_height: Height,
        keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
        state: State,
        storage: Storage,
    ) -> Self {
        let height_range = Height::MIN..stop_height;

        Self {
            height_range,
            keys,
            state,
            storage,
        }
    }

    /// Consumes the builder and spawns [`scan_range`] as a Tokio task.
    ///
    /// Only the end of the stored height range is passed on, as the exclusive stop height.
    /// The future is instrumented with the tracing span that is current when `spawn` is
    /// called. Returns the task's join handle, which resolves to the result of `scan_range`.
    pub fn spawn(
        self,
        subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
    ) -> JoinHandle<Result<(), Report>> {
        // Move each field straight into the scan future; the builder is not needed afterwards.
        let scan_task = scan_range(
            self.height_range.end,
            self.keys,
            self.state,
            self.storage,
            subscribed_keys_receiver,
        )
        .in_current_span();

        tokio::spawn(scan_task)
    }
}
/// Scans every block height below `stop_before_height` with the provided `keys`.
///
/// The task first awaits `wait_for_height` for the network's Sapling activation height.
/// Scanning then starts at the lowest height recorded in `keys`, or at the Sapling activation
/// height when `get_min_height` returns `None`. Each height is passed to
/// `scan_height_and_store_results`; when that call returns `None`, the task sleeps for
/// `CHECK_INTERVAL` and retries the same height (presumably because the block is not yet
/// available in the state).
///
/// # Errors
///
/// Returns any error produced by `wait_for_height` or `scan_height_and_store_results`.
///
/// # Panics
///
/// Panics if the current height has no successor.
pub async fn scan_range(
    stop_before_height: Height,
    keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
    state: State,
    storage: Storage,
    subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
) -> Result<(), Report> {
    let sapling_activation_height = storage.network().sapling_activation_height();

    wait_for_height(
        sapling_activation_height,
        "Sapling activation",
        state.clone(),
    )
    .await?;

    let key_heights: HashMap<String, Height> = keys
        .iter()
        .map(|(key, (_, height))| (key.clone(), *height))
        .collect();

    // Remember where scanning began so the final log line reports the real start height
    // rather than the height the loop ended on.
    let start_height = get_min_height(&key_heights).unwrap_or(sapling_activation_height);
    let mut height = start_height;

    let key_heights = Arc::new(key_heights);

    // Split the viewing keys out once; the map is cloned for each scanned height.
    let parsed_keys: HashMap<SaplingScanningKey, DiversifiableFullViewingKey> = keys
        .into_iter()
        .map(|(key, (dfvk, _))| (key, dfvk))
        .collect();

    while height < stop_before_height {
        let scanned_height = scan_height_and_store_results(
            height,
            state.clone(),
            None,
            storage.clone(),
            key_heights.clone(),
            parsed_keys.clone(),
            subscribed_keys_receiver.clone(),
        )
        .await?;

        // Nothing was scanned at this height: wait, then try the same height again.
        if scanned_height.is_none() {
            tokio::time::sleep(CHECK_INTERVAL).await;
            continue;
        }

        height = height
            .next()
            .expect("a valid blockchain never reaches the max height");
    }

    info!(
        ?start_height,
        ?stop_before_height,
        "finished scanning range"
    );

    Ok(())
}