zebra_state/service/
chain_tip.rs

1//! Access to Zebra chain tip information.
2//!
3//! Zebra has 3 different interfaces for access to chain tip information:
4//! * [zebra_state::Request](crate::request): [tower::Service] requests about chain state,
5//! * [LatestChainTip] for efficient access to the current best tip, and
6//! * [ChainTipChange] to `await` specific changes to the chain tip.
7
8use std::{fmt, sync::Arc};
9
10use chrono::{DateTime, Utc};
11use futures::TryFutureExt;
12use tokio::sync::watch;
13use tracing::{field, instrument};
14
15use zebra_chain::{
16    block,
17    chain_tip::ChainTip,
18    parameters::{Network, NetworkUpgrade},
19    transaction::{self, Transaction},
20};
21
22use crate::{
23    request::ContextuallyVerifiedBlock, service::watch_receiver::WatchReceiver, BoxError,
24    CheckpointVerifiedBlock, SemanticallyVerifiedBlock,
25};
26
27use TipAction::*;
28
29#[cfg(any(test, feature = "proptest-impl"))]
30use proptest_derive::Arbitrary;
31
32#[cfg(any(test, feature = "proptest-impl"))]
33use zebra_chain::serialization::arbitrary::datetime_full;
34
35#[cfg(test)]
36mod tests;
37
38/// The internal watch channel data type for [`ChainTipSender`], [`LatestChainTip`],
39/// and [`ChainTipChange`].
40type ChainTipData = Option<ChainTipBlock>;
41
42/// A chain tip block, with precalculated block data.
43///
44/// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`],
45/// and [`ChainTipChange`].
46#[derive(Clone, Debug, PartialEq, Eq)]
47#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
48pub struct ChainTipBlock {
49    /// The hash of the best chain tip block.
50    pub hash: block::Hash,
51
52    /// The height of the best chain tip block.
53    pub height: block::Height,
54
55    /// The network block time of the best chain tip block.
56    #[cfg_attr(
57        any(test, feature = "proptest-impl"),
58        proptest(strategy = "datetime_full()")
59    )]
60    pub time: DateTime<Utc>,
61
62    /// The block transactions.
63    pub transactions: Vec<Arc<Transaction>>,
64
65    /// The mined transaction IDs of the transactions in `block`,
66    /// in the same order as `block.transactions`.
67    pub transaction_hashes: Arc<[transaction::Hash]>,
68
69    /// The hash of the previous block in the best chain.
70    /// This block is immediately behind the best chain tip.
71    ///
72    /// ## Note
73    ///
74    /// If the best chain fork has changed, or some blocks have been skipped,
75    /// this hash will be different to the last returned `ChainTipBlock.hash`.
76    pub previous_block_hash: block::Hash,
77}
78
79impl fmt::Display for ChainTipBlock {
80    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
81        f.debug_struct("ChainTipBlock")
82            .field("height", &self.height)
83            .field("hash", &self.hash)
84            .field("transactions", &self.transactions.len())
85            .finish()
86    }
87}
88
89impl From<ContextuallyVerifiedBlock> for ChainTipBlock {
90    fn from(contextually_valid: ContextuallyVerifiedBlock) -> Self {
91        let ContextuallyVerifiedBlock {
92            block,
93            hash,
94            height,
95            transaction_hashes,
96            ..
97        } = contextually_valid;
98
99        Self {
100            hash,
101            height,
102            time: block.header.time,
103            transactions: block.transactions.clone(),
104            transaction_hashes,
105            previous_block_hash: block.header.previous_block_hash,
106        }
107    }
108}
109
110impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
111    fn from(prepared: SemanticallyVerifiedBlock) -> Self {
112        let SemanticallyVerifiedBlock {
113            block,
114            hash,
115            height,
116            new_outputs: _,
117            transaction_hashes,
118            deferred_balance: _,
119        } = prepared;
120
121        Self {
122            hash,
123            height,
124            time: block.header.time,
125            transactions: block.transactions.clone(),
126            transaction_hashes,
127            previous_block_hash: block.header.previous_block_hash,
128        }
129    }
130}
131
132impl From<CheckpointVerifiedBlock> for ChainTipBlock {
133    fn from(CheckpointVerifiedBlock(prepared): CheckpointVerifiedBlock) -> Self {
134        prepared.into()
135    }
136}
137
138/// A sender for changes to the non-finalized and finalized chain tips.
139#[derive(Debug)]
140pub struct ChainTipSender {
141    /// Have we got any chain tips from the non-finalized state?
142    ///
143    /// Once this flag is set, we ignore the finalized state.
144    /// `None` tips don't set this flag.
145    use_non_finalized_tip: bool,
146
147    /// The sender channel for chain tip data.
148    sender: watch::Sender<ChainTipData>,
149}
150
151impl ChainTipSender {
152    /// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`],
153    /// using an `initial_tip` and a [`Network`].
154    #[instrument(skip(initial_tip), fields(new_height, new_hash))]
155    pub fn new(
156        initial_tip: impl Into<Option<ChainTipBlock>>,
157        network: &Network,
158    ) -> (Self, LatestChainTip, ChainTipChange) {
159        let initial_tip = initial_tip.into();
160        Self::record_new_tip(&initial_tip);
161
162        let (sender, receiver) = watch::channel(None);
163
164        let mut sender = ChainTipSender {
165            use_non_finalized_tip: false,
166            sender,
167        };
168
169        let current = LatestChainTip::new(receiver);
170        let change = ChainTipChange::new(current.clone(), network);
171
172        sender.update(initial_tip);
173
174        (sender, current, change)
175    }
176
177    /// Returns a clone of itself for sending finalized tip changes,
178    /// used by `TrustedChainSync` in `zebra-rpc`.
179    pub fn finalized_sender(&self) -> Self {
180        Self {
181            use_non_finalized_tip: false,
182            sender: self.sender.clone(),
183        }
184    }
185
186    /// Update the latest finalized tip.
187    ///
188    /// May trigger an update to the best tip.
189    #[instrument(
190        skip(self, new_tip),
191        fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
192    )]
193    pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
194        let new_tip = new_tip.into();
195        self.record_fields(&new_tip);
196
197        if !self.use_non_finalized_tip {
198            self.update(new_tip);
199        }
200    }
201
202    /// Update the latest non-finalized tip.
203    ///
204    /// May trigger an update to the best tip.
205    #[instrument(
206        skip(self, new_tip),
207        fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
208    )]
209    pub fn set_best_non_finalized_tip(
210        &mut self,
211        new_tip: impl Into<Option<ChainTipBlock>> + Clone,
212    ) {
213        let new_tip = new_tip.into();
214        self.record_fields(&new_tip);
215
216        // once the non-finalized state becomes active, it is always populated
217        // but ignoring `None`s makes the tests easier
218        if new_tip.is_some() {
219            self.use_non_finalized_tip = true;
220            self.update(new_tip)
221        }
222    }
223
224    /// Possibly send an update to listeners.
225    ///
226    /// An update is only sent if the current best tip is different from the last best tip
227    /// that was sent.
228    fn update(&mut self, new_tip: Option<ChainTipBlock>) {
229        // Correctness: the `self.sender.borrow()` must not be placed in a `let` binding to prevent
230        // a read-lock being created and living beyond the `self.sender.send(..)` call. If that
231        // happens, the `send` method will attempt to obtain a write-lock and will dead-lock.
232        // Without the binding, the guard is dropped at the end of the expression.
233        let active_hash = self
234            .sender
235            .borrow()
236            .as_ref()
237            .map(|active_value| active_value.hash);
238
239        let needs_update = match (new_tip.as_ref(), active_hash) {
240            // since the blocks have been contextually validated,
241            // we know their hashes cover all the block data
242            (Some(new_tip), Some(active_hash)) => new_tip.hash != active_hash,
243            (Some(_new_tip), None) => true,
244            (None, _active_value_hash) => false,
245        };
246
247        if needs_update {
248            let _ = self.sender.send(new_tip);
249        }
250    }
251
252    /// Record `new_tip` in the current span.
253    ///
254    /// Callers should create a new span with empty `new_height` and `new_hash` fields.
255    fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
256        Self::record_tip(&tracing::Span::current(), "new", new_tip);
257    }
258
259    /// Record `new_tip` and the fields from `self` in the current span.
260    ///
261    /// The fields recorded are:
262    ///
263    /// - `new_height`
264    /// - `new_hash`
265    /// - `old_height`
266    /// - `old_hash`
267    /// - `old_use_non_finalized_tip`
268    ///
269    /// Callers should create a new span with the empty fields described above.
270    fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
271        let span = tracing::Span::current();
272
273        let old_tip = &*self.sender.borrow();
274
275        Self::record_tip(&span, "new", new_tip);
276        Self::record_tip(&span, "old", old_tip);
277
278        span.record(
279            "old_use_non_finalized_tip",
280            field::debug(self.use_non_finalized_tip),
281        );
282    }
283
284    /// Record `tip` into `span` using the `prefix` to name the fields.
285    ///
286    /// Callers should create a new span with empty `{prefix}_height` and `{prefix}_hash` fields.
287    fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
288        let height = tip.as_ref().map(|block| block.height);
289        let hash = tip.as_ref().map(|block| block.hash);
290
291        span.record(format!("{prefix}_height").as_str(), field::debug(height));
292        span.record(format!("{prefix}_hash").as_str(), field::debug(hash));
293    }
294}
295
296/// Efficient access to the state's current best chain tip.
297///
298/// Each method returns data from the latest tip,
299/// regardless of how many times you call it.
300///
301/// Cloned instances provide identical tip data.
302///
303/// The chain tip data is based on:
304/// * the best non-finalized chain tip, if available, or
305/// * the finalized tip.
306///
307/// ## Note
308///
309/// If a lot of blocks are committed at the same time,
310/// the latest tip will skip some blocks in the chain.
311#[derive(Clone, Debug)]
312pub struct LatestChainTip {
313    /// The receiver for the current chain tip's data.
314    receiver: WatchReceiver<ChainTipData>,
315}
316
317impl LatestChainTip {
318    /// Create a new [`LatestChainTip`] from a watch channel receiver.
319    fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
320        Self {
321            receiver: WatchReceiver::new(receiver),
322        }
323    }
324
325    /// Maps the current data `ChainTipData` to `Option<U>`
326    /// by applying a function to the watched value,
327    /// while holding the receiver lock as briefly as possible.
328    ///
329    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
330    /// extract some information from it, while also adding the current chain tip block's fields as
331    /// records to the current span.
332    ///
333    /// A single read lock is acquired to clone `T`, and then released after the clone.
334    /// See the performance note on [`WatchReceiver::with_watch_data`].
335    ///
336    /// Does not mark the watched data as seen.
337    ///
338    /// # Correctness
339    ///
340    /// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
341    fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
342    where
343        F: FnOnce(&ChainTipBlock) -> U,
344    {
345        let span = tracing::Span::current();
346
347        let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
348            span.record(
349                "height",
350                tracing::field::debug(chain_tip_block.map(|block| block.height)),
351            );
352            span.record(
353                "hash",
354                tracing::field::debug(chain_tip_block.map(|block| block.hash)),
355            );
356            span.record(
357                "time",
358                tracing::field::debug(chain_tip_block.map(|block| block.time)),
359            );
360            span.record(
361                "previous_hash",
362                tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
363            );
364            span.record(
365                "transaction_count",
366                tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
367            );
368        };
369
370        self.receiver.with_watch_data(|chain_tip_block| {
371            // TODO: replace with Option::inspect when it stabilises
372            //       https://github.com/rust-lang/rust/issues/91345
373            register_span_fields(chain_tip_block.as_ref());
374
375            chain_tip_block.as_ref().map(f)
376        })
377    }
378}
379
380impl ChainTip for LatestChainTip {
381    #[instrument(skip(self))]
382    fn best_tip_height(&self) -> Option<block::Height> {
383        self.with_chain_tip_block(|block| block.height)
384    }
385
386    #[instrument(skip(self))]
387    fn best_tip_hash(&self) -> Option<block::Hash> {
388        self.with_chain_tip_block(|block| block.hash)
389    }
390
391    #[instrument(skip(self))]
392    fn best_tip_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
393        self.with_chain_tip_block(|block| (block.height, block.hash))
394    }
395
396    #[instrument(skip(self))]
397    fn best_tip_block_time(&self) -> Option<DateTime<Utc>> {
398        self.with_chain_tip_block(|block| block.time)
399    }
400
401    #[instrument(skip(self))]
402    fn best_tip_height_and_block_time(&self) -> Option<(block::Height, DateTime<Utc>)> {
403        self.with_chain_tip_block(|block| (block.height, block.time))
404    }
405
406    #[instrument(skip(self))]
407    fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
408        self.with_chain_tip_block(|block| block.transaction_hashes.clone())
409            .unwrap_or_else(|| Arc::new([]))
410    }
411
412    /// Returns when the state tip changes.
413    ///
414    /// Marks the state tip as seen when the returned future completes.
415    #[instrument(skip(self))]
416    async fn best_tip_changed(&mut self) -> Result<(), BoxError> {
417        self.receiver.changed().err_into().await
418    }
419
420    /// Mark the current best state tip as seen.
421    fn mark_best_tip_seen(&mut self) {
422        self.receiver.mark_as_seen();
423    }
424}
425
426/// A chain tip change monitor.
427///
428/// Awaits changes and resets of the state's best chain tip,
429/// returning the latest [`TipAction`] once the state is updated.
430///
431/// Each cloned instance separately tracks the last block data it provided. If
432/// the best chain fork has changed since the last tip change on that instance,
433/// it returns a [`Reset`].
434///
435/// The chain tip data is based on:
436/// * the best non-finalized chain tip, if available, or
437/// * the finalized tip.
438#[derive(Debug)]
439pub struct ChainTipChange {
440    /// The receiver for the current chain tip's data.
441    latest_chain_tip: LatestChainTip,
442
443    /// The most recent [`block::Hash`] provided by this instance.
444    ///
445    /// ## Note
446    ///
447    /// If the best chain fork has changed, or some blocks have been skipped,
448    /// this hash will be different to the last returned `ChainTipBlock.hash`.
449    last_change_hash: Option<block::Hash>,
450
451    /// The network for the chain tip.
452    network: Network,
453}
454
455/// Actions that we can take in response to a [`ChainTipChange`].
456#[derive(Clone, Debug, PartialEq, Eq)]
457pub enum TipAction {
458    /// The chain tip was updated continuously,
459    /// using a child `block` of the previous block.
460    ///
461    /// The genesis block action is a `Grow`.
462    Grow {
463        /// Information about the block used to grow the chain.
464        block: ChainTipBlock,
465    },
466
467    /// The chain tip was reset to a block with `height` and `hash`.
468    ///
469    /// Resets can happen for different reasons:
470    /// - a newly created or cloned [`ChainTipChange`], which is behind the
471    ///   current tip,
472    /// - extending the chain with a network upgrade activation block,
473    /// - switching to a different best [`Chain`][1], also known as a rollback, and
474    /// - receiving multiple blocks since the previous change.
475    ///
476    /// To keep the code and tests simple, Zebra performs the same reset
477    /// actions, regardless of the reset reason.
478    ///
479    /// `Reset`s do not have the transaction hashes from the tip block, because
480    /// all transactions should be cleared by a reset.
481    ///
482    /// [1]: super::non_finalized_state::Chain
483    Reset {
484        /// The block height of the tip, after the chain reset.
485        height: block::Height,
486
487        /// The block hash of the tip, after the chain reset.
488        ///
489        /// Mainly useful for logging and debugging.
490        hash: block::Hash,
491    },
492}
493
494impl ChainTipChange {
495    /// Wait until the tip has changed, then return the corresponding [`TipAction`].
496    ///
497    /// The returned action describes how the tip has changed
498    /// since the last call to this method.
499    ///
500    /// If there have been no changes since the last time this method was called,
501    /// it waits for the next tip change before returning.
502    ///
503    /// If there have been multiple changes since the last time this method was called,
504    /// they are combined into a single [`TipAction::Reset`].
505    ///
506    /// Returns an error if communication with the state is lost.
507    ///
508    /// ## Note
509    ///
510    /// If a lot of blocks are committed at the same time,
511    /// the change will skip some blocks, and return a [`Reset`].
512    #[instrument(
513        skip(self),
514        fields(
515            last_change_hash = ?self.last_change_hash,
516            network = ?self.network,
517        ))]
518    pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
519        let block = self.tip_block_change().await?;
520
521        let action = self.action(block.clone());
522
523        self.last_change_hash = Some(block.hash);
524
525        Ok(action)
526    }
527
528    /// Returns:
529    /// - `Some(`[`TipAction`]`)` if there has been a change since the last time the method was called.
530    /// - `None` if there has been no change.
531    ///
532    /// See [`Self::wait_for_tip_change`] for details.
533    #[instrument(
534        skip(self),
535        fields(
536            last_change_hash = ?self.last_change_hash,
537            network = ?self.network,
538        ))]
539    pub fn last_tip_change(&mut self) -> Option<TipAction> {
540        let block = self.latest_chain_tip.with_chain_tip_block(|block| {
541            if Some(block.hash) != self.last_change_hash {
542                Some(block.clone())
543            } else {
544                // Ignore an unchanged tip.
545                None
546            }
547        })??;
548
549        let block_hash = block.hash;
550        let tip_action = self.action(block);
551
552        self.last_change_hash = Some(block_hash);
553
554        Some(tip_action)
555    }
556
557    /// Return an action based on `block` and the last change we returned.
558    fn action(&self, block: ChainTipBlock) -> TipAction {
559        // check for an edge case that's dealt with by other code
560        assert!(
561            Some(block.hash) != self.last_change_hash,
562            "ChainTipSender and ChainTipChange ignore unchanged tips"
563        );
564
565        // If the previous block hash doesn't match, reset.
566        // We've either:
567        // - just initialized this instance,
568        // - changed the best chain to another fork (a rollback), or
569        // - skipped some blocks in the best chain.
570        //
571        // Consensus rules:
572        //
573        // > It is possible for a reorganization to occur
574        // > that rolls back from after the activation height, to before that height.
575        // > This can handled in the same way as any regular chain orphaning or reorganization,
576        // > as long as the new chain is valid.
577        //
578        // https://zips.z.cash/zip-0200#chain-reorganization
579
580        // If we're at a network upgrade activation block, reset.
581        //
582        // Consensus rules:
583        //
584        // > When the current chain tip height reaches ACTIVATION_HEIGHT,
585        // > the node's local transaction memory pool SHOULD be cleared of transactions
586        // > that will never be valid on the post-upgrade consensus branch.
587        //
588        // https://zips.z.cash/zip-0200#memory-pool
589        //
590        // Skipped blocks can include network upgrade activation blocks.
591        // Fork changes can activate or deactivate a network upgrade.
592        // So we must perform the same actions for network upgrades and skipped blocks.
593        if Some(block.previous_block_hash) != self.last_change_hash
594            || NetworkUpgrade::is_activation_height(&self.network, block.height)
595        {
596            TipAction::reset_with(block)
597        } else {
598            TipAction::grow_with(block)
599        }
600    }
601
602    /// Create a new [`ChainTipChange`] from a [`LatestChainTip`] receiver and [`Network`].
603    fn new(latest_chain_tip: LatestChainTip, network: &Network) -> Self {
604        Self {
605            latest_chain_tip,
606            last_change_hash: None,
607            network: network.clone(),
608        }
609    }
610
611    /// Wait until the next chain tip change, then return the corresponding [`ChainTipBlock`].
612    ///
613    /// Returns an error if communication with the state is lost.
614    async fn tip_block_change(&mut self) -> Result<ChainTipBlock, watch::error::RecvError> {
615        loop {
616            // If there are multiple changes while this code is executing,
617            // we don't rely on getting the first block or the latest block
618            // after the change notification.
619            // Any block update after the change will do,
620            // we'll catch up with the tip after the next change.
621            self.latest_chain_tip.receiver.changed().await?;
622
623            // Wait until we have a new block
624            //
625            // last_tip_change() updates last_change_hash, but it doesn't call receiver.changed().
626            // So code that uses both sync and async methods can have spurious pending changes.
627            //
628            // TODO: use `receiver.borrow_and_update()` in `with_chain_tip_block()`,
629            //       once we upgrade to tokio 1.0 (#2200)
630            //       and remove this extra check
631            let new_block = self
632                .latest_chain_tip
633                .with_chain_tip_block(|block| {
634                    if Some(block.hash) != self.last_change_hash {
635                        Some(block.clone())
636                    } else {
637                        None
638                    }
639                })
640                .flatten();
641
642            if let Some(block) = new_block {
643                return Ok(block);
644            }
645        }
646    }
647
648    /// Returns the inner `LatestChainTip`.
649    pub fn latest_chain_tip(&self) -> LatestChainTip {
650        self.latest_chain_tip.clone()
651    }
652}
653
654impl Clone for ChainTipChange {
655    fn clone(&self) -> Self {
656        Self {
657            latest_chain_tip: self.latest_chain_tip.clone(),
658
659            // clear the previous change hash, so the first action is a reset
660            last_change_hash: None,
661
662            network: self.network.clone(),
663        }
664    }
665}
666
667impl TipAction {
668    /// Is this tip action a [`Reset`]?
669    pub fn is_reset(&self) -> bool {
670        matches!(self, Reset { .. })
671    }
672
673    /// Returns the block hash of this tip action,
674    /// regardless of the underlying variant.
675    pub fn best_tip_hash(&self) -> block::Hash {
676        match self {
677            Grow { block } => block.hash,
678            Reset { hash, .. } => *hash,
679        }
680    }
681
682    /// Returns the block height of this tip action,
683    /// regardless of the underlying variant.
684    pub fn best_tip_height(&self) -> block::Height {
685        match self {
686            Grow { block } => block.height,
687            Reset { height, .. } => *height,
688        }
689    }
690
691    /// Returns the block hash and height of this tip action,
692    /// regardless of the underlying variant.
693    pub fn best_tip_hash_and_height(&self) -> (block::Hash, block::Height) {
694        match self {
695            Grow { block } => (block.hash, block.height),
696            Reset { hash, height } => (*hash, *height),
697        }
698    }
699
700    /// Returns a [`Grow`] based on `block`.
701    pub(crate) fn grow_with(block: ChainTipBlock) -> Self {
702        Grow { block }
703    }
704
705    /// Returns a [`Reset`] based on `block`.
706    pub(crate) fn reset_with(block: ChainTipBlock) -> Self {
707        Reset {
708            height: block.height,
709            hash: block.hash,
710        }
711    }
712
713    /// Converts this [`TipAction`] into a [`Reset`].
714    ///
715    /// Designed for use in tests.
716    #[cfg(test)]
717    pub(crate) fn into_reset(self) -> Self {
718        match self {
719            Grow { block } => Reset {
720                height: block.height,
721                hash: block.hash,
722            },
723            reset @ Reset { .. } => reset,
724        }
725    }
726}