zebra_state/service/finalized_state/disk_format/upgrade/
block_info_and_address_received.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use crossbeam_channel::TryRecvError;
7use itertools::Itertools;
8use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
9use zebra_chain::{
10    amount::{DeferredPoolBalanceChange, NonNegative},
11    block::{Block, Height},
12    block_info::BlockInfo,
13    parameters::subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver},
14    transparent::{self, OutPoint, Utxo},
15    value_balance::ValueBalance,
16};
17
18use crate::{
19    service::finalized_state::{
20        disk_format::transparent::{AddressBalanceLocationChange, AddressLocation},
21        MAX_ON_DISK_HEIGHT,
22    },
23    DiskWriteBatch, HashOrHeight, TransactionLocation, WriteDisk,
24};
25
26use super::{CancelFormatChange, DiskFormatUpgrade};
27
/// Implements [`DiskFormatUpgrade`] for adding additional block info, and
/// transparent address received balances, to the database.
pub struct Upgrade;
31
32/// The result of loading data to create a [`BlockInfo`]. If the info was
33/// already there we only need to ValueBalance to keep track of the totals.
34/// Otherwise we need the block, size and utxos to compute the BlockInfo.
35enum LoadResult {
36    HasInfo(ValueBalance<NonNegative>),
37    LoadedInfo {
38        block: Arc<Block>,
39        size: usize,
40        utxos: HashMap<OutPoint, Utxo>,
41        address_balance_changes: HashMap<transparent::Address, AddressBalanceLocationChange>,
42    },
43}
44
45impl DiskFormatUpgrade for Upgrade {
46    fn version(&self) -> semver::Version {
47        semver::Version::new(27, 0, 0)
48    }
49
50    fn description(&self) -> &'static str {
51        "add block info and address received balances upgrade"
52    }
53
54    #[allow(clippy::unwrap_in_result)]
55    fn run(
56        &self,
57        initial_tip_height: zebra_chain::block::Height,
58        db: &crate::ZebraDb,
59        cancel_receiver: &crossbeam_channel::Receiver<super::CancelFormatChange>,
60    ) -> Result<(), super::CancelFormatChange> {
61        let network = db.network();
62        let balance_by_transparent_addr = db.address_balance_cf();
63        let chunk_size = rayon::current_num_threads();
64        tracing::info!(chunk_size = ?chunk_size, "adding block info data");
65
66        let chunks = (0..=initial_tip_height.0).chunks(chunk_size);
67        // Since transaction parsing is slow, we want to parallelize it.
68        // Get chunks of block heights and load them in parallel.
69        let seq_iter = chunks.into_iter().flat_map(|height_span| {
70            let height_vec = height_span.collect_vec();
71            let result_vec = height_vec
72                .into_par_iter()
73                .map(|h| {
74                    // Return early if the upgrade is cancelled.
75                    if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
76                        return Err(super::CancelFormatChange);
77                    }
78
79                    let height = Height(h);
80
81                    // The upgrade might have been interrupted and some heights might
82                    // have already been filled. Return a value indicating that
83                    // along with the loaded value pool.
84                    if let Some(existing_block_info) = db.block_info_cf().zs_get(&height) {
85                        let value_pool = *existing_block_info.value_pools();
86                        return Ok((h, LoadResult::HasInfo(value_pool)));
87                    }
88
89                    // Load the block. This is slow since transaction
90                    // parsing is slow.
91                    let (block, size) = db
92                        .block_and_size(HashOrHeight::Height(height))
93                        .expect("block info should be in the database");
94
95                    // Load the utxos for all the transactions inputs in the block.
96                    // This is required to compute the value pool change.
97                    // This is slow because transaction parsing is slow.
98                    let mut utxos = HashMap::new();
99                    let mut address_balance_changes = HashMap::new();
100                    for tx in &block.transactions {
101                        for input in tx.inputs() {
102                            if let Some(outpoint) = input.outpoint() {
103                                let (tx, h, _) = db
104                                    .transaction(outpoint.hash)
105                                    .expect("transaction should be in the database");
106                                let output = tx
107                                    .outputs()
108                                    .get(outpoint.index as usize)
109                                    .expect("output should exist");
110
111                                let utxo = Utxo {
112                                    output: output.clone(),
113                                    height: h,
114                                    from_coinbase: tx.is_coinbase(),
115                                };
116                                utxos.insert(outpoint, utxo);
117                            }
118                        }
119
120                        for output in tx.outputs() {
121                            if let Some(address) = output.address(&network) {
122                                // Note: using `empty()` will set the location
123                                // to a dummy value. This only works because the
124                                // addition operator for
125                                // `AddressBalanceLocationChange` (which reuses
126                                // the `AddressBalanceLocationInner` addition
127                                // operator) will ignore these dummy values when
128                                // adding balances during the merge operator.
129                                *address_balance_changes
130                                    .entry(address)
131                                    .or_insert_with(AddressBalanceLocationChange::empty)
132                                    .received_mut() += u64::from(output.value());
133                            }
134                        }
135                    }
136
137                    Ok((
138                        h,
139                        LoadResult::LoadedInfo {
140                            block,
141                            size,
142                            utxos,
143                            address_balance_changes,
144                        },
145                    ))
146                })
147                .collect::<Vec<_>>();
148            // The collected Vec is in-order as required as guaranteed by Rayon.
149            // Note that since we use flat_map() above, the result iterator will
150            // iterate through individual results as expected.
151            result_vec
152        });
153
154        // Keep track of the current value pool as we iterate the blocks.
155        let mut value_pool = ValueBalance::<NonNegative>::default();
156
157        for result in seq_iter {
158            let (h, load_result) = result?;
159            let height = Height(h);
160            if height.0.is_multiple_of(1000) {
161                tracing::info!(height = ?height, "adding block info for height");
162            }
163            // Get the data loaded from the parallel iterator
164            let (block, size, utxos, address_balance_changes) = match load_result {
165                LoadResult::HasInfo(prev_value_pool) => {
166                    // BlockInfo already stored; we just need the its value pool
167                    // then skip the block
168                    value_pool = prev_value_pool;
169                    continue;
170                }
171                LoadResult::LoadedInfo {
172                    block,
173                    size,
174                    utxos,
175                    address_balance_changes,
176                } => (block, size, utxos, address_balance_changes),
177            };
178
179            // Get the deferred amount which is required to update the value pool.
180            let deferred_pool_balance_change = if height > network.slow_start_interval() {
181                // See [ZIP-1015](https://zips.z.cash/zip-1015).
182                let deferred_pool_balance_change = funding_stream_values(
183                    height,
184                    &network,
185                    block_subsidy(height, &network).unwrap_or_default(),
186                )
187                .expect("should have valid funding stream values")
188                .remove(&FundingStreamReceiver::Deferred)
189                .unwrap_or_default()
190                .checked_sub(network.lockbox_disbursement_total_amount(height));
191
192                Some(
193                    deferred_pool_balance_change
194                        .expect("deferred pool balance change should be valid Amount"),
195                )
196            } else {
197                None
198            };
199
200            // Add this block's value pool changes to the total value pool.
201            value_pool = value_pool
202                .add_chain_value_pool_change(
203                    block
204                        .chain_value_pool_change(
205                            &utxos,
206                            deferred_pool_balance_change.map(DeferredPoolBalanceChange::new),
207                        )
208                        .unwrap_or_default(),
209                )
210                .expect("value pool change should not overflow");
211
212            let mut batch = DiskWriteBatch::new();
213
214            // Create and store the BlockInfo for this block.
215            let block_info = BlockInfo::new(value_pool, size as u32);
216            let _ = db
217                .block_info_cf()
218                .with_batch_for_writing(&mut batch)
219                .zs_insert(&height, &block_info);
220
221            // Update transparent addresses that received funds in this block.
222            for (address, change) in address_balance_changes {
223                // Note that the logic of the merge operator is set up by
224                // calling `set_merge_operator_associative()` in `DiskDb`.
225                batch.zs_merge(balance_by_transparent_addr, address, change);
226            }
227
228            db.write_batch(batch)
229                .expect("writing block info and address received changes should succeed");
230        }
231
232        Ok(())
233    }
234
235    #[allow(clippy::unwrap_in_result)]
236    fn validate(
237        &self,
238        db: &crate::ZebraDb,
239        cancel_receiver: &crossbeam_channel::Receiver<super::CancelFormatChange>,
240    ) -> Result<Result<(), String>, super::CancelFormatChange> {
241        let network = db.network();
242
243        // Return early before the next disk read if the upgrade was cancelled.
244        if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
245            return Err(super::CancelFormatChange);
246        }
247
248        // Read the finalized tip height or return early if the database is empty.
249        let Some(tip_height) = db.finalized_tip_height() else {
250            return Ok(Ok(()));
251        };
252
253        // Check any outputs in the last 1000 blocks.
254        let start_height = (tip_height - 1_000).unwrap_or(Height::MIN);
255
256        if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
257            return Err(CancelFormatChange);
258        }
259
260        // Check that all blocks in the range have a BlockInfo.
261
262        for height in start_height.0..=tip_height.0 {
263            if let Some(block_info) = db.block_info_cf().zs_get(&Height(height)) {
264                if block_info == Default::default() {
265                    return Ok(Err(format!("zero block info for height: {height}")));
266                }
267            } else {
268                return Ok(Err(format!("missing block info for height: {height}")));
269            }
270        }
271
272        if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
273            return Err(CancelFormatChange);
274        }
275
276        // Check that all recipient addresses of transparent transfers in the range have a non-zero received balance.
277
278        // Collect the set of addresses that received transparent funds in the last query range (last 1000 blocks).
279        let tx_loc_range = TransactionLocation::min_for_height(start_height)..;
280        let addresses: HashSet<_> = db
281            .transactions_by_location_range(tx_loc_range)
282            .flat_map(|(_, tx)| tx.outputs().to_vec())
283            .filter_map(|output| {
284                if output.value != 0 {
285                    output.address(&network)
286                } else {
287                    None
288                }
289            })
290            .collect();
291
292        // Check that no address balances for that set of addresses have a received field of `0`.
293        for address in addresses {
294            if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
295                return Err(CancelFormatChange);
296            }
297
298            let balance = db
299                .address_balance_location(&address)
300                .expect("should have address balances in finalized state");
301
302            if balance.received() == 0 {
303                return Ok(Err(format!(
304                    "unexpected balance received for address {}: {}",
305                    address,
306                    balance.received(),
307                )));
308            }
309        }
310
311        Ok(Ok(()))
312    }
313}
314
315impl AddressBalanceLocationChange {
316    /// Creates a new [`AddressBalanceLocationChange`] with all zero values and
317    /// a dummy (all one bits) location. See `AddressBalanceLocationInner::add()`
318    /// for the rationale for using this dummy value.
319    fn empty() -> Self {
320        Self::new(AddressLocation::from_output_index(
321            TransactionLocation::from_index(MAX_ON_DISK_HEIGHT, u16::MAX),
322            u32::MAX,
323        ))
324    }
325}