zebra_rpc/
sync.rs

1//! Syncer task for maintaining a non-finalized state in Zebra's ReadStateService and updating `ChainTipSender` via RPCs
2
3use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration};
4
5use futures::{stream::FuturesOrdered, StreamExt};
6use tokio::task::JoinHandle;
7use tower::BoxError;
8use tracing::info;
9use zebra_chain::{
10    block::{self, Block, Height},
11    parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
12    serialization::ZcashDeserializeInto,
13};
14use zebra_node_services::rpc_client::RpcRequestClient;
15use zebra_state::{
16    spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
17    LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
18    MAX_BLOCK_REORG_HEIGHT,
19};
20
21use zebra_chain::diagnostic::task::WaitForPanics;
22
23use crate::{
24    methods::{hex_data::HexData, GetBlockHeightAndHash},
25    server,
26};
27
28/// How long to wait between calls to `getbestblockheightandhash` when it:
29/// - Returns an error, or
30/// - Returns the block hash of a block that the read state already contains,
31///   (so that there's nothing for the syncer to do except wait for the next chain tip change).
32///
33/// See the [`TrustedChainSync::wait_for_chain_tip_change()`] method documentation for more information.
34const POLL_DELAY: Duration = Duration::from_millis(200);
35
36/// Syncs non-finalized blocks in the best chain from a trusted Zebra node's RPC methods.
37#[derive(Debug)]
38struct TrustedChainSync {
39    /// RPC client for calling Zebra's RPC methods.
40    rpc_client: RpcRequestClient,
41    /// The read state service.
42    db: ZebraDb,
43    /// The non-finalized state - currently only contains the best chain.
44    non_finalized_state: NonFinalizedState,
45    /// The chain tip sender for updating [`LatestChainTip`] and [`ChainTipChange`].
46    chain_tip_sender: ChainTipSender,
47    /// The non-finalized state sender, for updating the [`ReadStateService`] when the non-finalized best chain changes.
48    non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
49}
50
51impl TrustedChainSync {
52    /// Creates a new [`TrustedChainSync`] with a [`ChainTipSender`], then spawns a task to sync blocks
53    /// from the node's non-finalized best chain.
54    ///
55    /// Returns the [`LatestChainTip`], [`ChainTipChange`], and a [`JoinHandle`] for the sync task.
56    pub async fn spawn(
57        rpc_address: SocketAddr,
58        db: ZebraDb,
59        non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
60    ) -> (LatestChainTip, ChainTipChange, JoinHandle<()>) {
61        let rpc_client = RpcRequestClient::new(rpc_address);
62        let non_finalized_state = NonFinalizedState::new(&db.network());
63        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
64            ChainTipSender::new(None, &db.network());
65
66        let mut syncer = Self {
67            rpc_client,
68            db,
69            non_finalized_state,
70            chain_tip_sender,
71            non_finalized_state_sender,
72        };
73
74        let sync_task = tokio::spawn(async move {
75            syncer.sync().await;
76        });
77
78        (latest_chain_tip, chain_tip_change, sync_task)
79    }
80
81    /// Starts syncing blocks from the node's non-finalized best chain and checking for chain tip changes in the finalized state.
82    ///
83    /// When the best chain tip in Zebra is not available in the finalized state or the local non-finalized state,
84    /// gets any unavailable blocks in Zebra's best chain from the RPC server, adds them to the local non-finalized state, then
85    /// sends the updated chain tip block and non-finalized state to the [`ChainTipSender`] and non-finalized state sender.
86    async fn sync(&mut self) {
87        self.try_catch_up_with_primary().await;
88        let mut last_chain_tip_hash =
89            if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
90                let last_chain_tip_hash = finalized_tip_block.hash;
91                self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
92                last_chain_tip_hash
93            } else {
94                GENESIS_PREVIOUS_BLOCK_HASH
95            };
96
97        loop {
98            let (target_tip_height, target_tip_hash) =
99                self.wait_for_chain_tip_change(last_chain_tip_hash).await;
100
101            info!(
102                ?target_tip_height,
103                ?target_tip_hash,
104                "got a chain tip change"
105            );
106
107            if self.is_finalized_tip_change(target_tip_hash).await {
108                let block = self.finalized_chain_tip_block().await.expect(
109                    "should have genesis block after successful bestblockheightandhash response",
110                );
111
112                last_chain_tip_hash = block.hash;
113                self.chain_tip_sender.set_finalized_tip(block);
114                continue;
115            }
116
117            // If the new best chain tip is unavailable in the finalized state, start syncing non-finalized blocks from
118            // the non-finalized best chain tip height or finalized tip height.
119            let (next_block_height, mut current_tip_hash) =
120                self.next_block_height_and_prev_hash().await;
121
122            last_chain_tip_hash = current_tip_hash;
123
124            let rpc_client = self.rpc_client.clone();
125            let mut block_futs =
126                rpc_client.block_range_ordered(next_block_height..=target_tip_height);
127
128            let should_reset_non_finalized_state = loop {
129                let block = match block_futs.next().await {
130                    Some(Ok(Some(block)))
131                        if block.header.previous_block_hash == current_tip_hash =>
132                    {
133                        SemanticallyVerifiedBlock::from(block)
134                    }
135                    // Clear the non-finalized state and re-fetch every block past the finalized tip if:
136                    // - the next block's previous block hash doesn't match the expected hash,
137                    // - the next block is missing
138                    // - the target tip hash is missing from the blocks in `block_futs`
139                    // because there was likely a chain re-org/fork.
140                    Some(Ok(_)) | None => break true,
141                    // If calling the `getblock` RPC method fails with an unexpected error, wait for the next chain tip change
142                    // without resetting the non-finalized state.
143                    Some(Err(err)) => {
144                        tracing::warn!(
145                            ?err,
146                            "encountered an unexpected error while calling getblock method"
147                        );
148
149                        break false;
150                    }
151                };
152
153                // # Correctness
154                //
155                // Ensure that the secondary rocksdb instance has caught up to the primary instance
156                // before attempting to commit the new block to the non-finalized state. It is sufficient
157                // to call this once here, as a new chain tip block has already been retrieved and so
158                // we know that the primary rocksdb instance has already been updated.
159                self.try_catch_up_with_primary().await;
160
161                let block_hash = block.hash;
162                let commit_result = if self.non_finalized_state.chain_count() == 0 {
163                    self.non_finalized_state
164                        .commit_new_chain(block.clone(), &self.db)
165                } else {
166                    self.non_finalized_state
167                        .commit_block(block.clone(), &self.db)
168                };
169
170                // The previous block hash is checked above, if committing the block fails for some reason, try again.
171                if let Err(error) = commit_result {
172                    tracing::warn!(
173                        ?error,
174                        ?block_hash,
175                        "failed to commit block to non-finalized state"
176                    );
177
178                    break false;
179                }
180
181                // TODO: Check the finalized tip height and finalize blocks from the non-finalized state until
182                //       all non-finalized state chain root previous block hashes match the finalized tip hash.
183                while self
184                    .non_finalized_state
185                    .best_chain_len()
186                    .expect("just successfully inserted a non-finalized block above")
187                    > MAX_BLOCK_REORG_HEIGHT
188                {
189                    tracing::trace!("finalizing block past the reorg limit");
190                    self.non_finalized_state.finalize();
191                }
192
193                self.update_channels(block);
194                current_tip_hash = block_hash;
195                last_chain_tip_hash = current_tip_hash;
196
197                // If the block hash matches the output from the `getbestblockhash` RPC method, we can wait until
198                // the best block hash changes to get the next block.
199                if block_hash == target_tip_hash {
200                    break false;
201                }
202            };
203
204            if should_reset_non_finalized_state {
205                self.try_catch_up_with_primary().await;
206                let block = self.finalized_chain_tip_block().await.expect(
207                    "should have genesis block after successful bestblockheightandhash response",
208                );
209
210                last_chain_tip_hash = block.hash;
211                self.non_finalized_state =
212                    NonFinalizedState::new(&self.non_finalized_state.network);
213                self.update_channels(block);
214            }
215        }
216    }
217
218    /// Tries to catch up to the primary db instance for an up-to-date view of finalized blocks.
219    async fn try_catch_up_with_primary(&self) {
220        let db = self.db.clone();
221        tokio::task::spawn_blocking(move || {
222            if let Err(catch_up_error) = db.try_catch_up_with_primary() {
223                tracing::warn!(?catch_up_error, "failed to catch up to primary");
224            }
225        })
226        .wait_for_panics()
227        .await
228    }
229
230    /// If the non-finalized state is empty, tries to catch up to the primary db instance for
231    /// an up-to-date view of finalized blocks.
232    ///
233    /// Returns true if the non-finalized state is empty and the target hash is in the finalized state.
234    async fn is_finalized_tip_change(&self, target_tip_hash: block::Hash) -> bool {
235        self.non_finalized_state.chain_count() == 0 && {
236            let db = self.db.clone();
237            tokio::task::spawn_blocking(move || {
238                if let Err(catch_up_error) = db.try_catch_up_with_primary() {
239                    tracing::warn!(?catch_up_error, "failed to catch up to primary");
240                }
241                db.contains_hash(target_tip_hash)
242            })
243            .wait_for_panics()
244            .await
245        }
246    }
247
248    /// Returns the current tip hash and the next height immediately after the current tip height.
249    async fn next_block_height_and_prev_hash(&self) -> (block::Height, block::Hash) {
250        if let Some(tip) = self.non_finalized_state.best_tip() {
251            Some(tip)
252        } else {
253            let db = self.db.clone();
254            tokio::task::spawn_blocking(move || db.tip())
255                .wait_for_panics()
256                .await
257        }
258        .map(|(current_tip_height, current_tip_hash)| {
259            (
260                current_tip_height.next().expect("should be valid height"),
261                current_tip_hash,
262            )
263        })
264        .unwrap_or((Height::MIN, GENESIS_PREVIOUS_BLOCK_HASH))
265    }
266
267    /// Reads the finalized tip block from the secondary db instance and converts it to a [`ChainTipBlock`].
268    async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
269        let db = self.db.clone();
270        tokio::task::spawn_blocking(move || {
271            let (height, hash) = db.tip()?;
272            db.block(height.into())
273                .map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
274                .map(ChainTipBlock::from)
275        })
276        .wait_for_panics()
277        .await
278    }
279
280    /// Accepts a block hash.
281    ///
282    /// Polls `getbestblockheightandhash` RPC method until it successfully returns a block hash that is different from the last chain tip hash.
283    ///
284    /// Returns the node's best block hash.
285    async fn wait_for_chain_tip_change(
286        &self,
287        last_chain_tip_hash: block::Hash,
288    ) -> (block::Height, block::Hash) {
289        loop {
290            let Some(target_height_and_hash) = self
291                .rpc_client
292                .get_best_block_height_and_hash()
293                .await
294                .filter(|&(_height, hash)| hash != last_chain_tip_hash)
295            else {
296                // If `get_best_block_height_and_hash()` returns an error, or returns
297                // the current chain tip hash, wait [`POLL_DELAY`], then try again.
298                tokio::time::sleep(POLL_DELAY).await;
299                continue;
300            };
301
302            break target_height_and_hash;
303        }
304    }
305
306    /// Sends the new chain tip and non-finalized state to the latest chain channels.
307    fn update_channels(&mut self, best_tip: impl Into<ChainTipBlock>) {
308        // If the final receiver was just dropped, ignore the error.
309        let _ = self
310            .non_finalized_state_sender
311            .send(self.non_finalized_state.clone());
312        self.chain_tip_sender
313            .set_best_non_finalized_tip(Some(best_tip.into()));
314    }
315}
316
317/// Accepts a [zebra-state configuration](zebra_state::Config), a [`Network`], and
318/// the [`SocketAddr`] of a Zebra node's RPC server.
319///
320/// Initializes a [`ReadStateService`] and a [`TrustedChainSync`] to update the
321/// non-finalized best chain and the latest chain tip.
322///
323/// Returns a [`ReadStateService`], [`LatestChainTip`], [`ChainTipChange`], and
324/// a [`JoinHandle`] for the sync task.
325pub fn init_read_state_with_syncer(
326    config: zebra_state::Config,
327    network: &Network,
328    rpc_address: SocketAddr,
329) -> tokio::task::JoinHandle<
330    Result<
331        (
332            ReadStateService,
333            LatestChainTip,
334            ChainTipChange,
335            tokio::task::JoinHandle<()>,
336        ),
337        BoxError,
338    >,
339> {
340    let network = network.clone();
341    tokio::spawn(async move {
342        if config.ephemeral {
343            return Err("standalone read state service cannot be used with ephemeral state".into());
344        }
345
346        let (read_state, db, non_finalized_state_sender) =
347            spawn_init_read_only(config, &network).await?;
348        let (latest_chain_tip, chain_tip_change, sync_task) =
349            TrustedChainSync::spawn(rpc_address, db, non_finalized_state_sender).await;
350        Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
351    })
352}
353
354trait SyncerRpcMethods {
355    async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)>;
356    async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError>;
357    fn block_range_ordered(
358        &self,
359        height_range: RangeInclusive<Height>,
360    ) -> FuturesOrdered<impl std::future::Future<Output = Result<Option<Arc<Block>>, BoxError>>>
361    {
362        let &Height(start_height) = height_range.start();
363        let &Height(end_height) = height_range.end();
364        let mut futs = FuturesOrdered::new();
365
366        for height in start_height..=end_height {
367            futs.push_back(self.get_block(height));
368        }
369
370        futs
371    }
372}
373
374impl SyncerRpcMethods for RpcRequestClient {
375    async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
376        self.json_result_from_call("getbestblockheightandhash", "[]")
377            .await
378            .map(|GetBlockHeightAndHash { height, hash }| (height, hash))
379            .ok()
380    }
381
382    async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError> {
383        match self
384            .json_result_from_call("getblock", format!(r#"["{}", 0]"#, height))
385            .await
386        {
387            Ok(HexData(raw_block)) => {
388                let block = raw_block.zcash_deserialize_into::<Block>()?;
389                Ok(Some(Arc::new(block)))
390            }
391            Err(err)
392                if err
393                    .downcast_ref::<jsonrpsee_types::ErrorCode>()
394                    .is_some_and(|err| {
395                        let code: i32 = server::error::LegacyCode::InvalidParameter.into();
396                        err.code() == code
397                    }) =>
398            {
399                Ok(None)
400            }
401            Err(err) => Err(err),
402        }
403    }
404}