zebra_state/service/chain_tip.rs
1//! Access to Zebra chain tip information.
2//!
3//! Zebra has 3 different interfaces for access to chain tip information:
4//! * [zebra_state::Request](crate::request): [tower::Service] requests about chain state,
5//! * [LatestChainTip] for efficient access to the current best tip, and
6//! * [ChainTipChange] to `await` specific changes to the chain tip.
7
8use std::{fmt, sync::Arc};
9
10use chrono::{DateTime, Utc};
11use futures::TryFutureExt;
12use tokio::sync::watch;
13use tracing::{field, instrument};
14
15use zebra_chain::{
16 block,
17 chain_tip::{BestTipChanged, ChainTip},
18 parameters::{Network, NetworkUpgrade},
19 transaction::{self, Transaction},
20};
21
22use crate::{
23 request::ContextuallyVerifiedBlock, service::watch_receiver::WatchReceiver,
24 CheckpointVerifiedBlock, SemanticallyVerifiedBlock,
25};
26
27use TipAction::*;
28
29#[cfg(any(test, feature = "proptest-impl"))]
30use proptest_derive::Arbitrary;
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use zebra_chain::serialization::arbitrary::datetime_full;
34
35#[cfg(test)]
36mod tests;
37
38/// The internal watch channel data type for [`ChainTipSender`], [`LatestChainTip`],
39/// and [`ChainTipChange`].
40type ChainTipData = Option<ChainTipBlock>;
41
42/// A chain tip block, with precalculated block data.
43///
44/// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`],
45/// and [`ChainTipChange`].
46#[derive(Clone, Debug, PartialEq, Eq)]
47#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
48pub struct ChainTipBlock {
49 /// The hash of the best chain tip block.
50 pub hash: block::Hash,
51
52 /// The height of the best chain tip block.
53 pub height: block::Height,
54
55 /// The network block time of the best chain tip block.
56 #[cfg_attr(
57 any(test, feature = "proptest-impl"),
58 proptest(strategy = "datetime_full()")
59 )]
60 pub time: DateTime<Utc>,
61
62 /// The block transactions.
63 pub transactions: Vec<Arc<Transaction>>,
64
65 /// The mined transaction IDs of the transactions in `block`,
66 /// in the same order as `block.transactions`.
67 pub transaction_hashes: Arc<[transaction::Hash]>,
68
69 /// The hash of the previous block in the best chain.
70 /// This block is immediately behind the best chain tip.
71 ///
72 /// ## Note
73 ///
74 /// If the best chain fork has changed, or some blocks have been skipped,
75 /// this hash will be different to the last returned `ChainTipBlock.hash`.
76 pub previous_block_hash: block::Hash,
77}
78
79impl fmt::Display for ChainTipBlock {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 f.debug_struct("ChainTipBlock")
82 .field("height", &self.height)
83 .field("hash", &self.hash)
84 .field("transactions", &self.transactions.len())
85 .finish()
86 }
87}
88
89impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
90 fn from(contextually_valid: ContextuallyVerifiedBlock) -> Self {
91 let ContextuallyVerifiedBlock {
92 block,
93 hash,
94 height,
95 transaction_hashes,
96 ..
97 } = contextually_valid;
98
99 Self {
100 hash,
101 height,
102 time: block.header.time,
103 transactions: block.transactions.clone(),
104 transaction_hashes,
105 previous_block_hash: block.header.previous_block_hash,
106 }
107 }
108}
109
110impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
111 fn from(prepared: SemanticallyVerifiedBlock) -> Self {
112 let SemanticallyVerifiedBlock {
113 block,
114 hash,
115 height,
116 new_outputs: _,
117 transaction_hashes,
118 deferred_balance: _,
119 } = prepared;
120
121 Self {
122 hash,
123 height,
124 time: block.header.time,
125 transactions: block.transactions.clone(),
126 transaction_hashes,
127 previous_block_hash: block.header.previous_block_hash,
128 }
129 }
130}
131
132impl From<CheckpointVerifiedBlock> for ChainTipBlock {
133 fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self {
134 prepared.into()
135 }
136}
137
138/// A sender for changes to the non-finalized and finalized chain tips.
139#[derive(Debug)]
140pub struct ChainTipSender {
141 /// Have we got any chain tips from the non-finalized state?
142 ///
143 /// Once this flag is set, we ignore the finalized state.
144 /// `None` tips don't set this flag.
145 use_non_finalized_tip: bool,
146
147 /// The sender channel for chain tip data.
148 sender: watch::Sender<ChainTipData>,
149}
150
151impl ChainTipSender {
152 /// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`],
153 /// using an `initial_tip` and a [`Network`].
154 #[instrument(skip(initial_tip), fields(new_height, new_hash))]
155 pub fn new(
156 initial_tip: impl Into<Option<ChainTipBlock>>,
157 network: &Network,
158 ) -> (Self, LatestChainTip, ChainTipChange) {
159 let initial_tip = initial_tip.into();
160 Self::record_new_tip(&initial_tip);
161
162 let (sender, receiver) = watch::channel(None);
163
164 let mut sender = ChainTipSender {
165 use_non_finalized_tip: false,
166 sender,
167 };
168
169 let current = LatestChainTip::new(receiver);
170 let change = ChainTipChange::new(current.clone(), network);
171
172 sender.update(initial_tip);
173
174 (sender, current, change)
175 }
176
177 /// Update the latest finalized tip.
178 ///
179 /// May trigger an update to the best tip.
180 #[instrument(
181 skip(self, new_tip),
182 fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
183 )]
184 pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
185 let new_tip = new_tip.into();
186 self.record_fields(&new_tip);
187
188 if !self.use_non_finalized_tip {
189 self.update(new_tip);
190 }
191 }
192
193 /// Update the latest non-finalized tip.
194 ///
195 /// May trigger an update to the best tip.
196 #[instrument(
197 skip(self, new_tip),
198 fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
199 )]
200 pub fn set_best_non_finalized_tip(
201 &mut self,
202 new_tip: impl Into<Option<ChainTipBlock>> + Clone,
203 ) {
204 let new_tip = new_tip.into();
205 self.record_fields(&new_tip);
206
207 // once the non-finalized state becomes active, it is always populated
208 // but ignoring `None`s makes the tests easier
209 if new_tip.is_some() {
210 self.use_non_finalized_tip = true;
211 self.update(new_tip)
212 }
213 }
214
215 /// Possibly send an update to listeners.
216 ///
217 /// An update is only sent if the current best tip is different from the last best tip
218 /// that was sent.
219 fn update(&mut self, new_tip: Option<ChainTipBlock>) {
220 // Correctness: the `self.sender.borrow()` must not be placed in a `let` binding to prevent
221 // a read-lock being created and living beyond the `self.sender.send(..)` call. If that
222 // happens, the `send` method will attempt to obtain a write-lock and will dead-lock.
223 // Without the binding, the guard is dropped at the end of the expression.
224 let active_hash = self
225 .sender
226 .borrow()
227 .as_ref()
228 .map(|active_value| active_value.hash);
229
230 let needs_update = match (new_tip.as_ref(), active_hash) {
231 // since the blocks have been contextually validated,
232 // we know their hashes cover all the block data
233 (Some(new_tip), Some(active_hash)) => new_tip.hash != active_hash,
234 (Some(_new_tip), None) => true,
235 (None, _active_value_hash) => false,
236 };
237
238 if needs_update {
239 let _ = self.sender.send(new_tip);
240 }
241 }
242
243 /// Record `new_tip` in the current span.
244 ///
245 /// Callers should create a new span with empty `new_height` and `new_hash` fields.
246 fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
247 Self::record_tip(&tracing::Span::current(), "new", new_tip);
248 }
249
250 /// Record `new_tip` and the fields from `self` in the current span.
251 ///
252 /// The fields recorded are:
253 ///
254 /// - `new_height`
255 /// - `new_hash`
256 /// - `old_height`
257 /// - `old_hash`
258 /// - `old_use_non_finalized_tip`
259 ///
260 /// Callers should create a new span with the empty fields described above.
261 fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
262 let span = tracing::Span::current();
263
264 let old_tip = &*self.sender.borrow();
265
266 Self::record_tip(&span, "new", new_tip);
267 Self::record_tip(&span, "old", old_tip);
268
269 span.record(
270 "old_use_non_finalized_tip",
271 field::debug(self.use_non_finalized_tip),
272 );
273 }
274
275 /// Record `tip` into `span` using the `prefix` to name the fields.
276 ///
277 /// Callers should create a new span with empty `{prefix}_height` and `{prefix}_hash` fields.
278 fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
279 let height = tip.as_ref().map(|block| block.height);
280 let hash = tip.as_ref().map(|block| block.hash);
281
282 span.record(format!("{prefix}_height").as_str(), field::debug(height));
283 span.record(format!("{prefix}_hash").as_str(), field::debug(hash));
284 }
285}
286
287/// Efficient access to the state's current best chain tip.
288///
289/// Each method returns data from the latest tip,
290/// regardless of how many times you call it.
291///
292/// Cloned instances provide identical tip data.
293///
294/// The chain tip data is based on:
295/// * the best non-finalized chain tip, if available, or
296/// * the finalized tip.
297///
298/// ## Note
299///
300/// If a lot of blocks are committed at the same time,
301/// the latest tip will skip some blocks in the chain.
302#[derive(Clone, Debug)]
303pub struct LatestChainTip {
304 /// The receiver for the current chain tip's data.
305 receiver: WatchReceiver<ChainTipData>,
306}
307
308impl LatestChainTip {
309 /// Create a new [`LatestChainTip`] from a watch channel receiver.
310 fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
311 Self {
312 receiver: WatchReceiver::new(receiver),
313 }
314 }
315
316 /// Maps the current data `ChainTipData` to `Option<U>`
317 /// by applying a function to the watched value,
318 /// while holding the receiver lock as briefly as possible.
319 ///
320 /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
321 /// extract some information from it, while also adding the current chain tip block's fields as
322 /// records to the current span.
323 ///
324 /// A single read lock is acquired to clone `T`, and then released after the clone.
325 /// See the performance note on [`WatchReceiver::with_watch_data`].
326 ///
327 /// Does not mark the watched data as seen.
328 ///
329 /// # Correctness
330 ///
331 /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
332 fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
333 where
334 F: FnOnce(&ChainTipBlock) -> U,
335 {
336 let span = tracing::Span::current();
337
338 let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
339 span.record(
340 "height",
341 tracing::field::debug(chain_tip_block.map(|block| block.height)),
342 );
343 span.record(
344 "hash",
345 tracing::field::debug(chain_tip_block.map(|block| block.hash)),
346 );
347 span.record(
348 "time",
349 tracing::field::debug(chain_tip_block.map(|block| block.time)),
350 );
351 span.record(
352 "previous_hash",
353 tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
354 );
355 span.record(
356 "transaction_count",
357 tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
358 );
359 };
360
361 self.receiver.with_watch_data(|chain_tip_block| {
362 // TODO: replace with Option::inspect when it stabilises
363 // https://github.com/rust-lang/rust/issues/91345
364 register_span_fields(chain_tip_block.as_ref());
365
366 chain_tip_block.as_ref().map(f)
367 })
368 }
369}
370
371impl ChainTip for LatestChainTip {
372 #[instrument(skip(self))]
373 fn best_tip_height(&self) -> Option<block::Height> {
374 self.with_chain_tip_block(|block| block.height)
375 }
376
377 #[instrument(skip(self))]
378 fn best_tip_hash(&self) -> Option<block::Hash> {
379 self.with_chain_tip_block(|block| block.hash)
380 }
381
382 #[instrument(skip(self))]
383 fn best_tip_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
384 self.with_chain_tip_block(|block| (block.height, block.hash))
385 }
386
387 #[instrument(skip(self))]
388 fn best_tip_block_time(&self) -> Option<DateTime<Utc>> {
389 self.with_chain_tip_block(|block| block.time)
390 }
391
392 #[instrument(skip(self))]
393 fn best_tip_height_and_block_time(&self) -> Option<(block::Height, DateTime<Utc>)> {
394 self.with_chain_tip_block(|block| (block.height, block.time))
395 }
396
397 #[instrument(skip(self))]
398 fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
399 self.with_chain_tip_block(|block| block.transaction_hashes.clone())
400 .unwrap_or_else(|| Arc::new([]))
401 }
402
403 /// Returns when the state tip changes.
404 ///
405 /// Marks the state tip as seen when the returned future completes.
406 #[instrument(skip(self))]
407 fn best_tip_changed(&mut self) -> BestTipChanged {
408 BestTipChanged::new(self.receiver.changed().err_into())
409 }
410
411 /// Mark the current best state tip as seen.
412 fn mark_best_tip_seen(&mut self) {
413 self.receiver.mark_as_seen();
414 }
415}
416
417/// A chain tip change monitor.
418///
419/// Awaits changes and resets of the state's best chain tip,
420/// returning the latest [`TipAction`] once the state is updated.
421///
422/// Each cloned instance separately tracks the last block data it provided. If
423/// the best chain fork has changed since the last tip change on that instance,
424/// it returns a [`Reset`].
425///
426/// The chain tip data is based on:
427/// * the best non-finalized chain tip, if available, or
428/// * the finalized tip.
429#[derive(Debug)]
430pub struct ChainTipChange {
431 /// The receiver for the current chain tip's data.
432 latest_chain_tip: LatestChainTip,
433
434 /// The most recent [`block::Hash`] provided by this instance.
435 ///
436 /// ## Note
437 ///
438 /// If the best chain fork has changed, or some blocks have been skipped,
439 /// this hash will be different to the last returned `ChainTipBlock.hash`.
440 last_change_hash: Option<block::Hash>,
441
442 /// The network for the chain tip.
443 network: Network,
444}
445
446/// Actions that we can take in response to a [`ChainTipChange`].
447#[derive(Clone, Debug, PartialEq, Eq)]
448pub enum TipAction {
449 /// The chain tip was updated continuously,
450 /// using a child `block` of the previous block.
451 ///
452 /// The genesis block action is a `Grow`.
453 Grow {
454 /// Information about the block used to grow the chain.
455 block: ChainTipBlock,
456 },
457
458 /// The chain tip was reset to a block with `height` and `hash`.
459 ///
460 /// Resets can happen for different reasons:
461 /// - a newly created or cloned [`ChainTipChange`], which is behind the
462 /// current tip,
463 /// - extending the chain with a network upgrade activation block,
464 /// - switching to a different best [`Chain`][1], also known as a rollback, and
465 /// - receiving multiple blocks since the previous change.
466 ///
467 /// To keep the code and tests simple, Zebra performs the same reset
468 /// actions, regardless of the reset reason.
469 ///
470 /// `Reset`s do not have the transaction hashes from the tip block, because
471 /// all transactions should be cleared by a reset.
472 ///
473 /// [1]: super::non_finalized_state::Chain
474 Reset {
475 /// The block height of the tip, after the chain reset.
476 height: block::Height,
477
478 /// The block hash of the tip, after the chain reset.
479 ///
480 /// Mainly useful for logging and debugging.
481 hash: block::Hash,
482 },
483}
484
485impl ChainTipChange {
486 /// Wait until the tip has changed, then return the corresponding [`TipAction`].
487 ///
488 /// The returned action describes how the tip has changed
489 /// since the last call to this method.
490 ///
491 /// If there have been no changes since the last time this method was called,
492 /// it waits for the next tip change before returning.
493 ///
494 /// If there have been multiple changes since the last time this method was called,
495 /// they are combined into a single [`TipAction::Reset`].
496 ///
497 /// Returns an error if communication with the state is lost.
498 ///
499 /// ## Note
500 ///
501 /// If a lot of blocks are committed at the same time,
502 /// the change will skip some blocks, and return a [`Reset`].
503 #[instrument(
504 skip(self),
505 fields(
506 last_change_hash = ?self.last_change_hash,
507 network = ?self.network,
508 ))]
509 pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
510 let block = self.tip_block_change().await?;
511
512 let action = self.action(block.clone());
513
514 self.last_change_hash = Some(block.hash);
515
516 Ok(action)
517 }
518
519 /// Returns:
520 /// - `Some(`[`TipAction`]`)` if there has been a change since the last time the method was called.
521 /// - `None` if there has been no change.
522 ///
523 /// See [`Self::wait_for_tip_change`] for details.
524 #[instrument(
525 skip(self),
526 fields(
527 last_change_hash = ?self.last_change_hash,
528 network = ?self.network,
529 ))]
530 pub fn last_tip_change(&mut self) -> Option<TipAction> {
531 let block = self.latest_chain_tip.with_chain_tip_block(|block| {
532 if Some(block.hash) != self.last_change_hash {
533 Some(block.clone())
534 } else {
535 // Ignore an unchanged tip.
536 None
537 }
538 })??;
539
540 let block_hash = block.hash;
541 let tip_action = self.action(block);
542
543 self.last_change_hash = Some(block_hash);
544
545 Some(tip_action)
546 }
547
548 /// Return an action based on `block` and the last change we returned.
549 fn action(&self, block: ChainTipBlock) -> TipAction {
550 // check for an edge case that's dealt with by other code
551 assert!(
552 Some(block.hash) != self.last_change_hash,
553 "ChainTipSender and ChainTipChange ignore unchanged tips"
554 );
555
556 // If the previous block hash doesn't match, reset.
557 // We've either:
558 // - just initialized this instance,
559 // - changed the best chain to another fork (a rollback), or
560 // - skipped some blocks in the best chain.
561 //
562 // Consensus rules:
563 //
564 // > It is possible for a reorganization to occur
565 // > that rolls back from after the activation height, to before that height.
566 // > This can handled in the same way as any regular chain orphaning or reorganization,
567 // > as long as the new chain is valid.
568 //
569 // https://zips.z.cash/zip-0200#chain-reorganization
570
571 // If we're at a network upgrade activation block, reset.
572 //
573 // Consensus rules:
574 //
575 // > When the current chain tip height reaches ACTIVATION_HEIGHT,
576 // > the node's local transaction memory pool SHOULD be cleared of transactions
577 // > that will never be valid on the post-upgrade consensus branch.
578 //
579 // https://zips.z.cash/zip-0200#memory-pool
580 //
581 // Skipped blocks can include network upgrade activation blocks.
582 // Fork changes can activate or deactivate a network upgrade.
583 // So we must perform the same actions for network upgrades and skipped blocks.
584 if Some(block.previous_block_hash) != self.last_change_hash
585 || NetworkUpgrade::is_activation_height(&self.network, block.height)
586 {
587 TipAction::reset_with(block)
588 } else {
589 TipAction::grow_with(block)
590 }
591 }
592
593 /// Create a new [`ChainTipChange`] from a [`LatestChainTip`] receiver and [`Network`].
594 fn new(latest_chain_tip: LatestChainTip, network: &Network) -> Self {
595 Self {
596 latest_chain_tip,
597 last_change_hash: None,
598 network: network.clone(),
599 }
600 }
601
602 /// Wait until the next chain tip change, then return the corresponding [`ChainTipBlock`].
603 ///
604 /// Returns an error if communication with the state is lost.
605 async fn tip_block_change(&mut self) -> Result<ChainTipBlock, watch::error::RecvError> {
606 loop {
607 // If there are multiple changes while this code is executing,
608 // we don't rely on getting the first block or the latest block
609 // after the change notification.
610 // Any block update after the change will do,
611 // we'll catch up with the tip after the next change.
612 self.latest_chain_tip.receiver.changed().await?;
613
614 // Wait until we have a new block
615 //
616 // last_tip_change() updates last_change_hash, but it doesn't call receiver.changed().
617 // So code that uses both sync and async methods can have spurious pending changes.
618 //
619 // TODO: use `receiver.borrow_and_update()` in `with_chain_tip_block()`,
620 // once we upgrade to tokio 1.0 (#2200)
621 // and remove this extra check
622 let new_block = self
623 .latest_chain_tip
624 .with_chain_tip_block(|block| {
625 if Some(block.hash) != self.last_change_hash {
626 Some(block.clone())
627 } else {
628 None
629 }
630 })
631 .flatten();
632
633 if let Some(block) = new_block {
634 return Ok(block);
635 }
636 }
637 }
638
639 /// Returns the inner `LatestChainTip`.
640 pub fn latest_chain_tip(&self) -> LatestChainTip {
641 self.latest_chain_tip.clone()
642 }
643}
644
645impl Clone for ChainTipChange {
646 fn clone(&self) -> Self {
647 Self {
648 latest_chain_tip: self.latest_chain_tip.clone(),
649
650 // clear the previous change hash, so the first action is a reset
651 last_change_hash: None,
652
653 network: self.network.clone(),
654 }
655 }
656}
657
658impl TipAction {
659 /// Is this tip action a [`Reset`]?
660 pub fn is_reset(&self) -> bool {
661 matches!(self, Reset { .. })
662 }
663
664 /// Returns the block hash of this tip action,
665 /// regardless of the underlying variant.
666 pub fn best_tip_hash(&self) -> block::Hash {
667 match self {
668 Grow { block } => block.hash,
669 Reset { hash, .. } => *hash,
670 }
671 }
672
673 /// Returns the block height of this tip action,
674 /// regardless of the underlying variant.
675 pub fn best_tip_height(&self) -> block::Height {
676 match self {
677 Grow { block } => block.height,
678 Reset { height, .. } => *height,
679 }
680 }
681
682 /// Returns the block hash and height of this tip action,
683 /// regardless of the underlying variant.
684 pub fn best_tip_hash_and_height(&self) -> (block::Hash, block::Height) {
685 match self {
686 Grow { block } => (block.hash, block.height),
687 Reset { hash, height } => (*hash, *height),
688 }
689 }
690
691 /// Returns a [`Grow`] based on `block`.
692 pub(crate) fn grow_with(block: ChainTipBlock) -> Self {
693 Grow { block }
694 }
695
696 /// Returns a [`Reset`] based on `block`.
697 pub(crate) fn reset_with(block: ChainTipBlock) -> Self {
698 Reset {
699 height: block.height,
700 hash: block.hash,
701 }
702 }
703
704 /// Converts this [`TipAction`] into a [`Reset`].
705 ///
706 /// Designed for use in tests.
707 #[cfg(test)]
708 pub(crate) fn into_reset(self) -> Self {
709 match self {
710 Grow { block } => Reset {
711 height: block.height,
712 hash: block.hash,
713 },
714 reset @ Reset { .. } => reset,
715 }
716 }
717}