zebra_rpc/
sync.rs

1//! Syncer task for maintaining a non-finalized state in Zebra's ReadStateService and updating `ChainTipSender` via RPCs
2
3use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration};
4
5use futures::{stream::FuturesOrdered, StreamExt};
6use tokio::{sync::Mutex, task::JoinHandle};
7use tower::BoxError;
8use tracing::info;
9use zebra_chain::{
10    block::{self, Block, Height},
11    parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
12    serialization::ZcashDeserializeInto,
13};
14use zebra_node_services::rpc_client::RpcRequestClient;
15use zebra_state::{
16    spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
17    LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
18    MAX_BLOCK_REORG_HEIGHT,
19};
20
21use zebra_chain::diagnostic::task::WaitForPanics;
22
23use crate::{
24    indexer::{self, indexer_client::IndexerClient, BlockHashAndHeight, Empty},
25    methods::{hex_data::HexData, GetBlockHeightAndHash},
26    server,
27};
28
/// How long to wait before retrying after a call to `getbestblockheightandhash` returns an error,
/// or after failing to get a chain tip change from the indexer's `chain_tip_change` stream.
const POLL_DELAY: Duration = Duration::from_millis(200);
31
32/// Syncs non-finalized blocks in the best chain from a trusted Zebra node's RPC methods.
33#[derive(Debug)]
34pub struct TrustedChainSync {
35    /// RPC client for calling Zebra's RPC methods.
36    rpc_client: RpcRequestClient,
37    /// gRPC client for calling Zebra's indexer methods.
38    pub indexer_rpc_client: IndexerClient<tonic::transport::Channel>,
39    /// A stream of best chain tip changes from the indexer RPCs `chain_tip_change` method.
40    chain_tip_change: Option<Mutex<tonic::Streaming<indexer::BlockHashAndHeight>>>,
41    /// The read state service.
42    db: ZebraDb,
43    /// The non-finalized state - currently only contains the best chain.
44    non_finalized_state: NonFinalizedState,
45    /// The chain tip sender for updating [`LatestChainTip`] and [`ChainTipChange`].
46    chain_tip_sender: ChainTipSender,
47    /// The non-finalized state sender, for updating the [`ReadStateService`] when the non-finalized best chain changes.
48    non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
49}
50
51impl TrustedChainSync {
52    /// Creates a new [`TrustedChainSync`] with a [`ChainTipSender`], then spawns a task to sync blocks
53    /// from the node's non-finalized best chain.
54    ///
55    /// Returns the [`LatestChainTip`], [`ChainTipChange`], and a [`JoinHandle`] for the sync task.
56    pub async fn spawn(
57        rpc_address: SocketAddr,
58        indexer_rpc_address: SocketAddr,
59        db: ZebraDb,
60        non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
61    ) -> Result<(LatestChainTip, ChainTipChange, JoinHandle<()>), BoxError> {
62        let rpc_client = RpcRequestClient::new(rpc_address);
63        let indexer_rpc_client =
64            IndexerClient::connect(format!("http://{indexer_rpc_address}")).await?;
65
66        let non_finalized_state = NonFinalizedState::new(&db.network());
67        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
68            ChainTipSender::new(None, &db.network());
69
70        let mut syncer = Self {
71            rpc_client,
72            indexer_rpc_client,
73            chain_tip_change: None,
74            db,
75            non_finalized_state,
76            chain_tip_sender,
77            non_finalized_state_sender,
78        };
79
80        let sync_task = tokio::spawn(async move {
81            syncer.sync().await;
82        });
83
84        Ok((latest_chain_tip, chain_tip_change, sync_task))
85    }
86
87    /// Starts syncing blocks from the node's non-finalized best chain and checking for chain tip changes in the finalized state.
88    ///
89    /// When the best chain tip in Zebra is not available in the finalized state or the local non-finalized state,
90    /// gets any unavailable blocks in Zebra's best chain from the RPC server, adds them to the local non-finalized state, then
91    /// sends the updated chain tip block and non-finalized state to the [`ChainTipSender`] and non-finalized state sender.
92    async fn sync(&mut self) {
93        let mut should_reset_non_finalized_state = false;
94        self.try_catch_up_with_primary().await;
95        if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
96            self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
97        }
98
99        loop {
100            tracing::info!(
101                ?should_reset_non_finalized_state,
102                "waiting for a chain tip change"
103            );
104
105            let (target_tip_hash, target_tip_height) = if !should_reset_non_finalized_state {
106                self.wait_for_tip_change().await
107            } else {
108                match self.rpc_client.get_best_block_height_and_hash().await {
109                    Ok((height, hash)) => {
110                        info!(
111                            ?height,
112                            ?hash,
113                            "got best height and hash from jsonrpc after resetting non-finalized state"
114                        );
115
116                        self.try_catch_up_with_primary().await;
117                        let block: ChainTipBlock = self.finalized_chain_tip_block().await.expect(
118                            "should have genesis block after successful bestblockheightandhash response",
119                        );
120
121                        self.non_finalized_state =
122                            NonFinalizedState::new(&self.non_finalized_state.network);
123
124                        self.update_channels(block);
125
126                        should_reset_non_finalized_state = false;
127                        (hash, height)
128                    }
129                    Err(error) => {
130                        tracing::warn!(?error, "failed to get best block height and hash");
131                        // If the RPC server is unavailable, wait for the next chain tip change.
132                        tokio::time::sleep(POLL_DELAY).await;
133                        continue;
134                    }
135                }
136            };
137
138            info!(
139                ?target_tip_height,
140                ?target_tip_hash,
141                "got a chain tip change"
142            );
143
144            if self.is_finalized_tip_change(target_tip_hash).await {
145                let block = self
146                    .finalized_chain_tip_block()
147                    .await
148                    .expect("should have genesis block after a chain tip change");
149
150                self.chain_tip_sender.set_finalized_tip(block);
151                continue;
152            }
153
154            should_reset_non_finalized_state =
155                self.sync_once(target_tip_hash, target_tip_height).await;
156
157            info!(?should_reset_non_finalized_state, "finished sync_once");
158        }
159    }
160
161    /// Returns a bool indicating whether there was an unexpected block hash at some height indicating that
162    /// there was a chain reorg in the connected zebrad instance.
163    async fn sync_once(&mut self, target_tip_hash: block::Hash, target_tip_height: Height) -> bool {
164        let rpc_client = self.rpc_client.clone();
165
166        // If the new best chain tip is unavailable in the finalized state, start syncing non-finalized blocks from
167        // the non-finalized best chain tip height or finalized tip height.
168        let (next_block_height, mut current_tip_hash) =
169            self.next_block_height_and_prev_hash().await;
170
171        info!(
172            ?next_block_height,
173            ?current_tip_hash,
174            "syncing non-finalized blocks from the best chain"
175        );
176
177        let mut block_futs = rpc_client.block_range_ordered(next_block_height..=target_tip_height);
178
179        loop {
180            let block = match block_futs.next().await {
181                Some(Ok(Some(block))) if block.header.previous_block_hash == current_tip_hash => {
182                    SemanticallyVerifiedBlock::from(block)
183                }
184                // Clear the non-finalized state and re-fetch every block past the finalized tip if:
185                // - the next block's previous block hash doesn't match the expected hash,
186                // - the next block is missing
187                // - the target tip hash is missing from the blocks in `block_futs`
188                // because there was likely a chain re-org/fork.
189                Some(Ok(_)) | None => {
190                    info!("mismatch between block hash and prev hash of next expected block");
191
192                    break true;
193                }
194                // If calling the `getblock` RPC method fails with an unexpected error, wait for the next chain tip change
195                // without resetting the non-finalized state.
196                Some(Err(err)) => {
197                    tracing::warn!(
198                        ?err,
199                        "encountered an unexpected error while calling getblock method"
200                    );
201
202                    break false;
203                }
204            };
205
206            // # Correctness
207            //
208            // Ensure that the secondary rocksdb instance has caught up to the primary instance
209            // before attempting to commit the new block to the non-finalized state. It is sufficient
210            // to call this once here, as a new chain tip block has already been retrieved and so
211            // we know that the primary rocksdb instance has already been updated.
212            self.try_catch_up_with_primary().await;
213
214            let block_hash = block.hash;
215            let commit_result = if self.non_finalized_state.chain_count() == 0 {
216                self.non_finalized_state
217                    .commit_new_chain(block.clone(), &self.db)
218            } else {
219                self.non_finalized_state
220                    .commit_block(block.clone(), &self.db)
221            };
222
223            // The previous block hash is checked above, if committing the block fails for some reason, try again.
224            if let Err(error) = commit_result {
225                tracing::warn!(
226                    ?error,
227                    ?block_hash,
228                    "failed to commit block to non-finalized state"
229                );
230
231                break false;
232            }
233
234            // TODO: Check the finalized tip height and finalize blocks from the non-finalized state until
235            //       all non-finalized state chain root previous block hashes match the finalized tip hash.
236            while self
237                .non_finalized_state
238                .best_chain_len()
239                .expect("just successfully inserted a non-finalized block above")
240                > MAX_BLOCK_REORG_HEIGHT
241            {
242                tracing::trace!("finalizing block past the reorg limit");
243                self.non_finalized_state.finalize();
244            }
245
246            self.update_channels(block);
247            current_tip_hash = block_hash;
248
249            // If the block hash matches the output from the `getbestblockhash` RPC method, we can wait until
250            // the best block hash changes to get the next block.
251            if block_hash == target_tip_hash {
252                break false;
253            }
254        }
255    }
256
257    async fn wait_for_tip_change(&mut self) -> (block::Hash, block::Height) {
258        loop {
259            self.subscribe_to_chain_tip_change(false).await;
260
261            if let Some(stream) = self.chain_tip_change.as_mut() {
262                if let Some(block_hash_and_height) = stream
263                    .lock()
264                    .await
265                    .message()
266                    .await
267                    .ok()
268                    .flatten()
269                    .and_then(BlockHashAndHeight::try_into_hash_and_height)
270                {
271                    return block_hash_and_height;
272                }
273            }
274
275            // wait [`POLL_DELAY`], then try again, if:
276            // - calling `chain_tip_change()` fails,
277            // - it closes the stream, or
278            // - it returns an error.
279            tokio::time::sleep(POLL_DELAY).await;
280        }
281    }
282
283    /// If `should_replace` is true or if it is false and there is no existing chain tip change stream,
284    /// calls `chain_tip_change()` on the indexer RPC client to subscribe to chain tip changes and sets
285    /// the `chain_tip_change` field as the new stream if it succeeds.
286    async fn subscribe_to_chain_tip_change(&mut self, should_replace: bool) {
287        if !should_replace && self.chain_tip_change.is_some() {
288            return;
289        }
290
291        self.chain_tip_change = self
292            .indexer_rpc_client
293            .clone()
294            .chain_tip_change(Empty {})
295            .await
296            .map(|a| Mutex::new(a.into_inner()))
297            .ok()
298            .or(self.chain_tip_change.take());
299    }
300
301    /// Tries to catch up to the primary db instance for an up-to-date view of finalized blocks.
302    async fn try_catch_up_with_primary(&self) {
303        let db = self.db.clone();
304        tokio::task::spawn_blocking(move || {
305            if let Err(catch_up_error) = db.try_catch_up_with_primary() {
306                tracing::warn!(?catch_up_error, "failed to catch up to primary");
307            }
308        })
309        .wait_for_panics()
310        .await
311    }
312
313    /// If the non-finalized state is empty, tries to catch up to the primary db instance for
314    /// an up-to-date view of finalized blocks.
315    ///
316    /// Returns true if the non-finalized state is empty and the target hash is in the finalized state.
317    async fn is_finalized_tip_change(&self, target_tip_hash: block::Hash) -> bool {
318        self.non_finalized_state.chain_count() == 0 && {
319            let db = self.db.clone();
320            tokio::task::spawn_blocking(move || {
321                if let Err(catch_up_error) = db.try_catch_up_with_primary() {
322                    tracing::warn!(?catch_up_error, "failed to catch up to primary");
323                }
324                db.contains_hash(target_tip_hash)
325            })
326            .wait_for_panics()
327            .await
328        }
329    }
330
331    /// Returns the current tip hash and the next height immediately after the current tip height.
332    async fn next_block_height_and_prev_hash(&self) -> (block::Height, block::Hash) {
333        if let Some(tip) = self.non_finalized_state.best_tip() {
334            Some(tip)
335        } else {
336            let db = self.db.clone();
337            tokio::task::spawn_blocking(move || db.tip())
338                .wait_for_panics()
339                .await
340        }
341        .map(|(current_tip_height, current_tip_hash)| {
342            (
343                current_tip_height.next().expect("should be valid height"),
344                current_tip_hash,
345            )
346        })
347        .unwrap_or((Height::MIN, GENESIS_PREVIOUS_BLOCK_HASH))
348    }
349
350    /// Reads the finalized tip block from the secondary db instance and converts it to a [`ChainTipBlock`].
351    async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
352        let db = self.db.clone();
353        tokio::task::spawn_blocking(move || {
354            let (height, hash) = db.tip()?;
355            db.block(height.into())
356                .map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
357                .map(ChainTipBlock::from)
358        })
359        .wait_for_panics()
360        .await
361    }
362
363    /// Sends the new chain tip and non-finalized state to the latest chain channels.
364    fn update_channels(&mut self, best_tip: impl Into<ChainTipBlock>) {
365        // If the final receiver was just dropped, ignore the error.
366        let _ = self
367            .non_finalized_state_sender
368            .send(self.non_finalized_state.clone());
369        self.chain_tip_sender
370            .set_best_non_finalized_tip(Some(best_tip.into()));
371    }
372}
373
374/// Accepts a [zebra-state configuration](zebra_state::Config), a [`Network`], and
375/// the [`SocketAddr`] of a Zebra node's RPC server.
376///
377/// Initializes a [`ReadStateService`] and a [`TrustedChainSync`] to update the
378/// non-finalized best chain and the latest chain tip.
379///
380/// Returns a [`ReadStateService`], [`LatestChainTip`], [`ChainTipChange`], and
381/// a [`JoinHandle`] for the sync task.
382pub fn init_read_state_with_syncer(
383    config: zebra_state::Config,
384    network: &Network,
385    rpc_address: SocketAddr,
386    indexer_rpc_address: SocketAddr,
387) -> tokio::task::JoinHandle<
388    Result<
389        (
390            ReadStateService,
391            LatestChainTip,
392            ChainTipChange,
393            tokio::task::JoinHandle<()>,
394        ),
395        BoxError,
396    >,
397> {
398    let network = network.clone();
399    tokio::spawn(async move {
400        if config.ephemeral {
401            return Err("standalone read state service cannot be used with ephemeral state".into());
402        }
403
404        let (read_state, db, non_finalized_state_sender) =
405            spawn_init_read_only(config, &network).await?;
406        let (latest_chain_tip, chain_tip_change, sync_task) = TrustedChainSync::spawn(
407            rpc_address,
408            indexer_rpc_address,
409            db,
410            non_finalized_state_sender,
411        )
412        .await?;
413        Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
414    })
415}
416
417trait SyncerRpcMethods {
418    async fn get_best_block_height_and_hash(
419        &self,
420    ) -> Result<(block::Height, block::Hash), BoxError>;
421    async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError>;
422    fn block_range_ordered(
423        &self,
424        height_range: RangeInclusive<Height>,
425    ) -> FuturesOrdered<impl std::future::Future<Output = Result<Option<Arc<Block>>, BoxError>>>
426    {
427        let &Height(start_height) = height_range.start();
428        let &Height(end_height) = height_range.end();
429        let mut futs = FuturesOrdered::new();
430
431        for height in start_height..=end_height {
432            futs.push_back(self.get_block(height));
433        }
434
435        futs
436    }
437}
438
439impl SyncerRpcMethods for RpcRequestClient {
440    async fn get_best_block_height_and_hash(
441        &self,
442    ) -> Result<(block::Height, block::Hash), BoxError> {
443        self.json_result_from_call("getbestblockheightandhash", "[]")
444            .await
445            .map(|GetBlockHeightAndHash { height, hash }| (height, hash))
446    }
447
448    async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError> {
449        match self
450            .json_result_from_call("getblock", format!(r#"["{}", 0]"#, height))
451            .await
452        {
453            Ok(HexData(raw_block)) => {
454                let block = raw_block.zcash_deserialize_into::<Block>()?;
455                Ok(Some(Arc::new(block)))
456            }
457            Err(err)
458                if err
459                    .downcast_ref::<jsonrpsee_types::ErrorCode>()
460                    .is_some_and(|err| {
461                        let code: i32 = server::error::LegacyCode::InvalidParameter.into();
462                        err.code() == code
463                    }) =>
464            {
465                Ok(None)
466            }
467            Err(err) => Err(err),
468        }
469    }
470}