1use std::{
4 collections::{BTreeMap, HashMap},
5 sync::Arc,
6 time::Duration,
7};
8
9use color_eyre::{eyre::eyre, Report};
10use itertools::Itertools;
11use tokio::{
12 sync::{mpsc::Sender, watch},
13 task::JoinHandle,
14};
15use tower::{Service, ServiceExt};
16
17use tracing::Instrument;
18use zcash_address::unified::{Encoding, Fvk, Ufvk};
19use zcash_client_backend::{
20 data_api::ScannedBlock,
21 encoding::decode_extended_full_viewing_key,
22 keys::UnifiedFullViewingKey,
23 proto::compact_formats::{
24 ChainMetadata, CompactBlock, CompactSaplingOutput, CompactSaplingSpend, CompactTx,
25 },
26 scanning::{Nullifiers, ScanError, ScanningKeys},
27};
28use zip32::{AccountId, Scope};
29
30use sapling_crypto::zip32::DiversifiableFullViewingKey;
31
32use zebra_chain::{
33 block::{Block, Height},
34 chain_tip::ChainTip,
35 diagnostic::task::WaitForPanics,
36 parameters::Network,
37 serialization::ZcashSerialize,
38 transaction::Transaction,
39};
40use zebra_node_services::scan_service::response::ScanResult;
41use zebra_state::{ChainTipChange, ReadStateService, SaplingScannedResult, TransactionIndex};
42
43use crate::{
44 service::{ScanTask, ScanTaskCommand},
45 storage::{SaplingScanningKey, Storage},
46};
47
48use super::executor;
49
50mod scan_range;
51
52pub use scan_range::ScanRangeTaskBuilder;
53
/// The state service type the scanner reads blocks and the chain tip from.
pub type State = ReadStateService;

/// How long [`start`] sleeps after its setup and before it enters its scan loop.
const INITIAL_WAIT: Duration = Duration::from_secs(15);

/// How long the scanner sleeps before re-checking the state, both while waiting for a height
/// in [`wait_for_height`] and while there is nothing to scan in [`start`].
pub const CHECK_INTERVAL: Duration = Duration::from_secs(10);

