zebra_scan/service/scan_task/scan/
scan_range.rs

//! A builder and task function for scanning a bounded range of block heights with a fixed set of keys
2
3use std::{collections::HashMap, sync::Arc};
4
5use crate::{
6    scan::{get_min_height, scan_height_and_store_results, wait_for_height, State, CHECK_INTERVAL},
7    storage::Storage,
8};
9use color_eyre::eyre::Report;
10use sapling_crypto::zip32::DiversifiableFullViewingKey;
11use tokio::{
12    sync::{mpsc::Sender, watch},
13    task::JoinHandle,
14};
15use tracing::Instrument;
16use zebra_chain::block::Height;
17use zebra_node_services::scan_service::response::ScanResult;
18use zebra_state::SaplingScanningKey;
19
20/// A builder for a scan until task
21pub struct ScanRangeTaskBuilder {
22    /// The range of block heights that should be scanned for these keys
23    // TODO: Remove start heights from keys and require that all keys per task use the same start height
24    height_range: std::ops::Range<Height>,
25
26    /// The keys to be used for scanning blocks in this task
27    keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
28
29    /// A handle to the state service for reading the blocks and the chain tip height
30    state: State,
31
32    /// A handle to the zebra-scan database for storing results
33    storage: Storage,
34}
35
36impl ScanRangeTaskBuilder {
37    /// Creates a new [`ScanRangeTaskBuilder`]
38    pub fn new(
39        stop_height: Height,
40        keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
41        state: State,
42        storage: Storage,
43    ) -> Self {
44        Self {
45            height_range: Height::MIN..stop_height,
46            keys,
47            state,
48            storage,
49        }
50    }
51
52    /// Spawns a `scan_range()` task and returns its [`JoinHandle`]
53    // TODO: return a tuple with a shutdown sender
54    pub fn spawn(
55        self,
56        subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
57    ) -> JoinHandle<Result<(), Report>> {
58        let Self {
59            height_range,
60            keys,
61            state,
62            storage,
63        } = self;
64
65        tokio::spawn(
66            scan_range(
67                height_range.end,
68                keys,
69                state,
70                storage,
71                subscribed_keys_receiver,
72            )
73            .in_current_span(),
74        )
75    }
76}
77
78/// Start a scan task that reads blocks from `state` within the provided height range,
79/// scans them with the configured keys in `storage`, and then writes the results to `storage`.
80// TODO: update the first parameter to `std::ops::Range<Height>`
81pub async fn scan_range(
82    stop_before_height: Height,
83    keys: HashMap<SaplingScanningKey, (DiversifiableFullViewingKey, Height)>,
84    state: State,
85    storage: Storage,
86    subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
87) -> Result<(), Report> {
88    let sapling_activation_height = storage.network().sapling_activation_height();
89    // Do not scan and notify if we are below sapling activation height.
90    wait_for_height(
91        sapling_activation_height,
92        "Sapling activation",
93        state.clone(),
94    )
95    .await?;
96
97    let key_heights: HashMap<String, Height> = keys
98        .iter()
99        .map(|(key, (_, height))| (key.clone(), *height))
100        .collect();
101
102    let mut height = get_min_height(&key_heights).unwrap_or(sapling_activation_height);
103
104    let key_heights = Arc::new(key_heights);
105
106    // Parse and convert keys once, then use them to scan all blocks.
107    let parsed_keys: HashMap<SaplingScanningKey, DiversifiableFullViewingKey> = keys
108        .into_iter()
109        .map(|(key, (dfvk, _))| (key, dfvk))
110        .collect();
111
112    while height < stop_before_height {
113        let scanned_height = scan_height_and_store_results(
114            height,
115            state.clone(),
116            None,
117            storage.clone(),
118            key_heights.clone(),
119            parsed_keys.clone(),
120            subscribed_keys_receiver.clone(),
121        )
122        .await?;
123
124        // If we've reached the tip, sleep for a while then try and get the same block.
125        if scanned_height.is_none() {
126            tokio::time::sleep(CHECK_INTERVAL).await;
127            continue;
128        }
129
130        height = height
131            .next()
132            .expect("a valid blockchain never reaches the max height");
133    }
134
135    info!(
136        start_height = ?height,
137        ?stop_before_height,
138        "finished scanning range"
139    );
140
141    Ok(())
142}