zebra_state/service/finalized_state/disk_format/upgrade/
block_info_and_address_received.rs1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use crossbeam_channel::TryRecvError;
7use itertools::Itertools;
8use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
9use zebra_chain::{
10 amount::{DeferredPoolBalanceChange, NonNegative},
11 block::{Block, Height},
12 block_info::BlockInfo,
13 parameters::subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver},
14 transparent::{self, OutPoint, Utxo},
15 value_balance::ValueBalance,
16};
17
18use crate::{
19 service::finalized_state::{
20 disk_format::transparent::{AddressBalanceLocationChange, AddressLocation},
21 MAX_ON_DISK_HEIGHT,
22 },
23 DiskWriteBatch, HashOrHeight, TransactionLocation, WriteDisk,
24};
25
26use super::{CancelFormatChange, DiskFormatUpgrade};
27
/// Marker type for the database format upgrade that adds block info and
/// address received balances.
///
/// It carries no state; the upgrade logic is in its [`DiskFormatUpgrade`]
/// implementation.
pub struct Upgrade;
31
32enum LoadResult {
36 HasInfo(ValueBalance<NonNegative>),
37 LoadedInfo {
38 block: Arc<Block>,
39 size: usize,
40 utxos: HashMap<OutPoint, Utxo>,
41 address_balance_changes: HashMap<transparent::Address, AddressBalanceLocationChange>,
42 },
43}
44
45impl DiskFormatUpgrade for Upgrade {
46 fn version(&self) -> semver::Version {
47 semver::Version::new(27, 0, 0)
48 }
49
50 fn description(&self) -> &'static str {
51 "add block info and address received balances upgrade"
52 }
53
54 #[allow(clippy::unwrap_in_result)]
55 fn run(
56 &self,
57 initial_tip_height: zebra_chain::block::Height,
58 db: &crate::ZebraDb,
59 cancel_receiver: &crossbeam_channel::Receiver<super::CancelFormatChange>,
60 ) -> Result<(), super::CancelFormatChange> {
61 let network = db.network();
62 let balance_by_transparent_addr = db.address_balance_cf();
63 let chunk_size = rayon::current_num_threads();
64 tracing::info!(chunk_size = ?chunk_size, "adding block info data");
65
66 let chunks = (0..=initial_tip_height.0).chunks(chunk_size);
67 let seq_iter = chunks.into_iter().flat_map(|height_span| {
70 let height_vec = height_span.collect_vec();
71 let result_vec = height_vec
72 .into_par_iter()
73 .map(|h| {
74 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
76 return Err(super::CancelFormatChange);
77 }
78
79 let height = Height(h);
80
81 if let Some(existing_block_info) = db.block_info_cf().zs_get(&height) {
85 let value_pool = *existing_block_info.value_pools();
86 return Ok((h, LoadResult::HasInfo(value_pool)));
87 }
88
89 let (block, size) = db
92 .block_and_size(HashOrHeight::Height(height))
93 .expect("block info should be in the database");
94
95 let mut utxos = HashMap::new();
99 let mut address_balance_changes = HashMap::new();
100 for tx in &block.transactions {
101 for input in tx.inputs() {
102 if let Some(outpoint) = input.outpoint() {
103 let (tx, h, _) = db
104 .transaction(outpoint.hash)
105 .expect("transaction should be in the database");
106 let output = tx
107 .outputs()
108 .get(outpoint.index as usize)
109 .expect("output should exist");
110
111 let utxo = Utxo {
112 output: output.clone(),
113 height: h,
114 from_coinbase: tx.is_coinbase(),
115 };
116 utxos.insert(outpoint, utxo);
117 }
118 }
119
120 for output in tx.outputs() {
121 if let Some(address) = output.address(&network) {
122 *address_balance_changes
130 .entry(address)
131 .or_insert_with(AddressBalanceLocationChange::empty)
132 .received_mut() += u64::from(output.value());
133 }
134 }
135 }
136
137 Ok((
138 h,
139 LoadResult::LoadedInfo {
140 block,
141 size,
142 utxos,
143 address_balance_changes,
144 },
145 ))
146 })
147 .collect::<Vec<_>>();
148 result_vec
152 });
153
154 let mut value_pool = ValueBalance::<NonNegative>::default();
156
157 for result in seq_iter {
158 let (h, load_result) = result?;
159 let height = Height(h);
160 if height.0.is_multiple_of(1000) {
161 tracing::info!(height = ?height, "adding block info for height");
162 }
163 let (block, size, utxos, address_balance_changes) = match load_result {
165 LoadResult::HasInfo(prev_value_pool) => {
166 value_pool = prev_value_pool;
169 continue;
170 }
171 LoadResult::LoadedInfo {
172 block,
173 size,
174 utxos,
175 address_balance_changes,
176 } => (block, size, utxos, address_balance_changes),
177 };
178
179 let deferred_pool_balance_change = if height > network.slow_start_interval() {
181 let deferred_pool_balance_change = funding_stream_values(
183 height,
184 &network,
185 block_subsidy(height, &network).unwrap_or_default(),
186 )
187 .expect("should have valid funding stream values")
188 .remove(&FundingStreamReceiver::Deferred)
189 .unwrap_or_default()
190 .checked_sub(network.lockbox_disbursement_total_amount(height));
191
192 Some(
193 deferred_pool_balance_change
194 .expect("deferred pool balance change should be valid Amount"),
195 )
196 } else {
197 None
198 };
199
200 value_pool = value_pool
202 .add_chain_value_pool_change(
203 block
204 .chain_value_pool_change(
205 &utxos,
206 deferred_pool_balance_change.map(DeferredPoolBalanceChange::new),
207 )
208 .unwrap_or_default(),
209 )
210 .expect("value pool change should not overflow");
211
212 let mut batch = DiskWriteBatch::new();
213
214 let block_info = BlockInfo::new(value_pool, size as u32);
216 let _ = db
217 .block_info_cf()
218 .with_batch_for_writing(&mut batch)
219 .zs_insert(&height, &block_info);
220
221 for (address, change) in address_balance_changes {
223 batch.zs_merge(balance_by_transparent_addr, address, change);
226 }
227
228 db.write_batch(batch)
229 .expect("writing block info and address received changes should succeed");
230 }
231
232 Ok(())
233 }
234
235 #[allow(clippy::unwrap_in_result)]
236 fn validate(
237 &self,
238 db: &crate::ZebraDb,
239 cancel_receiver: &crossbeam_channel::Receiver<super::CancelFormatChange>,
240 ) -> Result<Result<(), String>, super::CancelFormatChange> {
241 let network = db.network();
242
243 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
245 return Err(super::CancelFormatChange);
246 }
247
248 let Some(tip_height) = db.finalized_tip_height() else {
250 return Ok(Ok(()));
251 };
252
253 let start_height = (tip_height - 1_000).unwrap_or(Height::MIN);
255
256 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
257 return Err(CancelFormatChange);
258 }
259
260 for height in start_height.0..=tip_height.0 {
263 if let Some(block_info) = db.block_info_cf().zs_get(&Height(height)) {
264 if block_info == Default::default() {
265 return Ok(Err(format!("zero block info for height: {height}")));
266 }
267 } else {
268 return Ok(Err(format!("missing block info for height: {height}")));
269 }
270 }
271
272 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
273 return Err(CancelFormatChange);
274 }
275
276 let tx_loc_range = TransactionLocation::min_for_height(start_height)..;
280 let addresses: HashSet<_> = db
281 .transactions_by_location_range(tx_loc_range)
282 .flat_map(|(_, tx)| tx.outputs().to_vec())
283 .filter_map(|output| {
284 if output.value != 0 {
285 output.address(&network)
286 } else {
287 None
288 }
289 })
290 .collect();
291
292 for address in addresses {
294 if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) {
295 return Err(CancelFormatChange);
296 }
297
298 let balance = db
299 .address_balance_location(&address)
300 .expect("should have address balances in finalized state");
301
302 if balance.received() == 0 {
303 return Ok(Err(format!(
304 "unexpected balance received for address {}: {}",
305 address,
306 balance.received(),
307 )));
308 }
309 }
310
311 Ok(Ok(()))
312 }
313}
314
315impl AddressBalanceLocationChange {
316 fn empty() -> Self {
320 Self::new(AddressLocation::from_output_index(
321 TransactionLocation::from_index(MAX_ON_DISK_HEIGHT, u16::MAX),
322 u32::MAX,
323 ))
324 }
325}