1//! Queued blocks that are awaiting their parent block for verification.
23use std::{
4 collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque},
5 iter, mem,
6};
78use tokio::sync::oneshot;
9use tracing::instrument;
1011use zebra_chain::{block, transparent};
1213use crate::{BoxError, CheckpointVerifiedBlock, SemanticallyVerifiedBlock};
1415#[cfg(test)]
16mod tests;
/// A queued checkpoint verified block, and its corresponding [`Result`] channel.
///
/// The second element is the [`oneshot::Sender`] half of that channel. It carries
/// either a [`block::Hash`] on success, or a [`BoxError`] on failure.
pub type QueuedCheckpointVerified = (
    CheckpointVerifiedBlock,
    oneshot::Sender<Result<block::Hash, BoxError>>,
);
/// A queued semantically verified block, and its corresponding [`Result`] channel.
///
/// The second element is the [`oneshot::Sender`] half of that channel. It carries
/// either a [`block::Hash`] on success, or a [`BoxError`] on failure.
pub type QueuedSemanticallyVerified = (
    SemanticallyVerifiedBlock,
    oneshot::Sender<Result<block::Hash, BoxError>>,
);
/// A queue of blocks, awaiting the arrival of parent blocks.
///
/// The `by_parent` and `by_height` fields are secondary indexes into `blocks`:
/// they only store hashes, and the queued block itself lives in `blocks`.
#[derive(Debug, Default)]
pub struct QueuedBlocks {
    /// Blocks awaiting their parent blocks for contextual verification,
    /// indexed by their own block hash.
    blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
    /// Hashes from `blocks`, indexed by parent hash.
    by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
    /// Hashes from `blocks`, indexed by block height.
    ///
    /// This is an ordered map, so the lowest and highest queued heights can be
    /// read from its first and last keys.
    by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
    /// Known UTXOs: the new outputs of the queued blocks, indexed by outpoint.
    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
}
4243impl QueuedBlocks {
44/// Queue a block for eventual verification and commit.
45 ///
46 /// # Panics
47 ///
48 /// - if a block with the same `block::Hash` has already been queued.
49#[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
50pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
51let new_hash = new.0.hash;
52let new_height = new.0.height;
53let parent_hash = new.0.block.header.previous_block_hash;
5455if self.blocks.contains_key(&new_hash) {
56// Skip queueing the block and return early if the hash is not unique
57return;
58 }
5960// Track known UTXOs in queued blocks.
61for (outpoint, ordered_utxo) in new.0.new_outputs.iter() {
62self.known_utxos
63 .insert(*outpoint, ordered_utxo.utxo.clone());
64 }
6566self.blocks.insert(new_hash, new);
67self.by_height
68 .entry(new_height)
69 .or_default()
70 .insert(new_hash);
71self.by_parent
72 .entry(parent_hash)
73 .or_default()
74 .insert(new_hash);
7576tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block");
77self.update_metrics();
78 }
7980/// Returns `true` if there are any queued children of `parent_hash`.
81#[instrument(skip(self), fields(%parent_hash))]
82pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
83self.by_parent.contains_key(&parent_hash)
84 }
8586/// Dequeue and return all blocks that were waiting for the arrival of
87 /// `parent`.
88#[instrument(skip(self), fields(%parent_hash))]
89pub fn dequeue_children(
90&mut self,
91 parent_hash: block::Hash,
92 ) -> Vec<QueuedSemanticallyVerified> {
93let queued_children = self
94.by_parent
95 .remove(&parent_hash)
96 .unwrap_or_default()
97 .into_iter()
98 .map(|hash| {
99self.blocks
100 .remove(&hash)
101 .expect("block is present if its hash is in by_parent")
102 })
103 .collect::<Vec<_>>();
104105for queued in &queued_children {
106self.by_height.remove(&queued.0.height);
107// TODO: only remove UTXOs if there are no queued blocks with that UTXO
108 // (known_utxos is best-effort, so this is ok for now)
109for outpoint in queued.0.new_outputs.keys() {
110self.known_utxos.remove(outpoint);
111 }
112 }
113114tracing::trace!(
115 dequeued = queued_children.len(),
116 remaining = self.blocks.len(),
117"dequeued blocks"
118);
119self.update_metrics();
120121 queued_children
122 }
123124/// Remove all queued blocks whose height is less than or equal to the given
125 /// `finalized_tip_height`.
126#[instrument(skip(self))]
127pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
128// split_off returns the values _greater than or equal to_ the key. What
129 // we need is the keys that are less than or equal to
130 // `finalized_tip_height`. To get this we have split at
131 // `finalized_tip_height + 1` and swap the removed portion of the list
132 // with the remainder.
133let split_height = finalized_tip_height + 1;
134let split_height =
135 split_height.expect("height after finalized tip won't exceed max height");
136let mut by_height = self.by_height.split_off(&split_height);
137 mem::swap(&mut self.by_height, &mut by_height);
138139for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) {
140let (expired_block, expired_sender) =
141self.blocks.remove(&hash).expect("block is present");
142let parent_hash = &expired_block.block.header.previous_block_hash;
143144// we don't care if the receiver was dropped
145let _ = expired_sender.send(Err(
146"pruned block at or below the finalized tip height".into()
147 ));
148149// TODO: only remove UTXOs if there are no queued blocks with that UTXO
150 // (known_utxos is best-effort, so this is ok for now)
151for outpoint in expired_block.new_outputs.keys() {
152self.known_utxos.remove(outpoint);
153 }
154155let parent_list = self
156.by_parent
157 .get_mut(parent_hash)
158 .expect("parent is present");
159160if parent_list.len() == 1 {
161let removed = self
162.by_parent
163 .remove(parent_hash)
164 .expect("parent is present");
165assert!(
166 removed.contains(&hash),
167"hash must be present in parent hash list"
168);
169 } else {
170assert!(
171 parent_list.remove(&hash),
172"hash must be present in parent hash list"
173);
174 }
175 }
176177tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
178self.update_metrics();
179 }
180181/// Return the queued block if it has already been registered
182pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
183self.blocks.get_mut(hash)
184 }
185186/// Update metrics after the queue is modified
187fn update_metrics(&self) {
188if let Some(min_height) = self.by_height.keys().next() {
189metrics::gauge!("state.memory.queued.min.height").set(min_height.0 as f64);
190 } else {
191// use f64::NAN as a sentinel value for "None", because 0 is a valid height
192metrics::gauge!("state.memory.queued.min.height").set(f64::NAN);
193 }
194if let Some(max_height) = self.by_height.keys().next_back() {
195metrics::gauge!("state.memory.queued.max.height").set(max_height.0 as f64);
196 } else {
197// use f64::NAN as a sentinel value for "None", because 0 is a valid height
198metrics::gauge!("state.memory.queued.max.height").set(f64::NAN);
199 }
200201metrics::gauge!("state.memory.queued.block.count").set(self.blocks.len() as f64);
202 }
203204/// Try to look up this UTXO in any queued block.
205#[instrument(skip(self))]
206pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
207self.known_utxos.get(outpoint).cloned()
208 }
209210/// Clears known_utxos, by_parent, and by_height, then drains blocks.
211 /// Returns all key-value pairs of blocks as an iterator.
212 ///
213 /// Doesn't update the metrics, because it is only used when the state is being dropped.
214pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
215self.known_utxos.clear();
216self.known_utxos.shrink_to_fit();
217self.by_parent.clear();
218self.by_parent.shrink_to_fit();
219self.by_height.clear();
220221self.blocks.drain()
222 }
223}
/// A cache of the hashes, heights, and UTXOs of blocks that have been sent on
/// for writing, grouped into batches so they can be pruned by height.
#[derive(Debug, Default)]
pub(crate) struct SentHashes {
    /// A list of previously sent block batches, each batch is in increasing height order.
    /// We use this list to efficiently prune outdated hashes that are at or below the finalized tip.
    bufs: Vec<VecDeque<(block::Hash, block::Height)>>,

