1use std::{
13 collections::{BTreeMap, HashMap, HashSet},
14 ops::RangeBounds,
15 sync::Arc,
16};
17
18use chrono::{DateTime, Utc};
19use itertools::Itertools;
20
21use zebra_chain::{
22 amount::NonNegative,
23 block::{self, Block, Height},
24 orchard,
25 parallel::tree::NoteCommitmentTrees,
26 parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
27 sapling,
28 serialization::{CompactSizeMessage, TrustedPreallocate, ZcashSerialize as _},
29 transaction::{self, Transaction},
30 transparent,
31 value_balance::ValueBalance,
32};
33
34use crate::{
35 request::FinalizedBlock,
36 service::finalized_state::{
37 disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk},
38 disk_format::{
39 block::TransactionLocation,
40 transparent::{AddressBalanceLocationChange, OutputLocation},
41 },
42 zebra_db::{metrics::block_precommit_metrics, ZebraDb},
43 FromDisk, RawBytes,
44 },
45 BoxError, HashOrHeight,
46};
47
48#[cfg(feature = "indexer")]
49use crate::request::Spend;
50
51#[cfg(test)]
52mod tests;
53
54impl ZebraDb {
55 pub fn is_empty(&self) -> bool {
61 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
62 self.db.zs_is_empty(&hash_by_height)
63 }
64
65 #[allow(clippy::unwrap_in_result)]
70 pub fn tip(&self) -> Option<(block::Height, block::Hash)> {
71 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
72 self.db.zs_last_key_value(&hash_by_height)
73 }
74
75 #[allow(clippy::unwrap_in_result)]
77 pub fn contains_height(&self, height: block::Height) -> bool {
78 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
79
80 self.db.zs_contains(&hash_by_height, &height)
81 }
82
83 #[allow(clippy::unwrap_in_result)]
85 pub fn hash(&self, height: block::Height) -> Option<block::Hash> {
86 let hash_by_height = self.db.cf_handle("hash_by_height").unwrap();
87 self.db.zs_get(&hash_by_height, &height)
88 }
89
90 #[allow(clippy::unwrap_in_result)]
92 pub fn contains_hash(&self, hash: block::Hash) -> bool {
93 let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
94
95 self.db.zs_contains(&height_by_hash, &hash)
96 }
97
98 #[allow(clippy::unwrap_in_result)]
100 pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
101 let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
102 self.db.zs_get(&height_by_hash, &hash)
103 }
104
105 #[allow(dead_code)]
107 pub fn prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
108 let height = self.height(hash)?;
109 let prev_height = height.previous().ok()?;
110
111 self.hash(prev_height)
112 }
113
114 #[allow(dead_code)]
116 pub fn prev_block_height_for_hash(&self, hash: block::Hash) -> Option<block::Height> {
117 let height = self.height(hash)?;
118
119 height.previous().ok()
120 }
121
122 #[allow(clippy::unwrap_in_result)]
127 pub fn block_header(&self, hash_or_height: HashOrHeight) -> Option<Arc<block::Header>> {
128 let block_header_by_height = self.db.cf_handle("block_header_by_height").unwrap();
130
131 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
132 let header = self.db.zs_get(&block_header_by_height, &height)?;
133
134 Some(header)
135 }
136
137 #[allow(clippy::unwrap_in_result)]
140 fn raw_block_header(&self, hash_or_height: HashOrHeight) -> Option<RawBytes> {
141 let block_header_by_height = self.db.cf_handle("block_header_by_height").unwrap();
143
144 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
145 let header: RawBytes = self.db.zs_get(&block_header_by_height, &height)?;
146
147 Some(header)
148 }
149
150 #[allow(clippy::unwrap_in_result)]
155 pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
156 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
158 let header = self.block_header(height.into())?;
159
160 let transactions = self
168 .transactions_by_height(height)
169 .map(|(_, tx)| tx)
170 .map(Arc::new)
171 .collect();
172
173 Some(Arc::new(Block {
174 header,
175 transactions,
176 }))
177 }
178
179 #[allow(clippy::unwrap_in_result)]
182 pub fn block_and_size(&self, hash_or_height: HashOrHeight) -> Option<(Arc<Block>, usize)> {
183 let (raw_header, raw_txs) = self.raw_block(hash_or_height)?;
184
185 let header = Arc::<block::Header>::from_bytes(raw_header.raw_bytes());
186 let txs: Vec<_> = raw_txs
187 .iter()
188 .map(|raw_tx| Arc::<Transaction>::from_bytes(raw_tx.raw_bytes()))
189 .collect();
190
191 let tx_count = CompactSizeMessage::try_from(txs.len())
196 .expect("must work for a previously serialized block");
197 let tx_raw = tx_count
198 .zcash_serialize_to_vec()
199 .expect("must work for a previously serialized block");
200 let size = raw_header.raw_bytes().len()
201 + raw_txs
202 .iter()
203 .map(|raw_tx| raw_tx.raw_bytes().len())
204 .sum::<usize>()
205 + tx_raw.len();
206
207 let block = Block {
208 header,
209 transactions: txs,
210 };
211 Some((Arc::new(block), size))
212 }
213
214 #[allow(clippy::unwrap_in_result)]
217 fn raw_block(&self, hash_or_height: HashOrHeight) -> Option<(RawBytes, Vec<RawBytes>)> {
218 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
220 let header = self.raw_block_header(height.into())?;
221
222 let transactions = self
225 .raw_transactions_by_height(height)
226 .map(|(_, tx)| tx)
227 .collect();
228
229 Some((header, transactions))
230 }
231
232 #[allow(clippy::unwrap_in_result)]
235 pub fn sapling_tree_by_hash_or_height(
236 &self,
237 hash_or_height: HashOrHeight,
238 ) -> Option<Arc<sapling::tree::NoteCommitmentTree>> {
239 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
240
241 self.sapling_tree_by_height(&height)
242 }
243
244 #[allow(clippy::unwrap_in_result)]
247 pub fn orchard_tree_by_hash_or_height(
248 &self,
249 hash_or_height: HashOrHeight,
250 ) -> Option<Arc<orchard::tree::NoteCommitmentTree>> {
251 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
252
253 self.orchard_tree_by_height(&height)
254 }
255
256 pub fn finalized_tip_hash(&self) -> block::Hash {
260 self.tip()
261 .map(|(_, hash)| hash)
262 .unwrap_or(GENESIS_PREVIOUS_BLOCK_HASH)
264 }
265
266 pub fn finalized_tip_height(&self) -> Option<block::Height> {
268 self.tip().map(|(height, _)| height)
269 }
270
271 pub fn tip_block(&self) -> Option<Arc<Block>> {
273 let (height, _hash) = self.tip()?;
274 self.block(height.into())
275 }
276
277 #[allow(clippy::unwrap_in_result)]
282 pub fn transaction(
283 &self,
284 hash: transaction::Hash,
285 ) -> Option<(Arc<Transaction>, Height, DateTime<Utc>)> {
286 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
287
288 let transaction_location = self.transaction_location(hash)?;
289
290 let block_time = self
291 .block_header(transaction_location.height.into())
292 .map(|header| header.time);
293
294 self.db
295 .zs_get(&tx_by_loc, &transaction_location)
296 .and_then(|tx| block_time.map(|time| (tx, transaction_location.height, time)))
297 }
298
299 #[allow(clippy::unwrap_in_result)]
301 pub fn transactions_by_height(
302 &self,
303 height: Height,
304 ) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_ {
305 self.transactions_by_location_range(
306 TransactionLocation::min_for_height(height)
307 ..=TransactionLocation::max_for_height(height),
308 )
309 }
310
311 #[allow(clippy::unwrap_in_result)]
314 fn raw_transactions_by_height(
315 &self,
316 height: Height,
317 ) -> impl Iterator<Item = (TransactionLocation, RawBytes)> + '_ {
318 self.raw_transactions_by_location_range(
319 TransactionLocation::min_for_height(height)
320 ..=TransactionLocation::max_for_height(height),
321 )
322 }
323
324 #[allow(clippy::unwrap_in_result)]
327 pub fn transactions_by_location_range<R>(
328 &self,
329 range: R,
330 ) -> impl Iterator<Item = (TransactionLocation, Transaction)> + '_
331 where
332 R: RangeBounds<TransactionLocation>,
333 {
334 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
335 self.db.zs_forward_range_iter(tx_by_loc, range)
336 }
337
338 #[allow(clippy::unwrap_in_result)]
341 pub fn raw_transactions_by_location_range<R>(
342 &self,
343 range: R,
344 ) -> impl Iterator<Item = (TransactionLocation, RawBytes)> + '_
345 where
346 R: RangeBounds<TransactionLocation>,
347 {
348 let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap();
349 self.db.zs_forward_range_iter(tx_by_loc, range)
350 }
351
352 #[allow(clippy::unwrap_in_result)]
355 pub fn transaction_location(&self, hash: transaction::Hash) -> Option<TransactionLocation> {
356 let tx_loc_by_hash = self.db.cf_handle("tx_loc_by_hash").unwrap();
357 self.db.zs_get(&tx_loc_by_hash, &hash)
358 }
359
360 #[allow(clippy::unwrap_in_result)]
363 #[allow(dead_code)]
364 pub fn transaction_hash(&self, location: TransactionLocation) -> Option<transaction::Hash> {
365 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
366 self.db.zs_get(&hash_by_tx_loc, &location)
367 }
368
/// Returns the hash of the transaction whose location is recorded for `spend`.
///
/// The location lookup depends on the [`Spend`] variant: `spending_tx_loc` for an
/// outpoint, and the matching `*_revealing_tx_loc` method for a Sprout, Sapling or
/// Orchard nullifier. Returns `None` if no location is found, or if no hash is
/// stored for that location.
#[cfg(feature = "indexer")]
pub fn spending_transaction_hash(&self, spend: &Spend) -> Option<transaction::Hash> {
    match spend {
        Spend::OutPoint(outpoint) => self.spending_tx_loc(outpoint),
        Spend::Sprout(nullifier) => self.sprout_revealing_tx_loc(nullifier),
        Spend::Sapling(nullifier) => self.sapling_revealing_tx_loc(nullifier),
        Spend::Orchard(nullifier) => self.orchard_revealing_tx_loc(nullifier),
    }
    .and_then(|tx_loc| self.transaction_hash(tx_loc))
}
382
383 #[allow(clippy::unwrap_in_result)]
390 pub fn transaction_hashes_for_block(
391 &self,
392 hash_or_height: HashOrHeight,
393 ) -> Option<Arc<[transaction::Hash]>> {
394 let height = hash_or_height.height_or_else(|hash| self.height(hash))?;
396
397 let hash_by_tx_loc = self.db.cf_handle("hash_by_tx_loc").unwrap();
399
400 let mut transaction_hashes = Vec::new();
402
403 for tx_index in 0..=Transaction::max_allocation() {
404 let tx_loc = TransactionLocation::from_u64(height, tx_index);
405
406 if let Some(tx_hash) = self.db.zs_get(&hash_by_tx_loc, &tx_loc) {
407 transaction_hashes.push(tx_hash);
408 } else {
409 break;
410 }
411 }
412
413 Some(transaction_hashes.into())
414 }
415
416 #[allow(clippy::unwrap_in_result)]
430 pub(in super::super) fn write_block(
431 &mut self,
432 finalized: FinalizedBlock,
433 prev_note_commitment_trees: Option<NoteCommitmentTrees>,
434 network: &Network,
435 source: &str,
436 ) -> Result<block::Hash, BoxError> {
437 let tx_hash_indexes: HashMap<transaction::Hash, usize> = finalized
438 .transaction_hashes
439 .iter()
440 .enumerate()
441 .map(|(index, hash)| (*hash, index))
442 .collect();
443
444 let new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo> = finalized
450 .new_outputs
451 .iter()
452 .map(|(outpoint, ordered_utxo)| {
453 (
454 lookup_out_loc(finalized.height, outpoint, &tx_hash_indexes),
455 ordered_utxo.utxo.clone(),
456 )
457 })
458 .collect();
459
460 let spent_utxos: Vec<(transparent::OutPoint, OutputLocation, transparent::Utxo)> =
462 finalized
463 .block
464 .transactions
465 .iter()
466 .flat_map(|tx| tx.inputs().iter())
467 .flat_map(|input| input.outpoint())
468 .map(|outpoint| {
469 (
470 outpoint,
471 self.output_location(&outpoint).unwrap_or_else(|| {
474 lookup_out_loc(finalized.height, &outpoint, &tx_hash_indexes)
475 }),
476 self.utxo(&outpoint)
477 .map(|ordered_utxo| ordered_utxo.utxo)
478 .or_else(|| {
479 finalized
480 .new_outputs
481 .get(&outpoint)
482 .map(|ordered_utxo| ordered_utxo.utxo.clone())
483 })
484 .expect("already checked UTXO was in state or block"),
485 )
486 })
487 .collect();
488
489 let spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo> =
490 spent_utxos
491 .iter()
492 .map(|(outpoint, _output_loc, utxo)| (*outpoint, utxo.clone()))
493 .collect();
494
495 #[cfg(feature = "indexer")]
497 let out_loc_by_outpoint: HashMap<transparent::OutPoint, OutputLocation> = spent_utxos
498 .iter()
499 .map(|(outpoint, out_loc, _utxo)| (*outpoint, *out_loc))
500 .collect();
501 let spent_utxos_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo> = spent_utxos
502 .into_iter()
503 .map(|(_outpoint, out_loc, utxo)| (out_loc, utxo))
504 .collect();
505
506 let changed_addresses: HashSet<transparent::Address> = spent_utxos_by_out_loc
508 .values()
509 .chain(
510 finalized
511 .new_outputs
512 .values()
513 .map(|ordered_utxo| &ordered_utxo.utxo),
514 )
515 .filter_map(|utxo| utxo.output.address(network))
516 .unique()
517 .collect();
518
519 let address_balances: HashMap<transparent::Address, AddressBalanceLocationChange> =
521 changed_addresses
522 .into_iter()
523 .filter_map(|address| {
524 let addr_loc = self.address_balance_location(&address)?.into_new_change();
529 Some((address.clone(), addr_loc))
530 })
531 .collect();
532
533 let mut batch = DiskWriteBatch::new();
534
535 batch.prepare_block_batch(
537 self,
538 network,
539 &finalized,
540 new_outputs_by_out_loc,
541 spent_utxos_by_outpoint,
542 spent_utxos_by_out_loc,
543 #[cfg(feature = "indexer")]
544 out_loc_by_outpoint,
545 address_balances,
546 self.finalized_value_pool(),
547 prev_note_commitment_trees,
548 )?;
549
550 self.db.write(batch)?;
551
552 tracing::trace!(?source, "committed block from");
553
554 Ok(finalized.hash)
555 }
556
557 pub fn write_batch(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
559 self.db.write(batch)
560 }
561}
562
563fn lookup_out_loc(
567 height: Height,
568 outpoint: &transparent::OutPoint,
569 tx_hash_indexes: &HashMap<transaction::Hash, usize>,
570) -> OutputLocation {
571 let tx_index = tx_hash_indexes
572 .get(&outpoint.hash)
573 .expect("already checked UTXO was in state or block");
574
575 let tx_loc = TransactionLocation::from_usize(height, *tx_index);
576
577 OutputLocation::from_outpoint(tx_loc, outpoint)
578}
579
580impl DiskWriteBatch {
581 #[allow(clippy::too_many_arguments)]
593 pub fn prepare_block_batch(
594 &mut self,
595 zebra_db: &ZebraDb,
596 network: &Network,
597 finalized: &FinalizedBlock,
598 new_outputs_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
599 spent_utxos_by_outpoint: HashMap<transparent::OutPoint, transparent::Utxo>,
600 spent_utxos_by_out_loc: BTreeMap<OutputLocation, transparent::Utxo>,
601 #[cfg(feature = "indexer")] out_loc_by_outpoint: HashMap<
602 transparent::OutPoint,
603 OutputLocation,
604 >,
605 address_balances: HashMap<transparent::Address, AddressBalanceLocationChange>,
606 value_pool: ValueBalance<NonNegative>,
607 prev_note_commitment_trees: Option<NoteCommitmentTrees>,
608 ) -> Result<(), BoxError> {
609 let db = &zebra_db.db;
610
611 self.prepare_block_header_and_transaction_data_batch(db, finalized)?;
613
614 self.prepare_shielded_transaction_batch(zebra_db, finalized)?;
621 self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees)?;
622
623 if !finalized.height.is_min() {
634 self.prepare_transparent_transaction_batch(
636 zebra_db,
637 network,
638 finalized,
639 &new_outputs_by_out_loc,
640 &spent_utxos_by_outpoint,
641 &spent_utxos_by_out_loc,
642 #[cfg(feature = "indexer")]
643 &out_loc_by_outpoint,
644 address_balances,
645 )?;
646 }
647 self.prepare_chain_value_pools_batch(
649 zebra_db,
650 finalized,
651 spent_utxos_by_outpoint,
652 value_pool,
653 )?;
654
655 block_precommit_metrics(&finalized.block, finalized.hash, finalized.height);
657
658 Ok(())
659 }
660
661 #[allow(clippy::unwrap_in_result)]
668 pub fn prepare_block_header_and_transaction_data_batch(
669 &mut self,
670 db: &DiskDb,
671 finalized: &FinalizedBlock,
672 ) -> Result<(), BoxError> {
673 let block_header_by_height = db.cf_handle("block_header_by_height").unwrap();
675 let hash_by_height = db.cf_handle("hash_by_height").unwrap();
676 let height_by_hash = db.cf_handle("height_by_hash").unwrap();
677
678 let tx_by_loc = db.cf_handle("tx_by_loc").unwrap();
680 let hash_by_tx_loc = db.cf_handle("hash_by_tx_loc").unwrap();
681 let tx_loc_by_hash = db.cf_handle("tx_loc_by_hash").unwrap();
682
683 let FinalizedBlock {
684 block,
685 hash,
686 height,
687 transaction_hashes,
688 ..
689 } = finalized;
690
691 self.zs_insert(&block_header_by_height, height, &block.header);
693
694 self.zs_insert(&hash_by_height, height, hash);
696 self.zs_insert(&height_by_hash, hash, height);
697
698 for (transaction_index, (transaction, transaction_hash)) in block
699 .transactions
700 .iter()
701 .zip(transaction_hashes.iter())
702 .enumerate()
703 {
704 let transaction_location = TransactionLocation::from_usize(*height, transaction_index);
705
706 self.zs_insert(&tx_by_loc, transaction_location, transaction);
708
709 self.zs_insert(&hash_by_tx_loc, transaction_location, transaction_hash);
711 self.zs_insert(&tx_loc_by_hash, transaction_hash, transaction_location);
712 }
713
714 Ok(())
715 }
716}