/// Progress is logged at info level only for heights that are a multiple of this value.
const INFO_LOG_INTERVAL: u32 = 10_000;
70
71pub async fn start(
74 state: State,
75 chain_tip_change: ChainTipChange,
76 storage: Storage,
77 mut cmd_receiver: tokio::sync::mpsc::Receiver<ScanTaskCommand>,
78) -> Result<(), Report> {
79 let network = storage.network();
80 let sapling_activation_height = network.sapling_activation_height();
81
82 info!(?network, "starting scan task");
83
84 #[cfg(not(test))]
86 wait_for_height(
87 sapling_activation_height,
88 "Sapling activation",
89 state.clone(),
90 )
91 .await?;
92
93 let key_storage = storage.clone();
95 let key_heights = tokio::task::spawn_blocking(move || key_storage.sapling_keys_last_heights())
96 .wait_for_panics()
97 .await;
98 let key_heights = Arc::new(key_heights);
99
100 let mut height = get_min_height(&key_heights).unwrap_or(sapling_activation_height);
101
102 info!(start_height = ?height, "got min scan height");
103
104 let mut parsed_keys: HashMap<SaplingScanningKey, DiversifiableFullViewingKey> = key_heights
107 .keys()
108 .map(|key| Ok::<_, Report>((key.clone(), sapling_key_to_dfvk(key, &network)?)))
109 .try_collect()?;
110
111 let mut subscribed_keys: HashMap<SaplingScanningKey, Sender<ScanResult>> = HashMap::new();
112
113 let (subscribed_keys_sender, subscribed_keys_receiver) =
114 tokio::sync::watch::channel(Arc::new(subscribed_keys.clone()));
115
116 let (scan_task_sender, scan_task_executor_handle) =
117 executor::spawn_init(subscribed_keys_receiver.clone());
118 let mut scan_task_executor_handle = Some(scan_task_executor_handle);
119
120 tokio::time::sleep(INITIAL_WAIT).await;
122
123 loop {
124 if let Some(handle) = scan_task_executor_handle {
125 if handle.is_finished() {
126 warn!("scan task finished unexpectedly");
127
128 handle.await?.map_err(|err| eyre!(err))?;
129 return Ok(());
130 } else {
131 scan_task_executor_handle = Some(handle);
132 }
133 }
134
135 let was_parsed_keys_empty = parsed_keys.is_empty();
136
137 let (new_keys, new_result_senders, new_result_receivers) =
138 ScanTask::process_messages(&mut cmd_receiver, &mut parsed_keys, &network)?;
139
140 subscribed_keys.extend(new_result_senders);
141 subscribed_keys.retain(|key, sender| !sender.is_closed() && parsed_keys.contains_key(key));
143
144 subscribed_keys_sender
146 .send(Arc::new(subscribed_keys.clone()))
147 .expect("last receiver should not be dropped while this task is running");
148
149 for (result_receiver, rsp_tx) in new_result_receivers {
150 let _ = rsp_tx.send(result_receiver);
152 }
153
154 if !new_keys.is_empty() {
155 let state = state.clone();
156 let storage = storage.clone();
157
158 let start_height = new_keys
159 .iter()
160 .map(|(_, (_, height))| *height)
161 .min()
162 .unwrap_or(sapling_activation_height);
163
164 if was_parsed_keys_empty {
165 info!(?start_height, "setting new start height");
166 height = start_height;
167 }
168 else if start_height < height {
170 scan_task_sender
171 .send(ScanRangeTaskBuilder::new(height, new_keys, state, storage))
172 .await
173 .expect("scan_until_task channel should not be closed");
174 }
175 }
176
177 if !parsed_keys.is_empty() {
178 let scanned_height = scan_height_and_store_results(
179 height,
180 state.clone(),
181 Some(chain_tip_change.clone()),
182 storage.clone(),
183 key_heights.clone(),
184 parsed_keys.clone(),
185 subscribed_keys_receiver.clone(),
186 )
187 .await?;
188
189 if scanned_height.is_none() {
191 tokio::time::sleep(CHECK_INTERVAL).await;
192 continue;
193 }
194 } else {
195 tokio::time::sleep(CHECK_INTERVAL).await;
196 continue;
197 }
198
199 height = height
200 .next()
201 .expect("a valid blockchain never reaches the max height");
202 }
203}
204
205pub async fn wait_for_height(
207 height: Height,
208 height_name: &'static str,
209 state: State,
210) -> Result<(), Report> {
211 loop {
212 let tip_height = tip_height(state.clone()).await?;
213 if tip_height < height {
214 info!(
215 "scanner is waiting for {height_name}. Current tip: {}, {height_name}: {}",
216 tip_height.0, height.0
217 );
218
219 tokio::time::sleep(CHECK_INTERVAL).await;
220 } else {
221 info!(
222 "scanner finished waiting for {height_name}. Current tip: {}, {height_name}: {}",
223 tip_height.0, height.0
224 );
225
226 break;
227 }
228 }
229
230 Ok(())
231}
232
233pub async fn scan_height_and_store_results(
241 height: Height,
242 mut state: State,
243 chain_tip_change: Option<ChainTipChange>,
244 storage: Storage,
245 key_last_scanned_heights: Arc<HashMap<SaplingScanningKey, Height>>,
246 parsed_keys: HashMap<String, DiversifiableFullViewingKey>,
247 subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
248) -> Result<Option<Height>, Report> {
249 let network = storage.network();
250
251 let is_info_log = height.0 % INFO_LOG_INTERVAL == 0;
255
256 let block = state
259 .ready()
260 .await
261 .map_err(|e| eyre!(e))?
262 .call(zebra_state::ReadRequest::Block(height.into()))
263 .await
264 .map_err(|e| eyre!(e))?;
265
266 let block = match block {
267 zebra_state::ReadResponse::Block(Some(block)) => block,
268 zebra_state::ReadResponse::Block(None) => return Ok(None),
269 _ => unreachable!("unmatched response to a state::Block request"),
270 };
271
272 for (key_index_in_task, (sapling_key, _)) in parsed_keys.iter().enumerate() {
273 match key_last_scanned_heights.get(sapling_key) {
274 Some(last_scanned_height) if height <= *last_scanned_height => continue,
276
277 Some(last_scanned_height) if is_info_log => {
278 if let Some(chain_tip_change) = &chain_tip_change {
279 info!(
284 "Scanning the blockchain for key {}, started at block {:?}, now at block {:?}, current tip {:?}",
285 key_index_in_task, last_scanned_height.next().expect("height is not maximum").as_usize(),
286 height.as_usize(),
287 chain_tip_change.latest_chain_tip().best_tip_height().expect("we should have a tip to scan").as_usize(),
288 );
289 } else {
290 info!(
291 "Scanning the blockchain for key {}, started at block {:?}, now at block {:?}",
292 key_index_in_task, last_scanned_height.next().expect("height is not maximum").as_usize(),
293 height.as_usize(),
294 );
295 }
296 }
297
298 _other => {}
299 };
300
301 let subscribed_keys_receiver = subscribed_keys_receiver.clone();
302
303 let sapling_key = sapling_key.clone();
304 let block = block.clone();
305 let mut storage = storage.clone();
306 let network = network.clone();
307 let parsed_keys = parsed_keys.clone();
308
309 let sapling_tree_size = 1 << 16;
320
321 tokio::task::spawn_blocking(move || {
322 let scanning_keys = scanning_keys(parsed_keys.values()).expect("scanning keys");
326
327 let scanned_block = scan_block(&network, &block, sapling_tree_size, &scanning_keys)
328 .map_err(|e| eyre!(e))?;
329
330 let scanning_result = scanned_block_to_db_result(scanned_block);
331
332 let latest_subscribed_keys = subscribed_keys_receiver.borrow().clone();
333 if let Some(results_sender) = latest_subscribed_keys.get(&sapling_key).cloned() {
334 for (_tx_index, tx_id) in scanning_result.clone() {
335 let _ = results_sender.try_send(ScanResult {
337 key: sapling_key.clone(),
338 height,
339 tx_id: tx_id.into(),
340 });
341 }
342 }
343
344 storage.add_sapling_results(&sapling_key, height, scanning_result);
345 Ok::<_, Report>(())
346 })
347 .wait_for_panics()
348 .await?;
349 }
350
351 Ok(Some(height))
352}
353
354pub fn scan_block(
365 network: &Network,
366 block: &Block,
367 sapling_tree_size: u32,
368 scanning_key: &ScanningKeys<AccountId, (AccountId, Scope)>,
369) -> Result<ScannedBlock<AccountId>, ScanError> {
370 let chain_metadata = ChainMetadata {
374 sapling_commitment_tree_size: sapling_tree_size,
375 orchard_commitment_tree_size: 0,
377 };
378
379 zcash_client_backend::scanning::scan_block(
380 &zp_network(network),
381 block_to_compact(block, chain_metadata),
382 scanning_key,
383 &Nullifiers::empty(),
385 None,
387 )
388}
389
390pub fn sapling_key_to_dfvk(
393 key: &SaplingScanningKey,
394 network: &Network,
395) -> Result<DiversifiableFullViewingKey, Report> {
396 Ok(
397 decode_extended_full_viewing_key(network.sapling_efvk_hrp(), key)
398 .map_err(|e| eyre!(e))?
399 .to_diversifiable_full_viewing_key(),
400 )
401}
402
403pub fn block_to_compact(block: &Block, chain_metadata: ChainMetadata) -> CompactBlock {
405 CompactBlock {
406 height: block
407 .coinbase_height()
408 .expect("verified block should have a valid height")
409 .0
410 .into(),
411 hash: block.hash().bytes_in_display_order().to_vec(),
413 prev_hash: block
414 .header
415 .previous_block_hash
416 .bytes_in_display_order()
417 .to_vec(),
418 time: block
419 .header
420 .time
421 .timestamp()
422 .try_into()
423 .expect("unsigned 32-bit times should work until 2105"),
424 header: block
425 .header
426 .zcash_serialize_to_vec()
427 .expect("verified block should serialize"),
428 vtx: block
429 .transactions
430 .iter()
431 .cloned()
432 .enumerate()
433 .map(transaction_to_compact)
434 .collect(),
435 chain_metadata: Some(chain_metadata),
436
437 proto_version: 0,
439 }
440}
441
442fn transaction_to_compact((index, tx): (usize, Arc<Transaction>)) -> CompactTx {
444 CompactTx {
445 index: index
446 .try_into()
447 .expect("tx index in block should fit in u64"),
448 hash: tx.hash().bytes_in_display_order().to_vec(),
450
451 fee: 0,
454
455 spends: tx
456 .sapling_nullifiers()
457 .map(|nf| CompactSaplingSpend {
458 nf: <[u8; 32]>::from(*nf).to_vec(),
459 })
460 .collect(),
461
462 outputs: tx
466 .sapling_outputs()
467 .map(|output| CompactSaplingOutput {
468 cmu: output.cm_u.to_bytes().to_vec(),
469 ephemeral_key: output
470 .ephemeral_key
471 .zcash_serialize_to_vec()
472 .expect("verified output should serialize successfully"),
473 ciphertext: output
474 .enc_ciphertext
475 .zcash_serialize_to_vec()
476 .expect("verified output should serialize successfully")
477 .into_iter()
478 .take(52)
479 .collect(),
480 })
481 .collect(),
482
483 actions: vec![],
485 }
486}
487
488fn scanned_block_to_db_result<Nf>(
490 scanned_block: ScannedBlock<Nf>,
491) -> BTreeMap<TransactionIndex, SaplingScannedResult> {
492 scanned_block
493 .transactions()
494 .iter()
495 .map(|tx| {
496 (
497 TransactionIndex::from_usize(tx.block_index()),
498 SaplingScannedResult::from_bytes_in_display_order(*tx.txid().as_ref()),
499 )
500 })
501 .collect()
502}
503
504fn get_min_height(map: &HashMap<String, Height>) -> Option<Height> {
506 map.values().cloned().min()
507}
508
509async fn tip_height(mut state: State) -> Result<Height, Report> {
511 let tip = state
512 .ready()
513 .await
514 .map_err(|e| eyre!(e))?
515 .call(zebra_state::ReadRequest::Tip)
516 .await
517 .map_err(|e| eyre!(e))?;
518
519 match tip {
520 zebra_state::ReadResponse::Tip(Some((height, _hash))) => Ok(height),
521 zebra_state::ReadResponse::Tip(None) => Ok(Height(0)),
522 _ => unreachable!("unmatched response to a state::Tip request"),
523 }
524}
525
526pub fn spawn_init(
530 storage: Storage,
531 state: State,
532 chain_tip_change: ChainTipChange,
533 cmd_receiver: tokio::sync::mpsc::Receiver<ScanTaskCommand>,
534) -> JoinHandle<Result<(), Report>> {
535 tokio::spawn(start(state, chain_tip_change, storage, cmd_receiver).in_current_span())
536}
537
538pub fn scanning_keys<'a>(
540 dfvks: impl IntoIterator<Item = &'a DiversifiableFullViewingKey>,
541) -> Result<ScanningKeys<AccountId, (AccountId, Scope)>, Report> {
542 dfvks
543 .into_iter()
544 .enumerate()
545 .map(|(i, dfvk)| {
546 Ok((
547 AccountId::try_from(u32::try_from(i)?).map_err(|err| eyre!(err))?,
548 dfvk_to_ufvk(dfvk)?,
549 ))
550 })
551 .try_collect::<(_, _), Vec<(_, _)>, _>()
552 .map(ScanningKeys::from_account_ufvks)
553}
554
555pub fn dfvk_to_ufvk(dfvk: &DiversifiableFullViewingKey) -> Result<UnifiedFullViewingKey, Report> {
557 UnifiedFullViewingKey::parse(&Ufvk::try_from_items(vec![Fvk::try_from((
558 2,
559 &dfvk.to_bytes()[..],
560 ))?])?)
561 .map_err(|e| eyre!(e))
562}
563
564pub fn zp_network(network: &Network) -> zcash_protocol::consensus::Network {
566 match network {
567 Network::Mainnet => zcash_protocol::consensus::Network::MainNetwork,
568 Network::Testnet(_) => zcash_protocol::consensus::Network::TestNetwork,
569 }
570}