zebra_state/service/finalized_state/disk_format/upgrade/
track_tx_locs_by_spends.rs1use std::sync::Arc;
4
5use crossbeam_channel::{Receiver, TryRecvError};
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7
8use zebra_chain::block::Height;
9
10use crate::{
11 service::{finalized_state::ZebraDb, non_finalized_state::Chain, read},
12 Spend,
13};
14
15use super::{super::super::DiskWriteBatch, CancelFormatChange};
16
17#[allow(clippy::unwrap_in_result)]
21#[instrument(skip(zebra_db, cancel_receiver))]
22pub fn run(
23 initial_tip_height: Height,
24 zebra_db: &ZebraDb,
25 cancel_receiver: &Receiver<CancelFormatChange>,
26) -> Result<(), CancelFormatChange> {
27 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
28 return Err(CancelFormatChange);
29 }
30
31 (0..=initial_tip_height.0)
32 .into_par_iter()
33 .try_for_each(|height| {
34 let height = Height(height);
35 let mut batch = DiskWriteBatch::new();
36 let mut should_index_at_height = false;
37
38 let transactions = zebra_db.transactions_by_location_range(
39 crate::TransactionLocation::from_index(height, 1)
40 ..=crate::TransactionLocation::max_for_height(height),
41 );
42
43 for (tx_loc, tx) in transactions {
44 if tx.is_coinbase() {
45 continue;
46 }
47
48 if !should_index_at_height {
49 if let Some(spend) = tx
50 .inputs()
51 .iter()
52 .filter_map(|input| Some(input.outpoint()?.into()))
53 .chain(tx.sprout_nullifiers().cloned().map(Spend::from))
54 .chain(tx.sapling_nullifiers().cloned().map(Spend::from))
55 .chain(tx.orchard_nullifiers().cloned().map(Spend::from))
56 .next()
57 {
58 if read::spending_transaction_hash::<Arc<Chain>>(None, zebra_db, spend)
59 .is_some()
60 {
61 return Ok(());
63 } else {
64 should_index_at_height = true
65 }
66 } else {
67 continue;
68 };
69 }
70
71 for input in tx.inputs() {
72 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
73 return Err(CancelFormatChange);
74 }
75
76 let spent_outpoint = input
77 .outpoint()
78 .expect("should filter out coinbase transactions");
79
80 let spent_output_location = zebra_db
81 .output_location(&spent_outpoint)
82 .expect("should have location for spent outpoint");
83
84 let _ = zebra_db
85 .tx_loc_by_spent_output_loc_cf()
86 .with_batch_for_writing(&mut batch)
87 .zs_insert(&spent_output_location, &tx_loc);
88 }
89
90 batch
91 .prepare_nullifier_batch(zebra_db, &tx, tx_loc)
92 .expect("method should never return an error");
93 }
94
95 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
96 return Err(CancelFormatChange);
97 }
98
99 zebra_db
100 .write_batch(batch)
101 .expect("unexpected database write failure");
102
103 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
104 return Err(CancelFormatChange);
105 }
106
107 Ok(())
108 })
109}