zebra_state/service/
queued_blocks.rs

1//! Queued blocks that are awaiting their parent block for verification.
2
3use std::{
4    collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque},
5    iter, mem,
6};
7
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use zebra_chain::{block, transparent};
12
13use crate::{
14    BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, NonFinalizedState,
15    SemanticallyVerifiedBlock, ValidateContextError,
16};
17
18#[cfg(test)]
19mod tests;
20
21/// A queued checkpoint verified block, and its corresponding [`Result`] channel.
22pub type QueuedCheckpointVerified = (
23    CheckpointVerifiedBlock,
24    oneshot::Sender<Result<block::Hash, BoxError>>,
25);
26
27/// A queued semantically verified block, and its corresponding [`Result`] channel.
28pub type QueuedSemanticallyVerified = (
29    SemanticallyVerifiedBlock,
30    oneshot::Sender<Result<block::Hash, CommitSemanticallyVerifiedError>>,
31);
32
33/// A queue of blocks, awaiting the arrival of parent blocks.
34#[derive(Debug, Default)]
35pub struct QueuedBlocks {
36    /// Blocks awaiting their parent blocks for contextual verification.
37    blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
38    /// Hashes from `queued_blocks`, indexed by parent hash.
39    by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
40    /// Hashes from `queued_blocks`, indexed by block height.
41    by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
42    /// Known UTXOs.
43    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
44}
45
46impl QueuedBlocks {
47    /// Queue a block for eventual verification and commit.
48    ///
49    /// # Panics
50    ///
51    /// - if a block with the same `block::Hash` has already been queued.
52    #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
53    pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
54        let new_hash = new.0.hash;
55        let new_height = new.0.height;
56        let parent_hash = new.0.block.header.previous_block_hash;
57
58        if self.blocks.contains_key(&new_hash) {
59            // Skip queueing the block and return early if the hash is not unique
60            return;
61        }
62
63        // Track known UTXOs in queued blocks.
64        for (outpoint, ordered_utxo) in new.0.new_outputs.iter() {
65            self.known_utxos
66                .insert(*outpoint, ordered_utxo.utxo.clone());
67        }
68
69        self.blocks.insert(new_hash, new);
70        self.by_height
71            .entry(new_height)
72            .or_default()
73            .insert(new_hash);
74        self.by_parent
75            .entry(parent_hash)
76            .or_default()
77            .insert(new_hash);
78
79        tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block");
80        self.update_metrics();
81    }
82
83    /// Returns `true` if there are any queued children of `parent_hash`.
84    #[instrument(skip(self), fields(%parent_hash))]
85    pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
86        self.by_parent.contains_key(&parent_hash)
87    }
88
89    /// Dequeue and return all blocks that were waiting for the arrival of
90    /// `parent`.
91    #[instrument(skip(self), fields(%parent_hash))]
92    pub fn dequeue_children(
93        &mut self,
94        parent_hash: block::Hash,
95    ) -> Vec<QueuedSemanticallyVerified> {
96        let queued_children = self
97            .by_parent
98            .remove(&parent_hash)
99            .unwrap_or_default()
100            .into_iter()
101            .map(|hash| {
102                self.blocks
103                    .remove(&hash)
104                    .expect("block is present if its hash is in by_parent")
105            })
106            .collect::<Vec<_>>();
107
108        for queued in &queued_children {
109            self.by_height.remove(&queued.0.height);
110            // TODO: only remove UTXOs if there are no queued blocks with that UTXO
111            //       (known_utxos is best-effort, so this is ok for now)
112            for outpoint in queued.0.new_outputs.keys() {
113                self.known_utxos.remove(outpoint);
114            }
115        }
116
117        tracing::trace!(
118            dequeued = queued_children.len(),
119            remaining = self.blocks.len(),
120            "dequeued blocks"
121        );
122        self.update_metrics();
123
124        queued_children
125    }
126
127    /// Remove all queued blocks whose height is less than or equal to the given
128    /// `finalized_tip_height`.
129    #[instrument(skip(self))]
130    pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
131        // split_off returns the values _greater than or equal to_ the key. What
132        // we need is the keys that are less than or equal to
133        // `finalized_tip_height`. To get this we have split at
134        // `finalized_tip_height + 1` and swap the removed portion of the list
135        // with the remainder.
136        let split_height = finalized_tip_height + 1;
137        let split_height =
138            split_height.expect("height after finalized tip won't exceed max height");
139        let mut by_height = self.by_height.split_off(&split_height);
140        mem::swap(&mut self.by_height, &mut by_height);
141
142        for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) {
143            let (expired_block, expired_sender) =
144                self.blocks.remove(&hash).expect("block is present");
145            let parent_hash = &expired_block.block.header.previous_block_hash;
146
147            // we don't care if the receiver was dropped
148            let _ = expired_sender.send(Err(CommitSemanticallyVerifiedError::from(
149                ValidateContextError::PrunedBelowFinalizedTip {
150                    block_height: expired_block.height,
151                },
152            )));
153
154            // TODO: only remove UTXOs if there are no queued blocks with that UTXO
155            //       (known_utxos is best-effort, so this is ok for now)
156            for outpoint in expired_block.new_outputs.keys() {
157                self.known_utxos.remove(outpoint);
158            }
159
160            let parent_list = self
161                .by_parent
162                .get_mut(parent_hash)
163                .expect("parent is present");
164
165            if parent_list.len() == 1 {
166                let removed = self
167                    .by_parent
168                    .remove(parent_hash)
169                    .expect("parent is present");
170                assert!(
171                    removed.contains(&hash),
172                    "hash must be present in parent hash list"
173                );
174            } else {
175                assert!(
176                    parent_list.remove(&hash),
177                    "hash must be present in parent hash list"
178                );
179            }
180        }
181
182        tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
183        self.update_metrics();
184    }
185
186    /// Return the queued block if it has already been registered
187    pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
188        self.blocks.get_mut(hash)
189    }
190
191    /// Update metrics after the queue is modified
192    fn update_metrics(&self) {
193        if let Some(min_height) = self.by_height.keys().next() {
194            metrics::gauge!("state.memory.queued.min.height").set(min_height.0 as f64);
195        } else {
196            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
197            metrics::gauge!("state.memory.queued.min.height").set(f64::NAN);
198        }
199        if let Some(max_height) = self.by_height.keys().next_back() {
200            metrics::gauge!("state.memory.queued.max.height").set(max_height.0 as f64);
201        } else {
202            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
203            metrics::gauge!("state.memory.queued.max.height").set(f64::NAN);
204        }
205
206        metrics::gauge!("state.memory.queued.block.count").set(self.blocks.len() as f64);
207    }
208
209    /// Try to look up this UTXO in any queued block.
210    #[instrument(skip(self))]
211    pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
212        self.known_utxos.get(outpoint).cloned()
213    }
214
215    /// Clears known_utxos, by_parent, and by_height, then drains blocks.
216    /// Returns all key-value pairs of blocks as an iterator.
217    ///
218    /// Doesn't update the metrics, because it is only used when the state is being dropped.
219    pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
220        self.known_utxos.clear();
221        self.known_utxos.shrink_to_fit();
222        self.by_parent.clear();
223        self.by_parent.shrink_to_fit();
224        self.by_height.clear();
225
226        self.blocks.drain()
227    }
228}
229
230#[derive(Debug, Default)]
231pub(crate) struct SentHashes {
232    /// A list of previously sent block batches, each batch is in increasing height order.
233    /// We use this list to efficiently prune outdated hashes that are at or below the finalized tip.
234    bufs: Vec<VecDeque<(block::Hash, block::Height)>>,
235
236    /// The list of blocks sent in the current batch, in increasing height order.
237    curr_buf: VecDeque<(block::Hash, block::Height)>,
238
239    /// Stores a set of hashes that have been sent to the block write task but
240    /// may not be in the finalized state yet.
241    pub sent: HashMap<block::Hash, Vec<transparent::OutPoint>>,
242
243    /// Known UTXOs.
244    known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
245
246    /// Whether the hashes in this struct can be used check if the chain can be forked.
247    /// This is set to false until all checkpoint-verified block hashes have been pruned.
248    pub(crate) can_fork_chain_at_hashes: bool,
249}
250
251impl SentHashes {
252    /// Creates a new [`SentHashes`] with the block hashes and UTXOs in the provided non-finalized state.
253    pub fn new(non_finalized_state: &NonFinalizedState) -> Self {
254        let mut sent_hashes = Self::default();
255        for (_, block) in non_finalized_state
256            .chain_iter()
257            .flat_map(|c| c.blocks.clone())
258        {
259            sent_hashes.add(&block.into());
260        }
261
262        if !sent_hashes.sent.is_empty() {
263            sent_hashes.can_fork_chain_at_hashes = true;
264        }
265
266        sent_hashes
267    }
268
269    /// Stores the `block`'s hash, height, and UTXOs, so they can be used to check if a block or UTXO
270    /// is available in the state.
271    ///
272    /// Assumes that blocks are added in the order of their height between `finish_batch` calls
273    /// for efficient pruning.
274    pub fn add(&mut self, block: &SemanticallyVerifiedBlock) {
275        // Track known UTXOs in sent blocks.
276        let outpoints = block
277            .new_outputs
278            .iter()
279            .map(|(outpoint, ordered_utxo)| {
280                self.known_utxos
281                    .insert(*outpoint, ordered_utxo.utxo.clone());
282                outpoint
283            })
284            .cloned()
285            .collect();
286
287        self.curr_buf.push_back((block.hash, block.height));
288        self.sent.insert(block.hash, outpoints);
289
290        self.update_metrics_for_block(block.height);
291    }
292
293    /// Stores the checkpoint verified `block`'s hash, height, and UTXOs, so they can be used to check if a
294    /// block or UTXO is available in the state.
295    ///
296    /// Used for checkpoint verified blocks close to the final checkpoint, so the semantic block verifier can look up
297    /// their UTXOs.
298    ///
299    /// Assumes that blocks are added in the order of their height between `finish_batch` calls
300    /// for efficient pruning.
301    ///
302    /// For more details see `add()`.
303    pub fn add_finalized(&mut self, block: &CheckpointVerifiedBlock) {
304        // Track known UTXOs in sent blocks.
305        let outpoints = block
306            .new_outputs
307            .iter()
308            .map(|(outpoint, ordered_utxo)| {
309                self.known_utxos
310                    .insert(*outpoint, ordered_utxo.utxo.clone());
311                outpoint
312            })
313            .cloned()
314            .collect();
315
316        self.curr_buf.push_back((block.hash, block.height));
317        self.sent.insert(block.hash, outpoints);
318
319        self.update_metrics_for_block(block.height);
320    }
321
322    /// Try to look up this UTXO in any sent block.
323    #[instrument(skip(self))]
324    pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
325        self.known_utxos.get(outpoint).cloned()
326    }
327
328    /// Finishes the current block batch, and stores it for efficient pruning.
329    pub fn finish_batch(&mut self) {
330        if !self.curr_buf.is_empty() {
331            self.bufs.push(std::mem::take(&mut self.curr_buf));
332        }
333    }
334
335    /// Prunes sent blocks at or below `height_bound`.
336    ///
337    /// Finishes the batch if `finish_batch()` hasn't been called already.
338    ///
339    /// Assumes that blocks will be added in order of their heights between each `finish_batch()` call,
340    /// so that blocks can be efficiently and reliably removed by height.
341    pub fn prune_by_height(&mut self, height_bound: block::Height) {
342        self.finish_batch();
343
344        // Iterates over each buf in `sent_bufs`, removing sent blocks until reaching
345        // the first block with a height above the `height_bound`.
346        self.bufs.retain_mut(|buf| {
347            while let Some((hash, height)) = buf.pop_front() {
348                if height > height_bound {
349                    buf.push_front((hash, height));
350                    return true;
351                } else if let Some(expired_outpoints) = self.sent.remove(&hash) {
352                    // TODO: only remove UTXOs if there are no queued blocks with that UTXO
353                    //       (known_utxos is best-effort, so this is ok for now)
354                    for outpoint in expired_outpoints.iter() {
355                        self.known_utxos.remove(outpoint);
356                    }
357                }
358            }
359
360            false
361        });
362
363        self.sent.shrink_to_fit();
364        self.known_utxos.shrink_to_fit();
365        self.bufs.shrink_to_fit();
366
367        self.update_metrics_for_cache();
368    }
369
370    /// Returns true if SentHashes contains the `hash`
371    pub fn contains(&self, hash: &block::Hash) -> bool {
372        self.sent.contains_key(hash)
373    }
374
375    /// Returns true if the chain can be forked at the provided hash
376    pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
377        self.can_fork_chain_at_hashes && self.contains(hash)
378    }
379
380    /// Update sent block metrics after a block is sent.
381    fn update_metrics_for_block(&self, height: block::Height) {
382        metrics::counter!("state.memory.sent.block.count").increment(1);
383        metrics::gauge!("state.memory.sent.block.height").set(height.0 as f64);
384
385        self.update_metrics_for_cache();
386    }
387
388    /// Update sent block cache metrics after the sent blocks are modified.
389    fn update_metrics_for_cache(&self) {
390        let batch_iter = || self.bufs.iter().chain(iter::once(&self.curr_buf));
391
392        if let Some(min_height) = batch_iter()
393            .flat_map(|batch| batch.front().map(|(_hash, height)| height))
394            .min()
395        {
396            metrics::gauge!("state.memory.sent.cache.min.height").set(min_height.0 as f64);
397        } else {
398            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
399            metrics::gauge!("state.memory.sent.cache.min.height").set(f64::NAN);
400        }
401
402        if let Some(max_height) = batch_iter()
403            .flat_map(|batch| batch.back().map(|(_hash, height)| height))
404            .max()
405        {
406            metrics::gauge!("state.memory.sent.cache.max.height").set(max_height.0 as f64);
407        } else {
408            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
409            metrics::gauge!("state.memory.sent.cache.max.height").set(f64::NAN);
410        }
411
412        metrics::gauge!("state.memory.sent.cache.block.count")
413            .set(batch_iter().flatten().count() as f64);
414
415        metrics::gauge!("state.memory.sent.cache.batch.count").set(batch_iter().count() as f64);
416    }
417}