zebra_state/service/
queued_blocks.rs

1//! Queued blocks that are awaiting their parent block for verification.
2
3use std::{
4    collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque},
5    iter, mem,
6};
7
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use zebra_chain::{block, transparent};
12
13use crate::{BoxError, CheckpointVerifiedBlock, SemanticallyVerifiedBlock};
14
15#[cfg(test)]
16mod tests;
17
18/// A queued checkpoint verified block, and its corresponding [`Result`] channel.
19pub type QueuedCheckpointVerified = (
20    CheckpointVerifiedBlock,
21    oneshot::Sender<Result<block::Hash, BoxError>>,
22);
23
24/// A queued semantically verified block, and its corresponding [`Result`] channel.
25pub type QueuedSemanticallyVerified = (
26    SemanticallyVerifiedBlock,
27    oneshot::Sender<Result<block::Hash, BoxError>>,
28);
29
30/// A queue of blocks, awaiting the arrival of parent blocks.
31#[derive(Debug, Default)]
32pub struct QueuedBlocks {
33    /// Blocks awaiting their parent blocks for contextual verification.
34    blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
35    /// Hashes from `queued_blocks`, indexed by parent hash.
36    by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
37    /// Hashes from `queued_blocks`, indexed by block height.
38    by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
39    /// Known UTXOs.
40    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
41}
42
43impl QueuedBlocks {
44    /// Queue a block for eventual verification and commit.
45    ///
46    /// # Panics
47    ///
48    /// - if a block with the same `block::Hash` has already been queued.
49    #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
50    pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
51        let new_hash = new.0.hash;
52        let new_height = new.0.height;
53        let parent_hash = new.0.block.header.previous_block_hash;
54
55        if self.blocks.contains_key(&new_hash) {
56            // Skip queueing the block and return early if the hash is not unique
57            return;
58        }
59
60        // Track known UTXOs in queued blocks.
61        for (outpoint, ordered_utxo) in new.0.new_outputs.iter() {
62            self.known_utxos
63                .insert(*outpoint, ordered_utxo.utxo.clone());
64        }
65
66        self.blocks.insert(new_hash, new);
67        self.by_height
68            .entry(new_height)
69            .or_default()
70            .insert(new_hash);
71        self.by_parent
72            .entry(parent_hash)
73            .or_default()
74            .insert(new_hash);
75
76        tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block");
77        self.update_metrics();
78    }
79
80    /// Returns `true` if there are any queued children of `parent_hash`.
81    #[instrument(skip(self), fields(%parent_hash))]
82    pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
83        self.by_parent.contains_key(&parent_hash)
84    }
85
86    /// Dequeue and return all blocks that were waiting for the arrival of
87    /// `parent`.
88    #[instrument(skip(self), fields(%parent_hash))]
89    pub fn dequeue_children(
90        &mut self,
91        parent_hash: block::Hash,
92    ) -> Vec<QueuedSemanticallyVerified> {
93        let queued_children = self
94            .by_parent
95            .remove(&parent_hash)
96            .unwrap_or_default()
97            .into_iter()
98            .map(|hash| {
99                self.blocks
100                    .remove(&hash)
101                    .expect("block is present if its hash is in by_parent")
102            })
103            .collect::<Vec<_>>();
104
105        for queued in &queued_children {
106            self.by_height.remove(&queued.0.height);
107            // TODO: only remove UTXOs if there are no queued blocks with that UTXO
108            //       (known_utxos is best-effort, so this is ok for now)
109            for outpoint in queued.0.new_outputs.keys() {
110                self.known_utxos.remove(outpoint);
111            }
112        }
113
114        tracing::trace!(
115            dequeued = queued_children.len(),
116            remaining = self.blocks.len(),
117            "dequeued blocks"
118        );
119        self.update_metrics();
120
121        queued_children
122    }
123
124    /// Remove all queued blocks whose height is less than or equal to the given
125    /// `finalized_tip_height`.
126    #[instrument(skip(self))]
127    pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
128        // split_off returns the values _greater than or equal to_ the key. What
129        // we need is the keys that are less than or equal to
130        // `finalized_tip_height`. To get this we have split at
131        // `finalized_tip_height + 1` and swap the removed portion of the list
132        // with the remainder.
133        let split_height = finalized_tip_height + 1;
134        let split_height =
135            split_height.expect("height after finalized tip won't exceed max height");
136        let mut by_height = self.by_height.split_off(&split_height);
137        mem::swap(&mut self.by_height, &mut by_height);
138
139        for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) {
140            let (expired_block, expired_sender) =
141                self.blocks.remove(&hash).expect("block is present");
142            let parent_hash = &expired_block.block.header.previous_block_hash;
143
144            // we don't care if the receiver was dropped
145            let _ = expired_sender.send(Err(
146                "pruned block at or below the finalized tip height".into()
147            ));
148
149            // TODO: only remove UTXOs if there are no queued blocks with that UTXO
150            //       (known_utxos is best-effort, so this is ok for now)
151            for outpoint in expired_block.new_outputs.keys() {
152                self.known_utxos.remove(outpoint);
153            }
154
155            let parent_list = self
156                .by_parent
157                .get_mut(parent_hash)
158                .expect("parent is present");
159
160            if parent_list.len() == 1 {
161                let removed = self
162                    .by_parent
163                    .remove(parent_hash)
164                    .expect("parent is present");
165                assert!(
166                    removed.contains(&hash),
167                    "hash must be present in parent hash list"
168                );
169            } else {
170                assert!(
171                    parent_list.remove(&hash),
172                    "hash must be present in parent hash list"
173                );
174            }
175        }
176
177        tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
178        self.update_metrics();
179    }
180
181    /// Return the queued block if it has already been registered
182    pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
183        self.blocks.get_mut(hash)
184    }
185
186    /// Update metrics after the queue is modified
187    fn update_metrics(&self) {
188        if let Some(min_height) = self.by_height.keys().next() {
189            metrics::gauge!("state.memory.queued.min.height").set(min_height.0 as f64);
190        } else {
191            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
192            metrics::gauge!("state.memory.queued.min.height").set(f64::NAN);
193        }
194        if let Some(max_height) = self.by_height.keys().next_back() {
195            metrics::gauge!("state.memory.queued.max.height").set(max_height.0 as f64);
196        } else {
197            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
198            metrics::gauge!("state.memory.queued.max.height").set(f64::NAN);
199        }
200
201        metrics::gauge!("state.memory.queued.block.count").set(self.blocks.len() as f64);
202    }
203
204    /// Try to look up this UTXO in any queued block.
205    #[instrument(skip(self))]
206    pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
207        self.known_utxos.get(outpoint).cloned()
208    }
209
210    /// Clears known_utxos, by_parent, and by_height, then drains blocks.
211    /// Returns all key-value pairs of blocks as an iterator.
212    ///
213    /// Doesn't update the metrics, because it is only used when the state is being dropped.
214    pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
215        self.known_utxos.clear();
216        self.known_utxos.shrink_to_fit();
217        self.by_parent.clear();
218        self.by_parent.shrink_to_fit();
219        self.by_height.clear();
220
221        self.blocks.drain()
222    }
223}
224
225#[derive(Debug, Default)]
226pub(crate) struct SentHashes {
227    /// A list of previously sent block batches, each batch is in increasing height order.
228    /// We use this list to efficiently prune outdated hashes that are at or below the finalized tip.
229    bufs: Vec<VecDeque<(block::Hash, block::Height)>>,
230
231    /// The list of blocks sent in the current batch, in increasing height order.
232    curr_buf: VecDeque<(block::Hash, block::Height)>,
233
234    /// Stores a set of hashes that have been sent to the block write task but
235    /// may not be in the finalized state yet.
236    sent: HashMap<block::Hash, Vec<transparent::OutPoint>>,
237
238    /// Known UTXOs.
239    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
240
241    /// Whether the hashes in this struct can be used check if the chain can be forked.
242    /// This is set to false until all checkpoint-verified block hashes have been pruned.
243    pub(crate) can_fork_chain_at_hashes: bool,
244}
245
246impl SentHashes {
247    /// Stores the `block`'s hash, height, and UTXOs, so they can be used to check if a block or UTXO
248    /// is available in the state.
249    ///
250    /// Assumes that blocks are added in the order of their height between `finish_batch` calls
251    /// for efficient pruning.
252    pub fn add(&mut self, block: &SemanticallyVerifiedBlock) {
253        // Track known UTXOs in sent blocks.
254        let outpoints = block
255            .new_outputs
256            .iter()
257            .map(|(outpoint, ordered_utxo)| {
258                self.known_utxos
259                    .insert(*outpoint, ordered_utxo.utxo.clone());
260                outpoint
261            })
262            .cloned()
263            .collect();
264
265        self.curr_buf.push_back((block.hash, block.height));
266        self.sent.insert(block.hash, outpoints);
267
268        self.update_metrics_for_block(block.height);
269    }
270
271    /// Stores the checkpoint verified `block`'s hash, height, and UTXOs, so they can be used to check if a
272    /// block or UTXO is available in the state.
273    ///
274    /// Used for checkpoint verified blocks close to the final checkpoint, so the semantic block verifier can look up
275    /// their UTXOs.
276    ///
277    /// Assumes that blocks are added in the order of their height between `finish_batch` calls
278    /// for efficient pruning.
279    ///
280    /// For more details see `add()`.
281    pub fn add_finalized(&mut self, block: &CheckpointVerifiedBlock) {
282        // Track known UTXOs in sent blocks.
283        let outpoints = block
284            .new_outputs
285            .iter()
286            .map(|(outpoint, ordered_utxo)| {
287                self.known_utxos
288                    .insert(*outpoint, ordered_utxo.utxo.clone());
289                outpoint
290            })
291            .cloned()
292            .collect();
293
294        self.curr_buf.push_back((block.hash, block.height));
295        self.sent.insert(block.hash, outpoints);
296
297        self.update_metrics_for_block(block.height);
298    }
299
300    /// Try to look up this UTXO in any sent block.
301    #[instrument(skip(self))]
302    pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
303        self.known_utxos.get(outpoint).cloned()
304    }
305
306    /// Finishes the current block batch, and stores it for efficient pruning.
307    pub fn finish_batch(&mut self) {
308        if !self.curr_buf.is_empty() {
309            self.bufs.push(std::mem::take(&mut self.curr_buf));
310        }
311    }
312
313    /// Prunes sent blocks at or below `height_bound`.
314    ///
315    /// Finishes the batch if `finish_batch()` hasn't been called already.
316    ///
317    /// Assumes that blocks will be added in order of their heights between each `finish_batch()` call,
318    /// so that blocks can be efficiently and reliably removed by height.
319    pub fn prune_by_height(&mut self, height_bound: block::Height) {
320        self.finish_batch();
321
322        // Iterates over each buf in `sent_bufs`, removing sent blocks until reaching
323        // the first block with a height above the `height_bound`.
324        self.bufs.retain_mut(|buf| {
325            while let Some((hash, height)) = buf.pop_front() {
326                if height > height_bound {
327                    buf.push_front((hash, height));
328                    return true;
329                } else if let Some(expired_outpoints) = self.sent.remove(&hash) {
330                    // TODO: only remove UTXOs if there are no queued blocks with that UTXO
331                    //       (known_utxos is best-effort, so this is ok for now)
332                    for outpoint in expired_outpoints.iter() {
333                        self.known_utxos.remove(outpoint);
334                    }
335                }
336            }
337
338            false
339        });
340
341        self.sent.shrink_to_fit();
342        self.known_utxos.shrink_to_fit();
343        self.bufs.shrink_to_fit();
344
345        self.update_metrics_for_cache();
346    }
347
348    /// Returns true if SentHashes contains the `hash`
349    pub fn contains(&self, hash: &block::Hash) -> bool {
350        self.sent.contains_key(hash)
351    }
352
353    /// Returns true if the chain can be forked at the provided hash
354    pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
355        self.can_fork_chain_at_hashes && self.contains(hash)
356    }
357
358    /// Update sent block metrics after a block is sent.
359    fn update_metrics_for_block(&self, height: block::Height) {
360        metrics::counter!("state.memory.sent.block.count").increment(1);
361        metrics::gauge!("state.memory.sent.block.height").set(height.0 as f64);
362
363        self.update_metrics_for_cache();
364    }
365
366    /// Update sent block cache metrics after the sent blocks are modified.
367    fn update_metrics_for_cache(&self) {
368        let batch_iter = || self.bufs.iter().chain(iter::once(&self.curr_buf));
369
370        if let Some(min_height) = batch_iter()
371            .flat_map(|batch| batch.front().map(|(_hash, height)| height))
372            .min()
373        {
374            metrics::gauge!("state.memory.sent.cache.min.height").set(min_height.0 as f64);
375        } else {
376            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
377            metrics::gauge!("state.memory.sent.cache.min.height").set(f64::NAN);
378        }
379
380        if let Some(max_height) = batch_iter()
381            .flat_map(|batch| batch.back().map(|(_hash, height)| height))
382            .max()
383        {
384            metrics::gauge!("state.memory.sent.cache.max.height").set(max_height.0 as f64);
385        } else {
386            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
387            metrics::gauge!("state.memory.sent.cache.max.height").set(f64::NAN);
388        }
389
390        metrics::gauge!("state.memory.sent.cache.block.count")
391            .set(batch_iter().flatten().count() as f64);
392
393        metrics::gauge!("state.memory.sent.cache.batch.count").set(batch_iter().count() as f64);
394    }
395}