zebra_rpc/
sync.rs

1//! Syncer task for maintaining a non-finalized state in Zebra's ReadStateService and updating `ChainTipSender` via RPCs
2
3use std::{net::SocketAddr, sync::Arc, time::Duration};
4
5use tokio::task::JoinHandle;
6use tonic::{Status, Streaming};
7use tower::BoxError;
8use zebra_chain::{block::Height, parameters::Network};
9use zebra_state::{
10    spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
11    LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock,
12    ValidateContextError, ZebraDb,
13};
14
15use zebra_chain::diagnostic::task::WaitForPanics;
16
17use crate::indexer::{indexer_client::IndexerClient, BlockAndHash, Empty};
18
19/// How long to wait between calls to `subscribe_to_non_finalized_state_change` when it returns an error.
20const POLL_DELAY: Duration = Duration::from_secs(5);
21
22/// Syncs non-finalized blocks in the best chain from a trusted Zebra node's RPC methods.
23#[derive(Debug)]
24pub struct TrustedChainSync {
25    /// gRPC client for calling Zebra's indexer methods.
26    pub indexer_rpc_client: IndexerClient<tonic::transport::Channel>,
27    /// The read state service.
28    db: ZebraDb,
29    /// The non-finalized state - currently only contains the best chain.
30    non_finalized_state: NonFinalizedState,
31    /// The chain tip sender for updating [`LatestChainTip`] and [`ChainTipChange`].
32    chain_tip_sender: ChainTipSender,
33    /// The non-finalized state sender, for updating the [`ReadStateService`] when the non-finalized best chain changes.
34    non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
35}
36
37impl TrustedChainSync {
38    /// Creates a new [`TrustedChainSync`] with a [`ChainTipSender`], then spawns a task to sync blocks
39    /// from the node's non-finalized best chain.
40    ///
41    /// Returns the [`LatestChainTip`], [`ChainTipChange`], and a [`JoinHandle`] for the sync task.
42    pub async fn spawn(
43        indexer_rpc_address: SocketAddr,
44        db: ZebraDb,
45        non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
46    ) -> Result<(LatestChainTip, ChainTipChange, JoinHandle<()>), BoxError> {
47        let non_finalized_state = NonFinalizedState::new(&db.network());
48        let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
49            ChainTipSender::new(None, &db.network());
50        let mut indexer_rpc_client =
51            IndexerClient::connect(format!("http://{indexer_rpc_address}")).await?;
52        let mut finalized_chain_tip_sender = chain_tip_sender.finalized_sender();
53
54        let mut syncer = Self {
55            indexer_rpc_client: indexer_rpc_client.clone(),
56            db: db.clone(),
57            non_finalized_state,
58            chain_tip_sender,
59            non_finalized_state_sender,
60        };
61
62        // Spawn a task to send finalized chain tip changes to the chain tip change and latest chain tip channels.
63        tokio::spawn(async move {
64            let mut chain_tip_change_stream = None;
65
66            loop {
67                let Some(ref mut chain_tip_change) = chain_tip_change_stream else {
68                    chain_tip_change_stream = match indexer_rpc_client
69                        .chain_tip_change(Empty {})
70                        .await
71                        .map(|a| a.into_inner())
72                    {
73                        Ok(listener) => Some(listener),
74                        Err(err) => {
75                            tracing::warn!(
76                                ?err,
77                                "failed to subscribe to non-finalized state changes"
78                            );
79                            tokio::time::sleep(POLL_DELAY).await;
80                            None
81                        }
82                    };
83
84                    continue;
85                };
86
87                let message = match chain_tip_change.message().await {
88                    Ok(Some(block_hash_and_height)) => block_hash_and_height,
89                    Ok(None) => {
90                        tracing::warn!("chain_tip_change stream ended unexpectedly");
91                        chain_tip_change_stream = None;
92                        continue;
93                    }
94                    Err(err) => {
95                        tracing::warn!(?err, "error receiving chain tip change");
96                        chain_tip_change_stream = None;
97                        continue;
98                    }
99                };
100
101                let Some((hash, _height)) = message.try_into_hash_and_height() else {
102                    tracing::warn!("failed to convert message into a block hash and height");
103                    continue;
104                };
105
106                // Skip the chain tip change if catching up to the primary db instance fails.
107                if db.spawn_try_catch_up_with_primary().await.is_err() {
108                    continue;
109                }
110
111                // End the task and let the `TrustedChainSync::sync()` method send non-finalized chain tip updates if
112                // the latest chain tip hash is not present in the db.
113                let Some(tip_block) = db.block(hash.into()) else {
114                    return;
115                };
116
117                finalized_chain_tip_sender.set_finalized_tip(Some(
118                    SemanticallyVerifiedBlock::with_hash(tip_block, hash).into(),
119                ));
120            }
121        });
122
123        let sync_task = tokio::spawn(async move {
124            syncer.sync().await;
125        });
126
127        Ok((latest_chain_tip, chain_tip_change, sync_task))
128    }
129
130    /// Starts syncing blocks from the node's non-finalized best chain and checking for chain tip changes in the finalized state.
131    ///
132    /// When the best chain tip in Zebra is not available in the finalized state or the local non-finalized state,
133    /// gets any unavailable blocks in Zebra's best chain from the RPC server, adds them to the local non-finalized state, then
134    /// sends the updated chain tip block and non-finalized state to the [`ChainTipSender`] and non-finalized state sender.
135    #[tracing::instrument(skip_all)]
136    async fn sync(&mut self) {
137        let mut non_finalized_blocks_listener = None;
138        self.try_catch_up_with_primary().await;
139        if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
140            self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
141        }
142
143        loop {
144            let Some(ref mut non_finalized_state_change) = non_finalized_blocks_listener else {
145                non_finalized_blocks_listener = match self
146                    .subscribe_to_non_finalized_state_change()
147                    .await
148                {
149                    Ok(listener) => Some(listener),
150                    Err(err) => {
151                        tracing::warn!(?err, "failed to subscribe to non-finalized state changes");
152                        tokio::time::sleep(POLL_DELAY).await;
153                        None
154                    }
155                };
156
157                continue;
158            };
159
160            let message = match non_finalized_state_change.message().await {
161                Ok(Some(block_and_hash)) => block_and_hash,
162                Ok(None) => {
163                    tracing::warn!("non-finalized state change stream ended unexpectedly");
164                    non_finalized_blocks_listener = None;
165                    continue;
166                }
167                Err(err) => {
168                    tracing::warn!(?err, "error receiving non-finalized state change");
169                    non_finalized_blocks_listener = None;
170                    continue;
171                }
172            };
173
174            let Some((block, hash)) = message.decode() else {
175                tracing::warn!("received malformed non-finalized state change message");
176                non_finalized_blocks_listener = None;
177                continue;
178            };
179
180            if self.non_finalized_state.any_chain_contains(&hash) {
181                tracing::info!(?hash, "non-finalized state already contains block");
182                continue;
183            }
184
185            let block = SemanticallyVerifiedBlock::with_hash(Arc::new(block), hash);
186            match self.try_commit(block.clone()).await {
187                Ok(()) => {
188                    while self
189                        .non_finalized_state
190                        .root_height()
191                        .expect("just successfully inserted a non-finalized block above")
192                        <= self.db.finalized_tip_height().unwrap_or(Height::MIN)
193                    {
194                        tracing::trace!("finalizing block past the reorg limit");
195                        self.non_finalized_state.finalize();
196                    }
197
198                    self.update_channels();
199                }
200                Err(error) => {
201                    tracing::warn!(
202                        ?error,
203                        ?hash,
204                        "failed to commit block to non-finalized state"
205                    );
206
207                    // TODO: Investigate whether it would be correct to ignore some errors here instead of
208                    //       trying every block in the non-finalized state again.
209                    non_finalized_blocks_listener = None;
210                }
211            };
212        }
213    }
214
215    async fn try_commit(
216        &mut self,
217        block: SemanticallyVerifiedBlock,
218    ) -> Result<(), ValidateContextError> {
219        self.try_catch_up_with_primary().await;
220
221        if self.db.finalized_tip_hash() == block.block.header.previous_block_hash {
222            self.non_finalized_state.commit_new_chain(block, &self.db)
223        } else {
224            self.non_finalized_state.commit_block(block, &self.db)
225        }
226    }
227
228    /// Calls `non_finalized_state_change()` method on the indexer gRPC client to subscribe
229    /// to non-finalized state changes, and returns the response stream.
230    async fn subscribe_to_non_finalized_state_change(
231        &mut self,
232    ) -> Result<Streaming<BlockAndHash>, Status> {
233        self.indexer_rpc_client
234            .clone()
235            .non_finalized_state_change(Empty {})
236            .await
237            .map(|a| a.into_inner())
238    }
239
240    /// Tries to catch up to the primary db instance for an up-to-date view of finalized blocks.
241    async fn try_catch_up_with_primary(&self) {
242        let _ = self.db.spawn_try_catch_up_with_primary().await;
243    }
244
245    /// Reads the finalized tip block from the secondary db instance and converts it to a [`ChainTipBlock`].
246    async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
247        let db = self.db.clone();
248        tokio::task::spawn_blocking(move || {
249            let (height, hash) = db.tip()?;
250            db.block(height.into())
251                .map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
252                .map(ChainTipBlock::from)
253        })
254        .wait_for_panics()
255        .await
256    }
257
258    /// Sends the new chain tip and non-finalized state to the latest chain channels.
259    // TODO: Replace this with the `update_latest_chain_channels()` fn in `write.rs`.
260    fn update_channels(&mut self) {
261        // If the final receiver was just dropped, ignore the error.
262        let _ = self
263            .non_finalized_state_sender
264            .send(self.non_finalized_state.clone());
265
266        let best_chain = self.non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
267
268        let tip_block = best_chain
269            .tip_block()
270            .expect(
271                "unexpected empty chain: must commit at least one block before updating channels",
272            )
273            .clone();
274
275        self.chain_tip_sender
276            .set_best_non_finalized_tip(Some(tip_block.into()));
277    }
278}
279
280/// Accepts a [zebra-state configuration](zebra_state::Config), a [`Network`], and
281/// the [`SocketAddr`] of a Zebra node's RPC server.
282///
283/// Initializes a [`ReadStateService`] and a [`TrustedChainSync`] to update the
284/// non-finalized best chain and the latest chain tip.
285///
286/// Returns a [`ReadStateService`], [`LatestChainTip`], [`ChainTipChange`], and
287/// a [`JoinHandle`] for the sync task.
288pub fn init_read_state_with_syncer(
289    config: zebra_state::Config,
290    network: &Network,
291    indexer_rpc_address: SocketAddr,
292) -> tokio::task::JoinHandle<
293    Result<
294        (
295            ReadStateService,
296            LatestChainTip,
297            ChainTipChange,
298            tokio::task::JoinHandle<()>,
299        ),
300        BoxError,
301    >,
302> {
303    let network = network.clone();
304    tokio::spawn(async move {
305        if config.ephemeral {
306            return Err("standalone read state service cannot be used with ephemeral state".into());
307        }
308
309        let (read_state, db, non_finalized_state_sender) =
310            spawn_init_read_only(config, &network).await?;
311        let (latest_chain_tip, chain_tip_change, sync_task) =
312            TrustedChainSync::spawn(indexer_rpc_address, db, non_finalized_state_sender).await?;
313        Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
314    })
315}