zebra_state/service/chain_tip.rs
1//! Access to Zebra chain tip information.
2//!
3//! Zebra has 3 different interfaces for access to chain tip information:
4//! * [zebra_state::Request](crate::request): [tower::Service] requests about chain state,
5//! * [LatestChainTip] for efficient access to the current best tip, and
6//! * [ChainTipChange] to `await` specific changes to the chain tip.
7
8use std::{fmt, sync::Arc};
9
10use chrono::{DateTime, Utc};
11use futures::TryFutureExt;
12use tokio::sync::watch;
13use tracing::{field, instrument};
14
15use zebra_chain::{
16 block,
17 chain_tip::ChainTip,
18 parameters::{Network, NetworkUpgrade},
19 transaction::{self, Transaction},
20};
21
22use crate::{
23 request::ContextuallyVerifiedBlock, service::watch_receiver::WatchReceiver, BoxError,
24 CheckpointVerifiedBlock, SemanticallyVerifiedBlock,
25};
26
27use TipAction::*;
28
29#[cfg(any(test, feature = "proptest-impl"))]
30use proptest_derive::Arbitrary;
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use zebra_chain::serialization::arbitrary::datetime_full;
34
35#[cfg(test)]
36mod tests;
37
38/// The internal watch channel data type for [`ChainTipSender`], [`LatestChainTip`],
39/// and [`ChainTipChange`].
40type ChainTipData = Option<ChainTipBlock>;
41
42/// A chain tip block, with precalculated block data.
43///
44/// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`],
45/// and [`ChainTipChange`].
46#[derive(Clone, Debug, PartialEq, Eq)]
47#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
48pub struct ChainTipBlock {
49 /// The hash of the best chain tip block.
50 pub hash: block::Hash,
51
52 /// The height of the best chain tip block.
53 pub height: block::Height,
54
55 /// The network block time of the best chain tip block.
56 #[cfg_attr(
57 any(test, feature = "proptest-impl"),
58 proptest(strategy = "datetime_full()")
59 )]
60 pub time: DateTime<Utc>,
61
62 /// The block transactions.
63 pub transactions: Vec<Arc<Transaction>>,
64
65 /// The mined transaction IDs of the transactions in `block`,
66 /// in the same order as `block.transactions`.
67 pub transaction_hashes: Arc<[transaction::Hash]>,
68
69 /// The hash of the previous block in the best chain.
70 /// This block is immediately behind the best chain tip.
71 ///
72 /// ## Note
73 ///
74 /// If the best chain fork has changed, or some blocks have been skipped,
75 /// this hash will be different to the last returned `ChainTipBlock.hash`.
76 pub previous_block_hash: block::Hash,
77}
78
79impl fmt::Display for ChainTipBlock {
80 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81 f.debug_struct("ChainTipBlock")
82 .field("height", &self.height)
83 .field("hash", &self.hash)
84 .field("transactions", &self.transactions.len())
85 .finish()
86 }
87}
88
89impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
90 fn from(contextually_valid: ContextuallyVerifiedBlock) -> Self {
91 let ContextuallyVerifiedBlock {
92 block,
93 hash,
94 height,
95 transaction_hashes,
96 ..
97 } = contextually_valid;
98
99 Self {
100 hash,
101 height,
102 time: block.header.time,
103 transactions: block.transactions.clone(),
104 transaction_hashes,
105 previous_block_hash: block.header.previous_block_hash,
106 }
107 }
108}
109
110impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
111 fn from(prepared: SemanticallyVerifiedBlock) -> Self {
112 let SemanticallyVerifiedBlock {
113 block,
114 hash,
115 height,
116 new_outputs: _,
117 transaction_hashes,
118 deferred_balance: _,
119 } = prepared;
120
121 Self {
122 hash,
123 height,
124 time: block.header.time,
125 transactions: block.transactions.clone(),
126 transaction_hashes,
127 previous_block_hash: block.header.previous_block_hash,
128 }
129 }
130}
131
132impl From<CheckpointVerifiedBlock> for ChainTipBlock {
133 fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self {
134 prepared.into()
135 }
136}
137
138/// A sender for changes to the non-finalized and finalized chain tips.
139#[derive(Debug)]
140pub struct ChainTipSender {
141 /// Have we got any chain tips from the non-finalized state?
142 ///
143 /// Once this flag is set, we ignore the finalized state.
144 /// `None` tips don't set this flag.
145 use_non_finalized_tip: bool,
146
147 /// The sender channel for chain tip data.
148 sender: watch::Sender<ChainTipData>,
149}
150
151impl ChainTipSender {
152 /// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`],
153 /// using an `initial_tip` and a [`Network`].
154 #[instrument(skip(initial_tip), fields(new_height, new_hash))]
155 pub fn new(
156 initial_tip: impl Into<Option<ChainTipBlock>>,
157 network: &Network,
158 ) -> (Self, LatestChainTip, ChainTipChange) {
159 let initial_tip = initial_tip.into();
160 Self::record_new_tip(&initial_tip);
161
162 let (sender, receiver) = watch::channel(None);
163
164 let mut sender = ChainTipSender {
165 use_non_finalized_tip: false,
166 sender,
167 };
168
169 let current = LatestChainTip::new(receiver);
170 let change = ChainTipChange::new(current.clone(), network);
171
172 sender.update(initial_tip);
173
174 (sender, current, change)
175 }
176
177 /// Returns a clone of itself for sending finalized tip changes,
178 /// used by `TrustedChainSync` in `zebra-rpc`.
179 pub fn finalized_sender(&self) -> Self {
180 Self {
181 use_non_finalized_tip: false,
182 sender: self.sender.clone(),
183 }
184 }
185
186 /// Update the latest finalized tip.
187 ///
188 /// May trigger an update to the best tip.
189 #[instrument(
190 skip(self, new_tip),
191 fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
192 )]
193 pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
194 let new_tip = new_tip.into();
195 self.record_fields(&new_tip);
196
197 if !self.use_non_finalized_tip {
198 self.update(new_tip);
199 }
200 }
201
202 /// Update the latest non-finalized tip.
203 ///
204 /// May trigger an update to the best tip.
205 #[instrument(
206 skip(self, new_tip),
207 fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
208 )]
209 pub fn set_best_non_finalized_tip(
210 &mut self,
211 new_tip: impl Into<Option<ChainTipBlock>> + Clone,
212 ) {
213 let new_tip = new_tip.into();
214 self.record_fields(&new_tip);
215
216 // once the non-finalized state becomes active, it is always populated
217 // but ignoring `None`s makes the tests easier
218 if new_tip.is_some() {
219 self.use_non_finalized_tip = true;
220 self.update(new_tip)
221 }
222 }
223
224 /// Possibly send an update to listeners.
225 ///
226 /// An update is only sent if the current best tip is different from the last best tip
227 /// that was sent.
228 fn update(&mut self, new_tip: Option<ChainTipBlock>) {
229 // Correctness: the `self.sender.borrow()` must not be placed in a `let` binding to prevent
230 // a read-lock being created and living beyond the `self.sender.send(..)` call. If that
231 // happens, the `send` method will attempt to obtain a write-lock and will dead-lock.
232 // Without the binding, the guard is dropped at the end of the expression.
233 let active_hash = self
234 .sender
235 .borrow()
236 .as_ref()
237 .map(|active_value| active_value.hash);
238
239 let needs_update = match (new_tip.as_ref(), active_hash) {
240 // since the blocks have been contextually validated,
241 // we know their hashes cover all the block data
242 (Some(new_tip), Some(active_hash)) => new_tip.hash != active_hash,
243 (Some(_new_tip), None) => true,
244 (None, _active_value_hash) => false,
245 };
246
247 if needs_update {
248 let _ = self.sender.send(new_tip);
249 }
250 }
251
252 /// Record `new_tip` in the current span.
253 ///
254 /// Callers should create a new span with empty `new_height` and `new_hash` fields.
255 fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
256 Self::record_tip(&tracing::Span::current(), "new", new_tip);
257 }
258
259 /// Record `new_tip` and the fields from `self` in the current span.
260 ///
261 /// The fields recorded are:
262 ///
263 /// - `new_height`
264 /// - `new_hash`
265 /// - `old_height`
266 /// - `old_hash`
267 /// - `old_use_non_finalized_tip`
268 ///
269 /// Callers should create a new span with the empty fields described above.
270 fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
271 let span = tracing::Span::current();
272
273 let old_tip = &*self.sender.borrow();
274
275 Self::record_tip(&span, "new", new_tip);
276 Self::record_tip(&span, "old", old_tip);
277
278 span.record(
279 "old_use_non_finalized_tip",
280 field::debug(self.use_non_finalized_tip),
281 );
282 }
283
284 /// Record `tip` into `span` using the `prefix` to name the fields.
285 ///
286 /// Callers should create a new span with empty `{prefix}_height` and `{prefix}_hash` fields.
287 fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
288 let height = tip.as_ref().map(|block| block.height);
289 let hash = tip.as_ref().map(|block| block.hash);
290
291 span.record(format!("{prefix}_height").as_str(), field::debug(height));
292 span.record(format!("{prefix}_hash").as_str(), field::debug(hash));
293 }
294}
295
296/// Efficient access to the state's current best chain tip.
297///
298/// Each method returns data from the latest tip,
299/// regardless of how many times you call it.
300///
301/// Cloned instances provide identical tip data.
302///
303/// The chain tip data is based on:
304/// * the best non-finalized chain tip, if available, or
305/// * the finalized tip.
306///
307/// ## Note
308///
309/// If a lot of blocks are committed at the same time,
310/// the latest tip will skip some blocks in the chain.
311#[derive(Clone, Debug)]
312pub struct LatestChainTip {
313 /// The receiver for the current chain tip's data.
314 receiver: WatchReceiver<ChainTipData>,
315}
316
317impl LatestChainTip {
318 /// Create a new [`LatestChainTip`] from a watch channel receiver.
319 fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
320 Self {
321 receiver: WatchReceiver::new(receiver),
322 }
323 }
324
325 /// Maps the current data `ChainTipData` to `Option<U>`
326 /// by applying a function to the watched value,
327 /// while holding the receiver lock as briefly as possible.
328 ///
329 /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
330 /// extract some information from it, while also adding the current chain tip block's fields as
331 /// records to the current span.
332 ///
333 /// A single read lock is acquired to clone `T`, and then released after the clone.
334 /// See the performance note on [`WatchReceiver::with_watch_data`].
335 ///
336 /// Does not mark the watched data as seen.
337 ///
338 /// # Correctness
339 ///
340 /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
341 fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
342 where
343 F: FnOnce(&ChainTipBlock) -> U,
344 {
345 let span = tracing::Span::current();
346
347 let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
348 span.record(
349 "height",
350 tracing::field::debug(chain_tip_block.map(|block| block.height)),
351 );
352 span.record(
353 "hash",
354 tracing::field::debug(chain_tip_block.map(|block| block.hash)),
355 );
356 span.record(
357 "time",
358 tracing::field::debug(chain_tip_block.map(|block| block.time)),
359 );
360 span.record(
361 "previous_hash",
362 tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
363 );
364 span.record(
365 "transaction_count",
366 tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
367 );
368 };
369
370 self.receiver.with_watch_data(|chain_tip_block| {
371 // TODO: replace with Option::inspect when it stabilises
372 // https://github.com/rust-lang/rust/issues/91345
373 register_span_fields(chain_tip_block.as_ref());
374
375 chain_tip_block.as_ref().map(f)
376 })
377 }
378}
379
380impl ChainTip for LatestChainTip {
381 #[instrument(skip(self))]
382 fn best_tip_height(&self) -> Option<block::Height> {
383 self.with_chain_tip_block(|block| block.height)
384 }
385
386 #[instrument(skip(self))]
387 fn best_tip_hash(&self) -> Option<block::Hash> {
388 self.with_chain_tip_block(|block| block.hash)
389 }
390
391 #[instrument(skip(self))]
392 fn best_tip_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
393 self.with_chain_tip_block(|block| (block.height, block.hash))
394 }
395
396 #[instrument(skip(self))]
397 fn best_tip_block_time(&self) -> Option<DateTime<Utc>> {
398 self.with_chain_tip_block(|block| block.time)
399 }
400
401 #[instrument(skip(self))]
402 fn best_tip_height_and_block_time(&self) -> Option<(block::Height, DateTime<Utc>)> {
403 self.with_chain_tip_block(|block| (block.height, block.time))
404 }
405
406 #[instrument(skip(self))]
407 fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
408 self.with_chain_tip_block(|block| block.transaction_hashes.clone())
409 .unwrap_or_else(|| Arc::new([]))
410 }
411
412 /// Returns when the state tip changes.
413 ///
414 /// Marks the state tip as seen when the returned future completes.
415 #[instrument(skip(self))]
416 async fn best_tip_changed(&mut self) -> Result<(), BoxError> {
417 self.receiver.changed().err_into().await
418 }
419
420 /// Mark the current best state tip as seen.
421 fn mark_best_tip_seen(&mut self) {
422 self.receiver.mark_as_seen();
423 }
424}
425
426/// A chain tip change monitor.
427///
428/// Awaits changes and resets of the state's best chain tip,
429/// returning the latest [`TipAction`] once the state is updated.
430///
431/// Each cloned instance separately tracks the last block data it provided. If
432/// the best chain fork has changed since the last tip change on that instance,
433/// it returns a [`Reset`].
434///
435/// The chain tip data is based on:
436/// * the best non-finalized chain tip, if available, or
437/// * the finalized tip.
438#[derive(Debug)]
439pub struct ChainTipChange {
440 /// The receiver for the current chain tip's data.
441 latest_chain_tip: LatestChainTip,
442
443 /// The most recent [`block::Hash`] provided by this instance.
444 ///
445 /// ## Note
446 ///
447 /// If the best chain fork has changed, or some blocks have been skipped,
448 /// this hash will be different to the last returned `ChainTipBlock.hash`.
449 last_change_hash: Option<block::Hash>,
450
451 /// The network for the chain tip.
452 network: Network,
453}
454
455/// Actions that we can take in response to a [`ChainTipChange`].
456#[derive(Clone, Debug, PartialEq, Eq)]
457pub enum TipAction {
458 /// The chain tip was updated continuously,
459 /// using a child `block` of the previous block.
460 ///
461 /// The genesis block action is a `Grow`.
462 Grow {
463 /// Information about the block used to grow the chain.
464 block: ChainTipBlock,
465 },
466
467 /// The chain tip was reset to a block with `height` and `hash`.
468 ///
469 /// Resets can happen for different reasons:
470 /// - a newly created or cloned [`ChainTipChange`], which is behind the
471 /// current tip,
472 /// - extending the chain with a network upgrade activation block,
473 /// - switching to a different best [`Chain`][1], also known as a rollback, and
474 /// - receiving multiple blocks since the previous change.
475 ///
476 /// To keep the code and tests simple, Zebra performs the same reset
477 /// actions, regardless of the reset reason.
478 ///
479 /// `Reset`s do not have the transaction hashes from the tip block, because
480 /// all transactions should be cleared by a reset.
481 ///
482 /// [1]: super::non_finalized_state::Chain
483 Reset {
484 /// The block height of the tip, after the chain reset.
485 height: block::Height,
486
487 /// The block hash of the tip, after the chain reset.
488 ///
489 /// Mainly useful for logging and debugging.
490 hash: block::Hash,
491 },
492}
493
494impl ChainTipChange {
495 /// Wait until the tip has changed, then return the corresponding [`TipAction`].
496 ///
497 /// The returned action describes how the tip has changed
498 /// since the last call to this method.
499 ///
500 /// If there have been no changes since the last time this method was called,
501 /// it waits for the next tip change before returning.
502 ///
503 /// If there have been multiple changes since the last time this method was called,
504 /// they are combined into a single [`TipAction::Reset`].
505 ///
506 /// Returns an error if communication with the state is lost.
507 ///
508 /// ## Note
509 ///
510 /// If a lot of blocks are committed at the same time,
511 /// the change will skip some blocks, and return a [`Reset`].
512 #[instrument(
513 skip(self),
514 fields(
515 last_change_hash = ?self.last_change_hash,
516 network = ?self.network,
517 ))]
518 pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
519 let block = self.tip_block_change().await?;
520
521 let action = self.action(block.clone());
522
523 self.last_change_hash = Some(block.hash);
524
525 Ok(action)
526 }
527
528 /// Returns:
529 /// - `Some(`[`TipAction`]`)` if there has been a change since the last time the method was called.
530 /// - `None` if there has been no change.
531 ///
532 /// See [`Self::wait_for_tip_change`] for details.
533 #[instrument(
534 skip(self),
535 fields(
536 last_change_hash = ?self.last_change_hash,
537 network = ?self.network,
538 ))]
539 pub fn last_tip_change(&mut self) -> Option<TipAction> {
540 let block = self.latest_chain_tip.with_chain_tip_block(|block| {
541 if Some(block.hash) != self.last_change_hash {
542 Some(block.clone())
543 } else {
544 // Ignore an unchanged tip.
545 None
546 }
547 })??;
548
549 let block_hash = block.hash;
550 let tip_action = self.action(block);
551
552 self.last_change_hash = Some(block_hash);
553
554 Some(tip_action)
555 }
556
557 /// Return an action based on `block` and the last change we returned.
558 fn action(&self, block: ChainTipBlock) -> TipAction {
559 // check for an edge case that's dealt with by other code
560 assert!(
561 Some(block.hash) != self.last_change_hash,
562 "ChainTipSender and ChainTipChange ignore unchanged tips"
563 );
564
565 // If the previous block hash doesn't match, reset.
566 // We've either:
567 // - just initialized this instance,
568 // - changed the best chain to another fork (a rollback), or
569 // - skipped some blocks in the best chain.
570 //
571 // Consensus rules:
572 //
573 // > It is possible for a reorganization to occur
574 // > that rolls back from after the activation height, to before that height.
575 // > This can handled in the same way as any regular chain orphaning or reorganization,
576 // > as long as the new chain is valid.
577 //
578 // https://zips.z.cash/zip-0200#chain-reorganization
579
580 // If we're at a network upgrade activation block, reset.
581 //
582 // Consensus rules:
583 //
584 // > When the current chain tip height reaches ACTIVATION_HEIGHT,
585 // > the node's local transaction memory pool SHOULD be cleared of transactions
586 // > that will never be valid on the post-upgrade consensus branch.
587 //
588 // https://zips.z.cash/zip-0200#memory-pool
589 //
590 // Skipped blocks can include network upgrade activation blocks.
591 // Fork changes can activate or deactivate a network upgrade.
592 // So we must perform the same actions for network upgrades and skipped blocks.
593 if Some(block.previous_block_hash) != self.last_change_hash
594 || NetworkUpgrade::is_activation_height(&self.network, block.height)
595 {
596 TipAction::reset_with(block)
597 } else {
598 TipAction::grow_with(block)
599 }
600 }
601
602 /// Create a new [`ChainTipChange`] from a [`LatestChainTip`] receiver and [`Network`].
603 fn new(latest_chain_tip: LatestChainTip, network: &Network) -> Self {
604 Self {
605 latest_chain_tip,
606 last_change_hash: None,
607 network: network.clone(),
608 }
609 }
610
611 /// Wait until the next chain tip change, then return the corresponding [`ChainTipBlock`].
612 ///
613 /// Returns an error if communication with the state is lost.
614 async fn tip_block_change(&mut self) -> Result<ChainTipBlock, watch::error::RecvError> {
615 loop {
616 // If there are multiple changes while this code is executing,
617 // we don't rely on getting the first block or the latest block
618 // after the change notification.
619 // Any block update after the change will do,
620 // we'll catch up with the tip after the next change.
621 self.latest_chain_tip.receiver.changed().await?;
622
623 // Wait until we have a new block
624 //
625 // last_tip_change() updates last_change_hash, but it doesn't call receiver.changed().
626 // So code that uses both sync and async methods can have spurious pending changes.
627 //
628 // TODO: use `receiver.borrow_and_update()` in `with_chain_tip_block()`,
629 // once we upgrade to tokio 1.0 (#2200)
630 // and remove this extra check
631 let new_block = self
632 .latest_chain_tip
633 .with_chain_tip_block(|block| {
634 if Some(block.hash) != self.last_change_hash {
635 Some(block.clone())
636 } else {
637 None
638 }
639 })
640 .flatten();
641
642 if let Some(block) = new_block {
643 return Ok(block);
644 }
645 }
646 }
647
648 /// Returns the inner `LatestChainTip`.
649 pub fn latest_chain_tip(&self) -> LatestChainTip {
650 self.latest_chain_tip.clone()
651 }
652}
653
654impl Clone for ChainTipChange {
655 fn clone(&self) -> Self {
656 Self {
657 latest_chain_tip: self.latest_chain_tip.clone(),
658
659 // clear the previous change hash, so the first action is a reset
660 last_change_hash: None,
661
662 network: self.network.clone(),
663 }
664 }
665}
666
667impl TipAction {
668 /// Is this tip action a [`Reset`]?
669 pub fn is_reset(&self) -> bool {
670 matches!(self, Reset { .. })
671 }
672
673 /// Returns the block hash of this tip action,
674 /// regardless of the underlying variant.
675 pub fn best_tip_hash(&self) -> block::Hash {
676 match self {
677 Grow { block } => block.hash,
678 Reset { hash, .. } => *hash,
679 }
680 }
681
682 /// Returns the block height of this tip action,
683 /// regardless of the underlying variant.
684 pub fn best_tip_height(&self) -> block::Height {
685 match self {
686 Grow { block } => block.height,
687 Reset { height, .. } => *height,
688 }
689 }
690
691 /// Returns the block hash and height of this tip action,
692 /// regardless of the underlying variant.
693 pub fn best_tip_hash_and_height(&self) -> (block::Hash, block::Height) {
694 match self {
695 Grow { block } => (block.hash, block.height),
696 Reset { hash, height } => (*hash, *height),
697 }
698 }
699
700 /// Returns a [`Grow`] based on `block`.
701 pub(crate) fn grow_with(block: ChainTipBlock) -> Self {
702 Grow { block }
703 }
704
705 /// Returns a [`Reset`] based on `block`.
706 pub(crate) fn reset_with(block: ChainTipBlock) -> Self {
707 Reset {
708 height: block.height,
709 hash: block.hash,
710 }
711 }
712
713 /// Converts this [`TipAction`] into a [`Reset`].
714 ///
715 /// Designed for use in tests.
716 #[cfg(test)]
717 pub(crate) fn into_reset(self) -> Self {
718 match self {
719 Grow { block } => Reset {
720 height: block.height,
721 hash: block.hash,
722 },
723 reset @ Reset { .. } => reset,
724 }
725 }
726}