zebra_scan/service/scan_task/
scan.rs

1//! The scanner task and scanning APIs.
2
3use std::{
4    collections::{BTreeMap, HashMap},
5    sync::Arc,
6    time::Duration,
7};
8
9use color_eyre::{eyre::eyre, Report};
10use itertools::Itertools;
11use tokio::{
12    sync::{mpsc::Sender, watch},
13    task::JoinHandle,
14};
15use tower::{Service, ServiceExt};
16
17use tracing::Instrument;
18use zcash_address::unified::{Encoding, Fvk, Ufvk};
19use zcash_client_backend::{
20    data_api::ScannedBlock,
21    encoding::decode_extended_full_viewing_key,
22    keys::UnifiedFullViewingKey,
23    proto::compact_formats::{
24        ChainMetadata, CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx,
25    },
26    scanning::{Nullifiers, ScanError, ScanningKeys},
27};
28use zip32::{AccountId, Scope};
29
30use sapling_crypto::zip32::DiversifiableFullViewingKey;
31
32use zebra_chain::{
33    block::{Block, Height},
34    chain_tip::ChainTip,
35    diagnostic::task::WaitForPanics,
36    parameters::Network,
37    serialization::ZcashSerialize,
38    transaction::Transaction,
39};
40use zebra_node_services::scan_service::response::ScanResult;
41use zebra_state::{ChainTipChange, ReadStateService, SaplingScannedResult, TransactionIndex};
42
43use crate::{
44    service::{ScanTask, ScanTaskCommand},
45    storage::{SaplingScanningKey, Storage},
46};
47
48use super::executor;
49
50mod scan_range;
51
52pub use scan_range::ScanRangeTaskBuilder;
53
54/// The read state type used by the scanner.
///
/// The block and tip lookups in this file are sent through this service type.
55pub type State = ReadStateService;
56
57/// Wait a few seconds at startup for some blocks to get verified.
58///
59/// But sometimes the state might be empty if the network is slow.
60const INITIAL_WAIT: Duration = Duration::from_secs(15);
61
62/// The amount of time between checking for new blocks and starting new scans.
63///
/// This is also the delay between tip polls in `wait_for_height`.
///
64/// TODO: The current value is set to 10 so that tests don't sleep for too long and finish faster.
65///       Set it to 30 after #8250 gets addressed or remove this const completely in the refactor.
66pub const CHECK_INTERVAL: Duration = Duration::from_secs(10);
67
68/// We log an info log with progress after this many blocks.
///
/// Progress logging is only considered at heights that are exact multiples of this value.
69const INFO_LOG_INTERVAL: u32 = 10_000;
70
71/// Start a scan task that reads blocks from `state`, scans them with the configured keys in
72/// `storage`, and then writes the results to `storage`.
73pub async fn start(
74    state: State,
75    chain_tip_change: ChainTipChange,
76    storage: Storage,
77    mut cmd_receiver: tokio::sync::mpsc::Receiver<ScanTaskCommand>,
78) -> Result<(), Report> {
79    let network = storage.network();
80    let sapling_activation_height = network.sapling_activation_height();
81
82    info!(?network, "starting scan task");
83
84    // Do not scan and notify if we are below sapling activation height.
85    #[cfg(not(test))]
86    wait_for_height(
87        sapling_activation_height,
88        "Sapling activation",
89        state.clone(),
90    )
91    .await?;
92
93    // Read keys from the storage on disk, which can block async execution.
94    let key_storage = storage.clone();
95    let key_heights = tokio::task::spawn_blocking(move || key_storage.sapling_keys_last_heights())
96        .wait_for_panics()
97        .await;
98    let key_heights = Arc::new(key_heights);
99
100    let mut height = get_min_height(&key_heights).unwrap_or(sapling_activation_height);
101
102    info!(start_height = ?height, "got min scan height");
103
104    // Parse and convert keys once, then use them to scan all blocks.
105    // There is some cryptography here, but it should be fast even with thousands of keys.
106    let mut parsed_keys: HashMap<SaplingScanningKey, DiversifiableFullViewingKey> = key_heights
107        .keys()
108        .map(|key| Ok::<_, Report>((key.clone(), sapling_key_to_dfvk(key, &network)?)))
109        .try_collect()?;
110
111    let mut subscribed_keys: HashMap<SaplingScanningKey, Sender<ScanResult>> = HashMap::new();
112
113    let (subscribed_keys_sender, subscribed_keys_receiver) =
114        tokio::sync::watch::channel(Arc::new(subscribed_keys.clone()));
115
116    let (scan_task_sender, scan_task_executor_handle) =
117        executor::spawn_init(subscribed_keys_receiver.clone());
118    let mut scan_task_executor_handle = Some(scan_task_executor_handle);
119
120    // Give empty states time to verify some blocks before we start scanning.
121    tokio::time::sleep(INITIAL_WAIT).await;
122
123    loop {
124        if let Some(handle) = scan_task_executor_handle {
125            if handle.is_finished() {
126                warn!("scan task finished unexpectedly");
127
128                handle.await?.map_err(|err| eyre!(err))?;
129                return Ok(());
130            } else {
131                scan_task_executor_handle = Some(handle);
132            }
133        }
134
135        let was_parsed_keys_empty = parsed_keys.is_empty();
136
137        let (new_keys, new_result_senders, new_result_receivers) =
138            ScanTask::process_messages(&mut cmd_receiver, &mut parsed_keys, &network)?;
139
140        subscribed_keys.extend(new_result_senders);
141        // Drop any results senders that are closed from subscribed_keys
142        subscribed_keys.retain(|key, sender| !sender.is_closed() && parsed_keys.contains_key(key));
143
144        // Send the latest version of `subscribed_keys` before spawning the scan range task
145        subscribed_keys_sender
146            .send(Arc::new(subscribed_keys.clone()))
147            .expect("last receiver should not be dropped while this task is running");
148
149        for (result_receiver, rsp_tx) in new_result_receivers {
150            // Ignore send errors, we drop any closed results channels above.
151            let _ = rsp_tx.send(result_receiver);
152        }
153
154        if !new_keys.is_empty() {
155            let state = state.clone();
156            let storage = storage.clone();
157
158            let start_height = new_keys
159                .iter()
160                .map(|(_, (_, height))| *height)
161                .min()
162                .unwrap_or(sapling_activation_height);
163
164            if was_parsed_keys_empty {
165                info!(?start_height, "setting new start height");
166                height = start_height;
167            }
168            // Skip spawning ScanRange task if `start_height` is at or above the current height
169            else if start_height < height {
170                scan_task_sender
171                    .send(ScanRangeTaskBuilder::new(height, new_keys, state, storage))
172                    .await
173                    .expect("scan_until_task channel should not be closed");
174            }
175        }
176
177        if !parsed_keys.is_empty() {
178            let scanned_height = scan_height_and_store_results(
179                height,
180                state.clone(),
181                Some(chain_tip_change.clone()),
182                storage.clone(),
183                key_heights.clone(),
184                parsed_keys.clone(),
185                subscribed_keys_receiver.clone(),
186            )
187            .await?;
188
189            // If we've reached the tip, sleep for a while then try and get the same block.
190            if scanned_height.is_none() {
191                tokio::time::sleep(CHECK_INTERVAL).await;
192                continue;
193            }
194        } else {
195            tokio::time::sleep(CHECK_INTERVAL).await;
196            continue;
197        }
198
199        height = height
200            .next()
201            .expect("a valid blockchain never reaches the max height");
202    }
203}
204
205/// Polls state service for tip height every [`CHECK_INTERVAL`] until the tip reaches the provided `tip_height`
206pub async fn wait_for_height(
207    height: Height,
208    height_name: &'static str,
209    state: State,
210) -> Result<(), Report> {
211    loop {
212        let tip_height = tip_height(state.clone()).await?;
213        if tip_height < height {
214            info!(
215                "scanner is waiting for {height_name}. Current tip: {}, {height_name}: {}",
216                tip_height.0, height.0
217            );
218
219            tokio::time::sleep(CHECK_INTERVAL).await;
220        } else {
221            info!(
222                "scanner finished waiting for {height_name}. Current tip: {}, {height_name}: {}",
223                tip_height.0, height.0
224            );
225
226            break;
227        }
228    }
229
230    Ok(())
231}
232
233/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the
234/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it.
235///
236/// Returns:
237/// - `Ok(Some(height))` if the height was scanned,
238/// - `Ok(None)` if the height was not in the state, and
239/// - `Err(error)` on fatal errors.
240pub async fn scan_height_and_store_results(
241    height: Height,
242    mut state: State,
243    chain_tip_change: Option<ChainTipChange>,
244    storage: Storage,
245    key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
246    parsed_keys: HashMap<String, DiversifiableFullViewingKey>,
247    subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
248) -> Result<Option<Height>, Report> {
249    let network = storage.network();
250
251    // Only log at info level every 100,000 blocks.
252    //
253    // TODO: also log progress every 5 minutes once we reach the tip?
254    let is_info_log = height.0 % INFO_LOG_INTERVAL == 0;
255
256    // Get a block from the state.
257    // We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init().
258    let block = state
259        .ready()
260        .await
261        .map_err(|e| eyre!(e))?
262        .call(zebra_state::ReadRequest::Block(height.into()))
263        .await
264        .map_err(|e| eyre!(e))?;
265
266    let block = match block {
267        zebra_state::ReadResponse::Block(Some(block)) => block,
268        zebra_state::ReadResponse::Block(None) => return Ok(None),
269        _ => unreachable!("unmatched response to a state::Block request"),
270    };
271
272    for (key_index_in_task, (sapling_key, _)) in parsed_keys.iter().enumerate() {
273        match key_last_scanned_heights.get(sapling_key) {
274            // Only scan what was not scanned for each key
275            Some(last_scanned_height) if height <= *last_scanned_height => continue,
276
277            Some(last_scanned_height) if is_info_log => {
278                if let Some(chain_tip_change) = &chain_tip_change {
279                    // # Security
280                    //
281                    // We can't log `sapling_key` here because it is a private viewing key. Anyone who reads
282                    // the logs could use the key to view those transactions.
283                    info!(
284                        "Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
285                        key_index_in_task, last_scanned_height.next().expect("height is not maximum").as_usize(),
286                        height.as_usize(),
287                        chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
288                    );
289                } else {
290                    info!(
291                        "Scanning the blockchain for key {}, started at block {:?}, now at block {:?}",
292                        key_index_in_task, last_scanned_height.next().expect("height is not maximum").as_usize(),
293                        height.as_usize(),
294                    );
295                }
296            }
297
298            _other => {}
299        };
300
301        let subscribed_keys_receiver = subscribed_keys_receiver.clone();
302
303        let sapling_key = sapling_key.clone();
304        let block = block.clone();
305        let mut storage = storage.clone();
306        let network = network.clone();
307        let parsed_keys = parsed_keys.clone();
308
309        // We use a dummy size of the Sapling note commitment tree.
310        //
311        // We can't set the size to zero, because the underlying scanning function would return
312        // `zcash_client_backeng::scanning::ScanError::TreeSizeUnknown`.
313        //
314        // And we can't set them close to 0, because the scanner subtracts the number of notes
315        // in the block, and panics with "attempt to subtract with overflow". The number of
316        // notes in a block must be less than this value, this is a consensus rule.
317        //
318        // TODO: use the real sapling tree size: `zs::Response::SaplingTree().position() + 1`
319        let sapling_tree_size = 1 << 16;
320
321        tokio::task::spawn_blocking(move || {
322            // TODO:
323            // - Wait until https://github.com/zcash/librustzcash/pull/1400 makes it to a release.
324            // - Create the scanning keys outside of this thread and move them here instead.
325            let scanning_keys = scanning_keys(parsed_keys.values()).expect("scanning keys");
326
327            let scanned_block = scan_block(&network, &block, sapling_tree_size, &scanning_keys)
328                .map_err(|e| eyre!(e))?;
329
330            let scanning_result = scanned_block_to_db_result(scanned_block);
331
332            let latest_subscribed_keys = subscribed_keys_receiver.borrow().clone();
333            if let Some(results_sender) = latest_subscribed_keys.get(&sapling_key).cloned() {
334                for (_tx_index, tx_id) in scanning_result.clone() {
335                    // TODO: Handle `SendErrors` by dropping sender from `subscribed_keys`
336                    let _ = results_sender.try_send(ScanResult {
337                        key: sapling_key.clone(),
338                        height,
339                        tx_id: tx_id.into(),
340                    });
341                }
342            }
343
344            storage.add_sapling_results(&sapling_key, height, scanning_result);
345            Ok::<_, Report>(())
346        })
347        .wait_for_panics()
348        .await?;
349    }
350
351    Ok(Some(height))
352}
353
354/// Returns the transactions from `block` belonging to the given `scanning_keys`.
355///
356/// # Performance / Hangs
357///
358/// This method can block while reading database files, so it must be inside spawn_blocking()
359/// in async code.
360///
361/// TODO:
362/// - Pass the real `sapling_tree_size` parameter from the state.
363/// - Add other prior block metadata.
364pub fn scan_block(
365    network: &Network,
366    block: &Block,
367    sapling_tree_size: u32,
368    scanning_key: &ScanningKeys<AccountId, (AccountId, Scope)>,
369) -> Result<ScannedBlock<AccountId>, ScanError> {
370    // TODO: Implement a check that returns early when the block height is below the Sapling
371    // activation height.
372
373    let chain_metadata = ChainMetadata {
374        sapling_commitment_tree_size: sapling_tree_size,
375        // Orchard is not supported at the moment so the tree size can be 0.
376        orchard_commitment_tree_size: 0,
377    };
378
379    zcash_client_backend::scanning::scan_block(
380        &zp_network(network),
381        block_to_compact(block, chain_metadata),
382        scanning_key,
383        // Ignore whether notes are change from a viewer's own spends for now.
384        &Nullifiers::empty(),
385        // Ignore previous blocks for now.
386        None,
387    )
388}
389
390/// Converts a Zebra-format scanning key into diversifiable full viewing key.
391// TODO: use `ViewingKey::parse` from zebra-chain instead
392pub fn sapling_key_to_dfvk(
393    key: &SaplingScanningKey,
394    network: &Network,
395) -> Result<DiversifiableFullViewingKey, Report> {
396    Ok(
397        decode_extended_full_viewing_key(network.sapling_efvk_hrp(), key)
398            .map_err(|e| eyre!(e))?
399            .to_diversifiable_full_viewing_key(),
400    )
401}
402
403/// Converts a zebra block and meta data into a compact block.
404pub fn block_to_compact(block: &Block, chain_metadata: ChainMetadata) -> CompactBlock {
405    CompactBlock {
406        height: block
407            .coinbase_height()
408            .expect("verified block should have a valid height")
409            .0
410            .into(),
411        // TODO: performance: look up the block hash from the state rather than recalculating it
412        hash: block.hash().bytes_in_display_order().to_vec(),
413        prev_hash: block
414            .header
415            .previous_block_hash
416            .bytes_in_display_order()
417            .to_vec(),
418        time: block
419            .header
420            .time
421            .timestamp()
422            .try_into()
423            .expect("unsigned 32-bit times should work until 2105"),
424        header: block
425            .header
426            .zcash_serialize_to_vec()
427            .expect("verified block should serialize"),
428        vtx: block
429            .transactions
430            .iter()
431            .cloned()
432            .enumerate()
433            .map(transaction_to_compact)
434            .collect(),
435        chain_metadata: Some(chain_metadata),
436
437        // The protocol version is used for the gRPC wire format, so it isn't needed here.
438        proto_version: 0,
439    }
440}
441
442/// Converts a zebra transaction into a compact transaction.
443fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
444    CompactTx {
445        index: index
446            .try_into()
447            .expect("tx index in block should fit in u64"),
448        // TODO: performance: look up the tx hash from the state rather than recalculating it
449        hash: tx.hash().bytes_in_display_order().to_vec(),
450
451        // `fee` is not checked by the `scan_block` function. It is allowed to be unset.
452        // <https://docs.rs/zcash_client_backend/latest/zcash_client_backend/proto/compact_formats/struct.CompactTx.html#structfield.fee>
453        fee: 0,
454
455        spends: tx
456            .sapling_nullifiers()
457            .map(|nf| CompactSaplingSpend {
458                nf: <[u8; 32]>::from(*nf).to_vec(),
459            })
460            .collect(),
461
462        // > output encodes the cmu field, ephemeralKey field, and a 52-byte prefix of the encCiphertext field of a Sapling Output
463        //
464        // <https://docs.rs/zcash_client_backend/latest/zcash_client_backend/proto/compact_formats/struct.CompactSaplingOutput.html>
465        outputs: tx
466            .sapling_outputs()
467            .map(|output| CompactSaplingOutput {
468                cmu: output.cm_u.to_bytes().to_vec(),
469                ephemeral_key: output
470                    .ephemeral_key
471                    .zcash_serialize_to_vec()
472                    .expect("verified output should serialize successfully"),
473                ciphertext: output
474                    .enc_ciphertext
475                    .zcash_serialize_to_vec()
476                    .expect("verified output should serialize successfully")
477                    .into_iter()
478                    .take(52)
479                    .collect(),
480            })
481            .collect(),
482
483        // `actions` is not checked by the `scan_block` function.
484        actions: vec![],
485    }
486}
487
488/// Convert a scanned block to a list of scanner database results.
489fn scanned_block_to_db_result<Nf>(
490    scanned_block: ScannedBlock<Nf>,
491) -> BTreeMap<TransactionIndex, SaplingScannedResult> {
492    scanned_block
493        .transactions()
494        .iter()
495        .map(|tx| {
496            (
497                TransactionIndex::from_usize(tx.block_index()),
498                SaplingScannedResult::from_bytes_in_display_order(*tx.txid().as_ref()),
499            )
500        })
501        .collect()
502}
503
504/// Get the minimal height available in a key_heights map.
505fn get_min_height(map: &HashMap<String, Height>) -> Option<Height> {
506    map.values().cloned().min()
507}
508
509/// Get tip height or return genesis block height if no tip is available.
510async fn tip_height(mut state: State) -> Result<Height, Report> {
511    let tip = state
512        .ready()
513        .await
514        .map_err(|e| eyre!(e))?
515        .call(zebra_state::ReadRequest::Tip)
516        .await
517        .map_err(|e| eyre!(e))?;
518
519    match tip {
520        zebra_state::ReadResponse::Tip(Some((height, _hash))) => Ok(height),
521        zebra_state::ReadResponse::Tip(None) => Ok(Height(0)),
522        _ => unreachable!("unmatched response to a state::Tip request"),
523    }
524}
525
526/// Initialize the scanner based on its config, and spawn a task for it.
527///
528/// TODO: add a test for this function.
529pub fn spawn_init(
530    storage: Storage,
531    state: State,
532    chain_tip_change: ChainTipChange,
533    cmd_receiver: tokio::sync::mpsc::Receiver<ScanTaskCommand>,
534) -> JoinHandle<Result<(), Report>> {
535    tokio::spawn(start(state, chain_tip_change, storage, cmd_receiver).in_current_span())
536}
537
538/// Turns an iterator of [`DiversifiableFullViewingKey`]s to [`ScanningKeys`].
539pub fn scanning_keys<'a>(
540    dfvks: impl IntoIterator<Item = &'a DiversifiableFullViewingKey>,
541) -> Result<ScanningKeys<AccountId, (AccountId, Scope)>, Report> {
542    dfvks
543        .into_iter()
544        .enumerate()
545        .map(|(i, dfvk)| {
546            Ok((
547                AccountId::try_from(u32::try_from(i)?).map_err(|err| eyre!(err))?,
548                dfvk_to_ufvk(dfvk)?,
549            ))
550        })
551        .try_collect::<(_, _), Vec<(_, _)>, _>()
552        .map(ScanningKeys::from_account_ufvks)
553}
554
555/// Turns a [`DiversifiableFullViewingKey`] to [`UnifiedFullViewingKey`].
556pub fn dfvk_to_ufvk(dfvk: &DiversifiableFullViewingKey) -> Result<UnifiedFullViewingKey, Report> {
557    UnifiedFullViewingKey::parse(&Ufvk::try_from_items(vec![Fvk::try_from((
558        2,
559        &dfvk.to_bytes()[..],
560    ))?])?)
561    .map_err(|e| eyre!(e))
562}
563
564/// Returns the [`zcash_protocol::consensus::Network`] for this network.
565pub fn zp_network(network: &Network) -> zcash_protocol::consensus::Network {
566    match network {
567        Network::Mainnet => zcash_protocol::consensus::Network::MainNetwork,
568        Network::Testnet(_) => zcash_protocol::consensus::Network::TestNetwork,
569    }
570}