    /// The list of blocks sent in the current batch, in increasing height order.
    curr_buf: VecDeque<(block::Hash, block::Height)>,

    /// Stores a set of hashes that have been sent to the block write task but
    /// may not be in the finalized state yet.
    ///
    /// Each hash maps to the outpoints created by that block, so its entries in
    /// `known_utxos` can be removed when the block is pruned.
    sent: HashMap<block::Hash, Vec<transparent::OutPoint>>,

    /// Known UTXOs.
    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,

    /// Whether the hashes in this struct can be used to check if the chain can be forked.
    /// This is set to false until all checkpoint-verified block hashes have been pruned.
    pub(crate) can_fork_chain_at_hashes: bool,
}
245246impl SentHashes {
247/// Stores the `block`'s hash, height, and UTXOs, so they can be used to check if a block or UTXO
248 /// is available in the state.
249 ///
250 /// Assumes that blocks are added in the order of their height between `finish_batch` calls
251 /// for efficient pruning.
252pub fn add(&mut self, block: &SemanticallyVerifiedBlock) {
253// Track known UTXOs in sent blocks.
254let outpoints = block
255 .new_outputs
256 .iter()
257 .map(|(outpoint, ordered_utxo)| {
258self.known_utxos
259 .insert(*outpoint, ordered_utxo.utxo.clone());
260 outpoint
261 })
262 .cloned()
263 .collect();
264265self.curr_buf.push_back((block.hash, block.height));
266self.sent.insert(block.hash, outpoints);
267268self.update_metrics_for_block(block.height);
269 }
270271/// Stores the checkpoint verified `block`'s hash, height, and UTXOs, so they can be used to check if a
272 /// block or UTXO is available in the state.
273 ///
274 /// Used for checkpoint verified blocks close to the final checkpoint, so the semantic block verifier can look up
275 /// their UTXOs.
276 ///
277 /// Assumes that blocks are added in the order of their height between `finish_batch` calls
278 /// for efficient pruning.
279 ///
280 /// For more details see `add()`.
281pub fn add_finalized(&mut self, block: &CheckpointVerifiedBlock) {
282// Track known UTXOs in sent blocks.
283let outpoints = block
284 .new_outputs
285 .iter()
286 .map(|(outpoint, ordered_utxo)| {
287self.known_utxos
288 .insert(*outpoint, ordered_utxo.utxo.clone());
289 outpoint
290 })
291 .cloned()
292 .collect();
293294self.curr_buf.push_back((block.hash, block.height));
295self.sent.insert(block.hash, outpoints);
296297self.update_metrics_for_block(block.height);
298 }
299300/// Try to look up this UTXO in any sent block.
301#[instrument(skip(self))]
302pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
303self.known_utxos.get(outpoint).cloned()
304 }
305306/// Finishes the current block batch, and stores it for efficient pruning.
307pub fn finish_batch(&mut self) {
308if !self.curr_buf.is_empty() {
309self.bufs.push(std::mem::take(&mut self.curr_buf));
310 }
311 }
312313/// Prunes sent blocks at or below `height_bound`.
314 ///
315 /// Finishes the batch if `finish_batch()` hasn't been called already.
316 ///
317 /// Assumes that blocks will be added in order of their heights between each `finish_batch()` call,
318 /// so that blocks can be efficiently and reliably removed by height.
319pub fn prune_by_height(&mut self, height_bound: block::Height) {
320self.finish_batch();
321322// Iterates over each buf in `sent_bufs`, removing sent blocks until reaching
323 // the first block with a height above the `height_bound`.
324self.bufs.retain_mut(|buf| {
325while let Some((hash, height)) = buf.pop_front() {
326if height > height_bound {
327 buf.push_front((hash, height));
328return true;
329 } else if let Some(expired_outpoints) = self.sent.remove(&hash) {
330// TODO: only remove UTXOs if there are no queued blocks with that UTXO
331 // (known_utxos is best-effort, so this is ok for now)
332for outpoint in expired_outpoints.iter() {
333self.known_utxos.remove(outpoint);
334 }
335 }
336 }
337338false
339});
340341self.sent.shrink_to_fit();
342self.known_utxos.shrink_to_fit();
343self.bufs.shrink_to_fit();
344345self.update_metrics_for_cache();
346 }
347348/// Returns true if SentHashes contains the `hash`
349pub fn contains(&self, hash: &block::Hash) -> bool {
350self.sent.contains_key(hash)
351 }
352353/// Returns true if the chain can be forked at the provided hash
354pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
355self.can_fork_chain_at_hashes && self.contains(hash)
356 }
357358/// Update sent block metrics after a block is sent.
359fn update_metrics_for_block(&self, height: block::Height) {
360metrics::counter!("state.memory.sent.block.count").increment(1);
361metrics::gauge!("state.memory.sent.block.height").set(height.0 as f64);
362363self.update_metrics_for_cache();
364 }
365366/// Update sent block cache metrics after the sent blocks are modified.
367fn update_metrics_for_cache(&self) {
368let batch_iter = || self.bufs.iter().chain(iter::once(&self.curr_buf));
369370if let Some(min_height) = batch_iter()
371 .flat_map(|batch| batch.front().map(|(_hash, height)| height))
372 .min()
373 {
374metrics::gauge!("state.memory.sent.cache.min.height").set(min_height.0 as f64);
375 } else {
376// use f64::NAN as a sentinel value for "None", because 0 is a valid height
377metrics::gauge!("state.memory.sent.cache.min.height").set(f64::NAN);
378 }
379380if let Some(max_height) = batch_iter()
381 .flat_map(|batch| batch.back().map(|(_hash, height)| height))
382 .max()
383 {
384metrics::gauge!("state.memory.sent.cache.max.height").set(max_height.0 as f64);
385 } else {
386// use f64::NAN as a sentinel value for "None", because 0 is a valid height
387metrics::gauge!("state.memory.sent.cache.max.height").set(f64::NAN);
388 }
389390metrics::gauge!("state.memory.sent.cache.block.count")
391 .set(batch_iter().flatten().count() as f64);
392393metrics::gauge!("state.memory.sent.cache.batch.count").set(batch_iter().count() as f64);
394 }
395}