zebra_state/service/
queued_blocks.rs1use std::{
4 collections::{hash_map::Drain, BTreeMap, HashMap, HashSet, VecDeque},
5 iter, mem,
6};
7
8use tokio::sync::oneshot;
9use tracing::instrument;
10
11use zebra_chain::{block, transparent};
12
13use crate::{
14 BoxError, CheckpointVerifiedBlock, CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
15 ValidateContextError,
16};
17
18#[cfg(test)]
19mod tests;
20
21pub type QueuedCheckpointVerified = (
23 CheckpointVerifiedBlock,
24 oneshot::Sender<Result<block::Hash, BoxError>>,
25);
26
27pub type QueuedSemanticallyVerified = (
29 SemanticallyVerifiedBlock,
30 oneshot::Sender<Result<block::Hash, CommitSemanticallyVerifiedError>>,
31);
32
33#[derive(Debug, Default)]
35pub struct QueuedBlocks {
36 blocks: HashMap<block::Hash, QueuedSemanticallyVerified>,
38 by_parent: HashMap<block::Hash, HashSet<block::Hash>>,
40 by_height: BTreeMap<block::Height, HashSet<block::Hash>>,
42 known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
44}
45
46impl QueuedBlocks {
47 #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))]
53 pub fn queue(&mut self, new: QueuedSemanticallyVerified) {
54 let new_hash = new.0.hash;
55 let new_height = new.0.height;
56 let parent_hash = new.0.block.header.previous_block_hash;
57
58 if self.blocks.contains_key(&new_hash) {
59 return;
61 }
62
63 for (outpoint, ordered_utxo) in new.0.new_outputs.iter() {
65 self.known_utxos
66 .insert(*outpoint, ordered_utxo.utxo.clone());
67 }
68
69 self.blocks.insert(new_hash, new);
70 self.by_height
71 .entry(new_height)
72 .or_default()
73 .insert(new_hash);
74 self.by_parent
75 .entry(parent_hash)
76 .or_default()
77 .insert(new_hash);
78
79 tracing::trace!(%parent_hash, queued = %self.blocks.len(), "queued block");
80 self.update_metrics();
81 }
82
83 #[instrument(skip(self), fields(%parent_hash))]
85 pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool {
86 self.by_parent.contains_key(&parent_hash)
87 }
88
89 #[instrument(skip(self), fields(%parent_hash))]
92 pub fn dequeue_children(
93 &mut self,
94 parent_hash: block::Hash,
95 ) -> Vec<QueuedSemanticallyVerified> {
96 let queued_children = self
97 .by_parent
98 .remove(&parent_hash)
99 .unwrap_or_default()
100 .into_iter()
101 .map(|hash| {
102 self.blocks
103 .remove(&hash)
104 .expect("block is present if its hash is in by_parent")
105 })
106 .collect::<Vec<_>>();
107
108 for queued in &queued_children {
109 self.by_height.remove(&queued.0.height);
110 for outpoint in queued.0.new_outputs.keys() {
113 self.known_utxos.remove(outpoint);
114 }
115 }
116
117 tracing::trace!(
118 dequeued = queued_children.len(),
119 remaining = self.blocks.len(),
120 "dequeued blocks"
121 );
122 self.update_metrics();
123
124 queued_children
125 }
126
127 #[instrument(skip(self))]
130 pub fn prune_by_height(&mut self, finalized_tip_height: block::Height) {
131 let split_height = finalized_tip_height + 1;
137 let split_height =
138 split_height.expect("height after finalized tip won't exceed max height");
139 let mut by_height = self.by_height.split_off(&split_height);
140 mem::swap(&mut self.by_height, &mut by_height);
141
142 for hash in by_height.into_iter().flat_map(|(_, hashes)| hashes) {
143 let (expired_block, expired_sender) =
144 self.blocks.remove(&hash).expect("block is present");
145 let parent_hash = &expired_block.block.header.previous_block_hash;
146
147 let _ = expired_sender.send(Err(CommitSemanticallyVerifiedError::from(
149 ValidateContextError::PrunedBelowFinalizedTip {
150 block_height: expired_block.height,
151 },
152 )));
153
154 for outpoint in expired_block.new_outputs.keys() {
157 self.known_utxos.remove(outpoint);
158 }
159
160 let parent_list = self
161 .by_parent
162 .get_mut(parent_hash)
163 .expect("parent is present");
164
165 if parent_list.len() == 1 {
166 let removed = self
167 .by_parent
168 .remove(parent_hash)
169 .expect("parent is present");
170 assert!(
171 removed.contains(&hash),
172 "hash must be present in parent hash list"
173 );
174 } else {
175 assert!(
176 parent_list.remove(&hash),
177 "hash must be present in parent hash list"
178 );
179 }
180 }
181
182 tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
183 self.update_metrics();
184 }
185
186 pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> {
188 self.blocks.get_mut(hash)
189 }
190
191 fn update_metrics(&self) {
193 if let Some(min_height) = self.by_height.keys().next() {
194 metrics::gauge!("state.memory.queued.min.height").set(min_height.0 as f64);
195 } else {
196 metrics::gauge!("state.memory.queued.min.height").set(f64::NAN);
198 }
199 if let Some(max_height) = self.by_height.keys().next_back() {
200 metrics::gauge!("state.memory.queued.max.height").set(max_height.0 as f64);
201 } else {
202 metrics::gauge!("state.memory.queued.max.height").set(f64::NAN);
204 }
205
206 metrics::gauge!("state.memory.queued.block.count").set(self.blocks.len() as f64);
207 }
208
209 #[instrument(skip(self))]
211 pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
212 self.known_utxos.get(outpoint).cloned()
213 }
214
215 pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> {
220 self.known_utxos.clear();
221 self.known_utxos.shrink_to_fit();
222 self.by_parent.clear();
223 self.by_parent.shrink_to_fit();
224 self.by_height.clear();
225
226 self.blocks.drain()
227 }
228}
229
230#[derive(Debug, Default)]
231pub(crate) struct SentHashes {
232 bufs: Vec<VecDeque<(block::Hash, block::Height)>>,
235
236 curr_buf: VecDeque<(block::Hash, block::Height)>,
238
239 sent: HashMap<block::Hash, Vec<transparent::OutPoint>>,
242
243 known_utxos: HashMap<transparent::OutPoint, transparent::Utxo>,
245
246 pub(crate) can_fork_chain_at_hashes: bool,
249}
250
251impl SentHashes {
252 pub fn add(&mut self, block: &SemanticallyVerifiedBlock) {
258 let outpoints = block
260 .new_outputs
261 .iter()
262 .map(|(outpoint, ordered_utxo)| {
263 self.known_utxos
264 .insert(*outpoint, ordered_utxo.utxo.clone());
265 outpoint
266 })
267 .cloned()
268 .collect();
269
270 self.curr_buf.push_back((block.hash, block.height));
271 self.sent.insert(block.hash, outpoints);
272
273 self.update_metrics_for_block(block.height);
274 }
275
276 pub fn add_finalized(&mut self, block: &CheckpointVerifiedBlock) {
287 let outpoints = block
289 .new_outputs
290 .iter()
291 .map(|(outpoint, ordered_utxo)| {
292 self.known_utxos
293 .insert(*outpoint, ordered_utxo.utxo.clone());
294 outpoint
295 })
296 .cloned()
297 .collect();
298
299 self.curr_buf.push_back((block.hash, block.height));
300 self.sent.insert(block.hash, outpoints);
301
302 self.update_metrics_for_block(block.height);
303 }
304
305 #[instrument(skip(self))]
307 pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
308 self.known_utxos.get(outpoint).cloned()
309 }
310
311 pub fn finish_batch(&mut self) {
313 if !self.curr_buf.is_empty() {
314 self.bufs.push(std::mem::take(&mut self.curr_buf));
315 }
316 }
317
318 pub fn prune_by_height(&mut self, height_bound: block::Height) {
325 self.finish_batch();
326
327 self.bufs.retain_mut(|buf| {
330 while let Some((hash, height)) = buf.pop_front() {
331 if height > height_bound {
332 buf.push_front((hash, height));
333 return true;
334 } else if let Some(expired_outpoints) = self.sent.remove(&hash) {
335 for outpoint in expired_outpoints.iter() {
338 self.known_utxos.remove(outpoint);
339 }
340 }
341 }
342
343 false
344 });
345
346 self.sent.shrink_to_fit();
347 self.known_utxos.shrink_to_fit();
348 self.bufs.shrink_to_fit();
349
350 self.update_metrics_for_cache();
351 }
352
353 pub fn contains(&self, hash: &block::Hash) -> bool {
355 self.sent.contains_key(hash)
356 }
357
358 pub fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
360 self.can_fork_chain_at_hashes && self.contains(hash)
361 }
362
363 fn update_metrics_for_block(&self, height: block::Height) {
365 metrics::counter!("state.memory.sent.block.count").increment(1);
366 metrics::gauge!("state.memory.sent.block.height").set(height.0 as f64);
367
368 self.update_metrics_for_cache();
369 }
370
371 fn update_metrics_for_cache(&self) {
373 let batch_iter = || self.bufs.iter().chain(iter::once(&self.curr_buf));
374
375 if let Some(min_height) = batch_iter()
376 .flat_map(|batch| batch.front().map(|(_hash, height)| height))
377 .min()
378 {
379 metrics::gauge!("state.memory.sent.cache.min.height").set(min_height.0 as f64);
380 } else {
381 metrics::gauge!("state.memory.sent.cache.min.height").set(f64::NAN);
383 }
384
385 if let Some(max_height) = batch_iter()
386 .flat_map(|batch| batch.back().map(|(_hash, height)| height))
387 .max()
388 {
389 metrics::gauge!("state.memory.sent.cache.max.height").set(max_height.0 as f64);
390 } else {
391 metrics::gauge!("state.memory.sent.cache.max.height").set(f64::NAN);
393 }
394
395 metrics::gauge!("state.memory.sent.cache.block.count")
396 .set(batch_iter().flatten().count() as f64);
397
398 metrics::gauge!("state.memory.sent.cache.batch.count").set(batch_iter().count() as f64);
399 }
400}