zebra_state/service/
chain_tip.rs

1//! Access to Zebra chain tip information.
2//!
3//! Zebra has 3 different interfaces for access to chain tip information:
4//! * [zebra_state::Request](crate::request): [tower::Service] requests about chain state,
5//! * [LatestChainTip] for efficient access to the current best tip, and
6//! * [ChainTipChange] to `await` specific changes to the chain tip.
7
8use std::{fmt, sync::Arc};
9
10use chrono::{DateTime, Utc};
11use futures::TryFutureExt;
12use tokio::sync::watch;
13use tracing::{field, instrument};
14
15use zebra_chain::{
16    block,
17    chain_tip::{BestTipChanged, ChainTip},
18    parameters::{Network, NetworkUpgrade},
19    transaction::{self, Transaction},
20};
21
22use crate::{
23    request::ContextuallyVerifiedBlock, service::watch_receiver::WatchReceiver,
24    CheckpointVerifiedBlock, SemanticallyVerifiedBlock,
25};
26
27use TipAction::*;
28
29#[cfg(any(test, feature = "proptest-impl"))]
30use proptest_derive::Arbitrary;
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use zebra_chain::serialization::arbitrary::datetime_full;
34
35#[cfg(test)]
36mod tests;
37
38/// The internal watch channel data type for [`ChainTipSender`], [`LatestChainTip`],
39/// and [`ChainTipChange`].
40type ChainTipData = Option<ChainTipBlock>;
41
42/// A chain tip block, with precalculated block data.
43///
44/// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`],
45/// and [`ChainTipChange`].
46#[derive(Clone, Debug, PartialEq, Eq)]
47#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
48pub struct ChainTipBlock {
49    /// The hash of the best chain tip block.
50    pub hash: block::Hash,
51
52    /// The height of the best chain tip block.
53    pub height: block::Height,
54
55    /// The network block time of the best chain tip block.
56    #[cfg_attr(
57        any(test, feature = "proptest-impl"),
58        proptest(strategy = "datetime_full()")
59    )]
60    pub time: DateTime<Utc>,
61
62    /// The block transactions.
63    pub transactions: Vec<Arc<Transaction>>,
64
65    /// The mined transaction IDs of the transactions in `block`,
66    /// in the same order as `block.transactions`.
67    pub transaction_hashes: Arc<[transaction::Hash]>,
68
69    /// The hash of the previous block in the best chain.
70    /// This block is immediately behind the best chain tip.
71    ///
72    /// ## Note
73    ///
74    /// If the best chain fork has changed, or some blocks have been skipped,
75    /// this hash will be different to the last returned `ChainTipBlock.hash`.
76    pub previous_block_hash: block::Hash,
77}
78
79impl fmt::Display for ChainTipBlock {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        f.debug_struct("ChainTipBlock")
82            .field("height", &self.height)
83            .field("hash", &self.hash)
84            .field("transactions", &self.transactions.len())
85            .finish()
86    }
87}
88
89impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
90    fn from(contextually_valid: ContextuallyVerifiedBlock) -> Self {
91        let ContextuallyVerifiedBlock {
92            block,
93            hash,
94            height,
95            transaction_hashes,
96            ..
97        } = contextually_valid;
98
99        Self {
100            hash,
101            height,
102            time: block.header.time,
103            transactions: block.transactions.clone(),
104            transaction_hashes,
105            previous_block_hash: block.header.previous_block_hash,
106        }
107    }
108}
109
110impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
111    fn from(prepared: SemanticallyVerifiedBlock) -> Self {
112        let SemanticallyVerifiedBlock {
113            block,
114            hash,
115            height,
116            new_outputs: _,
117            transaction_hashes,
118            deferred_balance: _,
119        } = prepared;
120
121        Self {
122            hash,
123            height,
124            time: block.header.time,
125            transactions: block.transactions.clone(),
126            transaction_hashes,
127            previous_block_hash: block.header.previous_block_hash,
128        }
129    }
130}
131
132impl From<CheckpointVerifiedBlock> for ChainTipBlock {
133    fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self {
134        prepared.into()
135    }
136}
137
138/// A sender for changes to the non-finalized and finalized chain tips.
139#[derive(Debug)]
140pub struct ChainTipSender {
141    /// Have we got any chain tips from the non-finalized state?
142    ///
143    /// Once this flag is set, we ignore the finalized state.
144    /// `None` tips don't set this flag.
145    use_non_finalized_tip: bool,
146
147    /// The sender channel for chain tip data.
148    sender: watch::Sender<ChainTipData>,
149}
150
151impl ChainTipSender {
152    /// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`],
153    /// using an `initial_tip` and a [`Network`].
154    #[instrument(skip(initial_tip), fields(new_height, new_hash))]
155    pub fn new(
156        initial_tip: impl Into<Option<ChainTipBlock>>,
157        network: &Network,
158    ) -> (Self, LatestChainTip, ChainTipChange) {
159        let initial_tip = initial_tip.into();
160        Self::record_new_tip(&initial_tip);
161
162        let (sender, receiver) = watch::channel(None);
163
164        let mut sender = ChainTipSender {
165            use_non_finalized_tip: false,
166            sender,
167        };
168
169        let current = LatestChainTip::new(receiver);
170        let change = ChainTipChange::new(current.clone(), network);
171
172        sender.update(initial_tip);
173
174        (sender, current, change)
175    }
176
177    /// Update the latest finalized tip.
178    ///
179    /// May trigger an update to the best tip.
180    #[instrument(
181        skip(self, new_tip),
182        fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
183    )]
184    pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
185        let new_tip = new_tip.into();
186        self.record_fields(&new_tip);
187
188        if !self.use_non_finalized_tip {
189            self.update(new_tip);
190        }
191    }
192
193    /// Update the latest non-finalized tip.
194    ///
195    /// May trigger an update to the best tip.
196    #[instrument(
197        skip(self, new_tip),
198        fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
199    )]
200    pub fn set_best_non_finalized_tip(
201        &mut self,
202        new_tip: impl Into<Option<ChainTipBlock>> + Clone,
203    ) {
204        let new_tip = new_tip.into();
205        self.record_fields(&new_tip);
206
207        // once the non-finalized state becomes active, it is always populated
208        // but ignoring `None`s makes the tests easier
209        if new_tip.is_some() {
210            self.use_non_finalized_tip = true;
211            self.update(new_tip)
212        }
213    }
214
215    /// Possibly send an update to listeners.
216    ///
217    /// An update is only sent if the current best tip is different from the last best tip
218    /// that was sent.
219    fn update(&mut self, new_tip: Option<ChainTipBlock>) {
220        // Correctness: the `self.sender.borrow()` must not be placed in a `let` binding to prevent
221        // a read-lock being created and living beyond the `self.sender.send(..)` call. If that
222        // happens, the `send` method will attempt to obtain a write-lock and will dead-lock.
223        // Without the binding, the guard is dropped at the end of the expression.
224        let active_hash = self
225            .sender
226            .borrow()
227            .as_ref()
228            .map(|active_value| active_value.hash);
229
230        let needs_update = match (new_tip.as_ref(), active_hash) {
231            // since the blocks have been contextually validated,
232            // we know their hashes cover all the block data
233            (Some(new_tip), Some(active_hash)) => new_tip.hash != active_hash,
234            (Some(_new_tip), None) => true,
235            (None, _active_value_hash) => false,
236        };
237
238        if needs_update {
239            let _ = self.sender.send(new_tip);
240        }
241    }
242
243    /// Record `new_tip` in the current span.
244    ///
245    /// Callers should create a new span with empty `new_height` and `new_hash` fields.
246    fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
247        Self::record_tip(&tracing::Span::current(), "new", new_tip);
248    }
249
250    /// Record `new_tip` and the fields from `self` in the current span.
251    ///
252    /// The fields recorded are:
253    ///
254    /// - `new_height`
255    /// - `new_hash`
256    /// - `old_height`
257    /// - `old_hash`
258    /// - `old_use_non_finalized_tip`
259    ///
260    /// Callers should create a new span with the empty fields described above.
261    fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
262        let span = tracing::Span::current();
263
264        let old_tip = &*self.sender.borrow();
265
266        Self::record_tip(&span, "new", new_tip);
267        Self::record_tip(&span, "old", old_tip);
268
269        span.record(
270            "old_use_non_finalized_tip",
271            field::debug(self.use_non_finalized_tip),
272        );
273    }
274
275    /// Record `tip` into `span` using the `prefix` to name the fields.
276    ///
277    /// Callers should create a new span with empty `{prefix}_height` and `{prefix}_hash` fields.
278    fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
279        let height = tip.as_ref().map(|block| block.height);
280        let hash = tip.as_ref().map(|block| block.hash);
281
282        span.record(format!("{prefix}_height").as_str(), field::debug(height));
283        span.record(format!("{prefix}_hash").as_str(), field::debug(hash));
284    }
285}
286
287/// Efficient access to the state's current best chain tip.
288///
289/// Each method returns data from the latest tip,
290/// regardless of how many times you call it.
291///
292/// Cloned instances provide identical tip data.
293///
294/// The chain tip data is based on:
295/// * the best non-finalized chain tip, if available, or
296/// * the finalized tip.
297///
298/// ## Note
299///
300/// If a lot of blocks are committed at the same time,
301/// the latest tip will skip some blocks in the chain.
302#[derive(Clone, Debug)]
303pub struct LatestChainTip {
304    /// The receiver for the current chain tip's data.
305    receiver: WatchReceiver<ChainTipData>,
306}
307
308impl LatestChainTip {
309    /// Create a new [`LatestChainTip`] from a watch channel receiver.
310    fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
311        Self {
312            receiver: WatchReceiver::new(receiver),
313        }
314    }
315
316    /// Maps the current data `ChainTipData` to `Option<U>`
317    /// by applying a function to the watched value,
318    /// while holding the receiver lock as briefly as possible.
319    ///
320    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
321    /// extract some information from it, while also adding the current chain tip block's fields as
322    /// records to the current span.
323    ///
324    /// A single read lock is acquired to clone `T`, and then released after the clone.
325    /// See the performance note on [`WatchReceiver::with_watch_data`].
326    ///
327    /// Does not mark the watched data as seen.
328    ///
329    /// # Correctness
330    ///
331    /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
332    fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
333    where
334        F: FnOnce(&ChainTipBlock) -> U,
335    {
336        let span = tracing::Span::current();
337
338        let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
339            span.record(
340                "height",
341                tracing::field::debug(chain_tip_block.map(|block| block.height)),
342            );
343            span.record(
344                "hash",
345                tracing::field::debug(chain_tip_block.map(|block| block.hash)),
346            );
347            span.record(
348                "time",
349                tracing::field::debug(chain_tip_block.map(|block| block.time)),
350            );
351            span.record(
352                "previous_hash",
353                tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
354            );
355            span.record(
356                "transaction_count",
357                tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
358            );
359        };
360
361        self.receiver.with_watch_data(|chain_tip_block| {
362            // TODO: replace with Option::inspect when it stabilises
363            //       https://github.com/rust-lang/rust/issues/91345
364            register_span_fields(chain_tip_block.as_ref());
365
366            chain_tip_block.as_ref().map(f)
367        })
368    }
369}
370
371impl ChainTip for LatestChainTip {
372    #[instrument(skip(self))]
373    fn best_tip_height(&self) -> Option<block::Height> {
374        self.with_chain_tip_block(|block| block.height)
375    }
376
377    #[instrument(skip(self))]
378    fn best_tip_hash(&self) -> Option<block::Hash> {
379        self.with_chain_tip_block(|block| block.hash)
380    }
381
382    #[instrument(skip(self))]
383    fn best_tip_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
384        self.with_chain_tip_block(|block| (block.height, block.hash))
385    }
386
387    #[instrument(skip(self))]
388    fn best_tip_block_time(&self) -> Option<DateTime<Utc>> {
389        self.with_chain_tip_block(|block| block.time)
390    }
391
392    #[instrument(skip(self))]
393    fn best_tip_height_and_block_time(&self) -> Option<(block::Height, DateTime<Utc>)> {
394        self.with_chain_tip_block(|block| (block.height, block.time))
395    }
396
397    #[instrument(skip(self))]
398    fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
399        self.with_chain_tip_block(|block| block.transaction_hashes.clone())
400            .unwrap_or_else(|| Arc::new([]))
401    }
402
403    /// Returns when the state tip changes.
404    ///
405    /// Marks the state tip as seen when the returned future completes.
406    #[instrument(skip(self))]
407    fn best_tip_changed(&mut self) -> BestTipChanged {
408        BestTipChanged::new(self.receiver.changed().err_into())
409    }
410
411    /// Mark the current best state tip as seen.
412    fn mark_best_tip_seen(&mut self) {
413        self.receiver.mark_as_seen();
414    }
415}
416
417/// A chain tip change monitor.
418///
419/// Awaits changes and resets of the state's best chain tip,
420/// returning the latest [`TipAction`] once the state is updated.
421///
422/// Each cloned instance separately tracks the last block data it provided. If
423/// the best chain fork has changed since the last tip change on that instance,
424/// it returns a [`Reset`].
425///
426/// The chain tip data is based on:
427/// * the best non-finalized chain tip, if available, or
428/// * the finalized tip.
429#[derive(Debug)]
430pub struct ChainTipChange {
431    /// The receiver for the current chain tip's data.
432    latest_chain_tip: LatestChainTip,
433
434    /// The most recent [`block::Hash`] provided by this instance.
435    ///
436    /// ## Note
437    ///
438    /// If the best chain fork has changed, or some blocks have been skipped,
439    /// this hash will be different to the last returned `ChainTipBlock.hash`.
440    last_change_hash: Option<block::Hash>,
441
442    /// The network for the chain tip.
443    network: Network,
444}
445
446/// Actions that we can take in response to a [`ChainTipChange`].
447#[derive(Clone, Debug, PartialEq, Eq)]
448pub enum TipAction {
449    /// The chain tip was updated continuously,
450    /// using a child `block` of the previous block.
451    ///
452    /// The genesis block action is a `Grow`.
453    Grow {
454        /// Information about the block used to grow the chain.
455        block: ChainTipBlock,
456    },
457
458    /// The chain tip was reset to a block with `height` and `hash`.
459    ///
460    /// Resets can happen for different reasons:
461    /// - a newly created or cloned [`ChainTipChange`], which is behind the
462    ///   current tip,
463    /// - extending the chain with a network upgrade activation block,
464    /// - switching to a different best [`Chain`][1], also known as a rollback, and
465    /// - receiving multiple blocks since the previous change.
466    ///
467    /// To keep the code and tests simple, Zebra performs the same reset
468    /// actions, regardless of the reset reason.
469    ///
470    /// `Reset`s do not have the transaction hashes from the tip block, because
471    /// all transactions should be cleared by a reset.
472    ///
473    /// [1]: super::non_finalized_state::Chain
474    Reset {
475        /// The block height of the tip, after the chain reset.
476        height: block::Height,
477
478        /// The block hash of the tip, after the chain reset.
479        ///
480        /// Mainly useful for logging and debugging.
481        hash: block::Hash,
482    },
483}
484
485impl ChainTipChange {
486    /// Wait until the tip has changed, then return the corresponding [`TipAction`].
487    ///
488    /// The returned action describes how the tip has changed
489    /// since the last call to this method.
490    ///
491    /// If there have been no changes since the last time this method was called,
492    /// it waits for the next tip change before returning.
493    ///
494    /// If there have been multiple changes since the last time this method was called,
495    /// they are combined into a single [`TipAction::Reset`].
496    ///
497    /// Returns an error if communication with the state is lost.
498    ///
499    /// ## Note
500    ///
501    /// If a lot of blocks are committed at the same time,
502    /// the change will skip some blocks, and return a [`Reset`].
503    #[instrument(
504        skip(self),
505        fields(
506            last_change_hash = ?self.last_change_hash,
507            network = ?self.network,
508        ))]
509    pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
510        let block = self.tip_block_change().await?;
511
512        let action = self.action(block.clone());
513
514        self.last_change_hash = Some(block.hash);
515
516        Ok(action)
517    }
518
519    /// Returns:
520    /// - `Some(`[`TipAction`]`)` if there has been a change since the last time the method was called.
521    /// - `None` if there has been no change.
522    ///
523    /// See [`Self::wait_for_tip_change`] for details.
524    #[instrument(
525        skip(self),
526        fields(
527            last_change_hash = ?self.last_change_hash,
528            network = ?self.network,
529        ))]
530    pub fn last_tip_change(&mut self) -> Option<TipAction> {
531        let block = self.latest_chain_tip.with_chain_tip_block(|block| {
532            if Some(block.hash) != self.last_change_hash {
533                Some(block.clone())
534            } else {
535                // Ignore an unchanged tip.
536                None
537            }
538        })??;
539
540        let block_hash = block.hash;
541        let tip_action = self.action(block);
542
543        self.last_change_hash = Some(block_hash);
544
545        Some(tip_action)
546    }
547
548    /// Return an action based on `block` and the last change we returned.
549    fn action(&self, block: ChainTipBlock) -> TipAction {
550        // check for an edge case that's dealt with by other code
551        assert!(
552            Some(block.hash) != self.last_change_hash,
553            "ChainTipSender and ChainTipChange ignore unchanged tips"
554        );
555
556        // If the previous block hash doesn't match, reset.
557        // We've either:
558        // - just initialized this instance,
559        // - changed the best chain to another fork (a rollback), or
560        // - skipped some blocks in the best chain.
561        //
562        // Consensus rules:
563        //
564        // > It is possible for a reorganization to occur
565        // > that rolls back from after the activation height, to before that height.
566        // > This can handled in the same way as any regular chain orphaning or reorganization,
567        // > as long as the new chain is valid.
568        //
569        // https://zips.z.cash/zip-0200#chain-reorganization
570
571        // If we're at a network upgrade activation block, reset.
572        //
573        // Consensus rules:
574        //
575        // > When the current chain tip height reaches ACTIVATION_HEIGHT,
576        // > the node's local transaction memory pool SHOULD be cleared of transactions
577        // > that will never be valid on the post-upgrade consensus branch.
578        //
579        // https://zips.z.cash/zip-0200#memory-pool
580        //
581        // Skipped blocks can include network upgrade activation blocks.
582        // Fork changes can activate or deactivate a network upgrade.
583        // So we must perform the same actions for network upgrades and skipped blocks.
584        if Some(block.previous_block_hash) != self.last_change_hash
585            || NetworkUpgrade::is_activation_height(&self.network, block.height)
586        {
587            TipAction::reset_with(block)
588        } else {
589            TipAction::grow_with(block)
590        }
591    }
592
593    /// Create a new [`ChainTipChange`] from a [`LatestChainTip`] receiver and [`Network`].
594    fn new(latest_chain_tip: LatestChainTip, network: &Network) -> Self {
595        Self {
596            latest_chain_tip,
597            last_change_hash: None,
598            network: network.clone(),
599        }
600    }
601
602    /// Wait until the next chain tip change, then return the corresponding [`ChainTipBlock`].
603    ///
604    /// Returns an error if communication with the state is lost.
605    async fn tip_block_change(&mut self) -> Result<ChainTipBlock, watch::error::RecvError> {
606        loop {
607            // If there are multiple changes while this code is executing,
608            // we don't rely on getting the first block or the latest block
609            // after the change notification.
610            // Any block update after the change will do,
611            // we'll catch up with the tip after the next change.
612            self.latest_chain_tip.receiver.changed().await?;
613
614            // Wait until we have a new block
615            //
616            // last_tip_change() updates last_change_hash, but it doesn't call receiver.changed().
617            // So code that uses both sync and async methods can have spurious pending changes.
618            //
619            // TODO: use `receiver.borrow_and_update()` in `with_chain_tip_block()`,
620            //       once we upgrade to tokio 1.0 (#2200)
621            //       and remove this extra check
622            let new_block = self
623                .latest_chain_tip
624                .with_chain_tip_block(|block| {
625                    if Some(block.hash) != self.last_change_hash {
626                        Some(block.clone())
627                    } else {
628                        None
629                    }
630                })
631                .flatten();
632
633            if let Some(block) = new_block {
634                return Ok(block);
635            }
636        }
637    }
638
639    /// Returns the inner `LatestChainTip`.
640    pub fn latest_chain_tip(&self) -> LatestChainTip {
641        self.latest_chain_tip.clone()
642    }
643}
644
645impl Clone for ChainTipChange {
646    fn clone(&self) -> Self {
647        Self {
648            latest_chain_tip: self.latest_chain_tip.clone(),
649
650            // clear the previous change hash, so the first action is a reset
651            last_change_hash: None,
652
653            network: self.network.clone(),
654        }
655    }
656}
657
658impl TipAction {
659    /// Is this tip action a [`Reset`]?
660    pub fn is_reset(&self) -> bool {
661        matches!(self, Reset { .. })
662    }
663
664    /// Returns the block hash of this tip action,
665    /// regardless of the underlying variant.
666    pub fn best_tip_hash(&self) -> block::Hash {
667        match self {
668            Grow { block } => block.hash,
669            Reset { hash, .. } => *hash,
670        }
671    }
672
673    /// Returns the block height of this tip action,
674    /// regardless of the underlying variant.
675    pub fn best_tip_height(&self) -> block::Height {
676        match self {
677            Grow { block } => block.height,
678            Reset { height, .. } => *height,
679        }
680    }
681
682    /// Returns the block hash and height of this tip action,
683    /// regardless of the underlying variant.
684    pub fn best_tip_hash_and_height(&self) -> (block::Hash, block::Height) {
685        match self {
686            Grow { block } => (block.hash, block.height),
687            Reset { hash, height } => (*hash, *height),
688        }
689    }
690
691    /// Returns a [`Grow`] based on `block`.
692    pub(crate) fn grow_with(block: ChainTipBlock) -> Self {
693        Grow { block }
694    }
695
696    /// Returns a [`Reset`] based on `block`.
697    pub(crate) fn reset_with(block: ChainTipBlock) -> Self {
698        Reset {
699            height: block.height,
700            hash: block.hash,
701        }
702    }
703
704    /// Converts this [`TipAction`] into a [`Reset`].
705    ///
706    /// Designed for use in tests.
707    #[cfg(test)]
708    pub(crate) fn into_reset(self) -> Self {
709        match self {
710            Grow { block } => Reset {
711                height: block.height,
712                hash: block.hash,
713            },
714            reset @ Reset { .. } => reset,
715        }
716    }
717}