zebra_state/service/finalized_state/disk_format/upgrade/
track_tx_locs_by_spends.rs

1//! Tracks transaction locations by their inputs and revealed nullifiers.
2
3use std::sync::Arc;
4
5use crossbeam_channel::{Receiver, TryRecvError};
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7
8use zebra_chain::block::Height;
9
10use crate::{
11    service::{finalized_state::ZebraDb, non_finalized_state::Chain, read},
12    Spend,
13};
14
15use super::{super::super::DiskWriteBatch, CancelFormatChange};
16
17/// Runs disk format upgrade for tracking transaction locations by their inputs and revealed nullifiers.
18///
19/// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled.
20#[allow(clippy::unwrap_in_result)]
21#[instrument(skip(zebra_db, cancel_receiver))]
22pub fn run(
23    initial_tip_height: Height,
24    zebra_db: &ZebraDb,
25    cancel_receiver: &Receiver<CancelFormatChange>,
26) -> Result<(), CancelFormatChange> {
27    if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
28        return Err(CancelFormatChange);
29    }
30
31    (0..=initial_tip_height.0)
32        .into_par_iter()
33        .try_for_each(|height| {
34            let height = Height(height);
35            let mut batch = DiskWriteBatch::new();
36            let mut should_index_at_height = false;
37
38            let transactions = zebra_db.transactions_by_location_range(
39                crate::TransactionLocation::from_index(height, 1)
40                    ..=crate::TransactionLocation::max_for_height(height),
41            );
42
43            for (tx_loc, tx) in transactions {
44                if tx.is_coinbase() {
45                    continue;
46                }
47
48                if !should_index_at_height {
49                    if let Some(spend) = tx
50                        .inputs()
51                        .iter()
52                        .filter_map(|input| Some(input.outpoint()?.into()))
53                        .chain(tx.sprout_nullifiers().cloned().map(Spend::from))
54                        .chain(tx.sapling_nullifiers().cloned().map(Spend::from))
55                        .chain(tx.orchard_nullifiers().cloned().map(Spend::from))
56                        .next()
57                    {
58                        if read::spending_transaction_hash::<Arc<Chain>>(None, zebra_db, spend)
59                            .is_some()
60                        {
61                            // Skip transactions in blocks with existing indexes
62                            return Ok(());
63                        } else {
64                            should_index_at_height = true
65                        }
66                    } else {
67                        continue;
68                    };
69                }
70
71                for input in tx.inputs() {
72                    if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
73                        return Err(CancelFormatChange);
74                    }
75
76                    let spent_outpoint = input
77                        .outpoint()
78                        .expect("should filter out coinbase transactions");
79
80                    let spent_output_location = zebra_db
81                        .output_location(&spent_outpoint)
82                        .expect("should have location for spent outpoint");
83
84                    let _ = zebra_db
85                        .tx_loc_by_spent_output_loc_cf()
86                        .with_batch_for_writing(&mut batch)
87                        .zs_insert(&spent_output_location, &tx_loc);
88                }
89
90                batch
91                    .prepare_nullifier_batch(zebra_db, &tx, tx_loc)
92                    .expect("method should never return an error");
93            }
94
95            if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
96                return Err(CancelFormatChange);
97            }
98
99            zebra_db
100                .write_batch(batch)
101                .expect("unexpected database write failure");
102
103            if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
104                return Err(CancelFormatChange);
105            }
106
107            Ok(())
108        })
109}