1use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration};
4
5use futures::{stream::FuturesOrdered, StreamExt};
6use tokio::task::JoinHandle;
7use tower::BoxError;
8use tracing::info;
9use zebra_chain::{
10 block::{self, Block, Height},
11 parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
12 serialization::ZcashDeserializeInto,
13};
14use zebra_node_services::rpc_client::RpcRequestClient;
15use zebra_state::{
16 spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
17 LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
18 MAX_BLOCK_REORG_HEIGHT,
19};
20
21use zebra_chain::diagnostic::task::WaitForPanics;
22
23use crate::{
24 methods::{hex_data::HexData, GetBlockHeightAndHash},
25 server,
26};
27
/// Pause between consecutive polls of the trusted node's best chain tip.
///
/// `TrustedChainSync::wait_for_chain_tip_change` sleeps for this long whenever the
/// `getbestblockheightandhash` call yields no result, or reports the tip hash the syncer
/// already knows about.
const POLL_DELAY: Duration = Duration::from_millis(200);
35
36#[derive(Debug)]
38struct TrustedChainSync {
39 rpc_client: RpcRequestClient,
41 db: ZebraDb,
43 non_finalized_state: NonFinalizedState,
45 chain_tip_sender: ChainTipSender,
47 non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
49}
50
51impl TrustedChainSync {
52 pub async fn spawn(
57 rpc_address: SocketAddr,
58 db: ZebraDb,
59 non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
60 ) -> (LatestChainTip, ChainTipChange, JoinHandle<()>) {
61 let rpc_client = RpcRequestClient::new(rpc_address);
62 let non_finalized_state = NonFinalizedState::new(&db.network());
63 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
64 ChainTipSender::new(None, &db.network());
65
66 let mut syncer = Self {
67 rpc_client,
68 db,
69 non_finalized_state,
70 chain_tip_sender,
71 non_finalized_state_sender,
72 };
73
74 let sync_task = tokio::spawn(async move {
75 syncer.sync().await;
76 });
77
78 (latest_chain_tip, chain_tip_change, sync_task)
79 }
80
81 async fn sync(&mut self) {
87 self.try_catch_up_with_primary().await;
88 let mut last_chain_tip_hash =
89 if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
90 let last_chain_tip_hash = finalized_tip_block.hash;
91 self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
92 last_chain_tip_hash
93 } else {
94 GENESIS_PREVIOUS_BLOCK_HASH
95 };
96
97 loop {
98 let (target_tip_height, target_tip_hash) =
99 self.wait_for_chain_tip_change(last_chain_tip_hash).await;
100
101 info!(
102 ?target_tip_height,
103 ?target_tip_hash,
104 "got a chain tip change"
105 );
106
107 if self.is_finalized_tip_change(target_tip_hash).await {
108 let block = self.finalized_chain_tip_block().await.expect(
109 "should have genesis block after successful bestblockheightandhash response",
110 );
111
112 last_chain_tip_hash = block.hash;
113 self.chain_tip_sender.set_finalized_tip(block);
114 continue;
115 }
116
117 let (next_block_height, mut current_tip_hash) =
120 self.next_block_height_and_prev_hash().await;
121
122 last_chain_tip_hash = current_tip_hash;
123
124 let rpc_client = self.rpc_client.clone();
125 let mut block_futs =
126 rpc_client.block_range_ordered(next_block_height..=target_tip_height);
127
128 let should_reset_non_finalized_state = loop {
129 let block = match block_futs.next().await {
130 Some(Ok(Some(block)))
131 if block.header.previous_block_hash == current_tip_hash =>
132 {
133 SemanticallyVerifiedBlock::from(block)
134 }
135 Some(Ok(_)) | None => break true,
141 Some(Err(err)) => {
144 tracing::warn!(
145 ?err,
146 "encountered an unexpected error while calling getblock method"
147 );
148
149 break false;
150 }
151 };
152
153 self.try_catch_up_with_primary().await;
160
161 let block_hash = block.hash;
162 let commit_result = if self.non_finalized_state.chain_count() == 0 {
163 self.non_finalized_state
164 .commit_new_chain(block.clone(), &self.db)
165 } else {
166 self.non_finalized_state
167 .commit_block(block.clone(), &self.db)
168 };
169
170 if let Err(error) = commit_result {
172 tracing::warn!(
173 ?error,
174 ?block_hash,
175 "failed to commit block to non-finalized state"
176 );
177
178 break false;
179 }
180
181 while self
184 .non_finalized_state
185 .best_chain_len()
186 .expect("just successfully inserted a non-finalized block above")
187 > MAX_BLOCK_REORG_HEIGHT
188 {
189 tracing::trace!("finalizing block past the reorg limit");
190 self.non_finalized_state.finalize();
191 }
192
193 self.update_channels(block);
194 current_tip_hash = block_hash;
195 last_chain_tip_hash = current_tip_hash;
196
197 if block_hash == target_tip_hash {
200 break false;
201 }
202 };
203
204 if should_reset_non_finalized_state {
205 self.try_catch_up_with_primary().await;
206 let block = self.finalized_chain_tip_block().await.expect(
207 "should have genesis block after successful bestblockheightandhash response",
208 );
209
210 last_chain_tip_hash = block.hash;
211 self.non_finalized_state =
212 NonFinalizedState::new(&self.non_finalized_state.network);
213 self.update_channels(block);
214 }
215 }
216 }
217
218 async fn try_catch_up_with_primary(&self) {
220 let db = self.db.clone();
221 tokio::task::spawn_blocking(move || {
222 if let Err(catch_up_error) = db.try_catch_up_with_primary() {
223 tracing::warn!(?catch_up_error, "failed to catch up to primary");
224 }
225 })
226 .wait_for_panics()
227 .await
228 }
229
230 async fn is_finalized_tip_change(&self, target_tip_hash: block::Hash) -> bool {
235 self.non_finalized_state.chain_count() == 0 && {
236 let db = self.db.clone();
237 tokio::task::spawn_blocking(move || {
238 if let Err(catch_up_error) = db.try_catch_up_with_primary() {
239 tracing::warn!(?catch_up_error, "failed to catch up to primary");
240 }
241 db.contains_hash(target_tip_hash)
242 })
243 .wait_for_panics()
244 .await
245 }
246 }
247
248 async fn next_block_height_and_prev_hash(&self) -> (block::Height, block::Hash) {
250 if let Some(tip) = self.non_finalized_state.best_tip() {
251 Some(tip)
252 } else {
253 let db = self.db.clone();
254 tokio::task::spawn_blocking(move || db.tip())
255 .wait_for_panics()
256 .await
257 }
258 .map(|(current_tip_height, current_tip_hash)| {
259 (
260 current_tip_height.next().expect("should be valid height"),
261 current_tip_hash,
262 )
263 })
264 .unwrap_or((Height::MIN, GENESIS_PREVIOUS_BLOCK_HASH))
265 }
266
267 async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
269 let db = self.db.clone();
270 tokio::task::spawn_blocking(move || {
271 let (height, hash) = db.tip()?;
272 db.block(height.into())
273 .map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
274 .map(ChainTipBlock::from)
275 })
276 .wait_for_panics()
277 .await
278 }
279
280 async fn wait_for_chain_tip_change(
286 &self,
287 last_chain_tip_hash: block::Hash,
288 ) -> (block::Height, block::Hash) {
289 loop {
290 let Some(target_height_and_hash) = self
291 .rpc_client
292 .get_best_block_height_and_hash()
293 .await
294 .filter(|&(_height, hash)| hash != last_chain_tip_hash)
295 else {
296 tokio::time::sleep(POLL_DELAY).await;
299 continue;
300 };
301
302 break target_height_and_hash;
303 }
304 }
305
306 fn update_channels(&mut self, best_tip: impl Into<ChainTipBlock>) {
308 let _ = self
310 .non_finalized_state_sender
311 .send(self.non_finalized_state.clone());
312 self.chain_tip_sender
313 .set_best_non_finalized_tip(Some(best_tip.into()));
314 }
315}
316
317pub fn init_read_state_with_syncer(
326 config: zebra_state::Config,
327 network: &Network,
328 rpc_address: SocketAddr,
329) -> tokio::task::JoinHandle<
330 Result<
331 (
332 ReadStateService,
333 LatestChainTip,
334 ChainTipChange,
335 tokio::task::JoinHandle<()>,
336 ),
337 BoxError,
338 >,
339> {
340 let network = network.clone();
341 tokio::spawn(async move {
342 if config.ephemeral {
343 return Err("standalone read state service cannot be used with ephemeral state".into());
344 }
345
346 let (read_state, db, non_finalized_state_sender) =
347 spawn_init_read_only(config, &network).await?;
348 let (latest_chain_tip, chain_tip_change, sync_task) =
349 TrustedChainSync::spawn(rpc_address, db, non_finalized_state_sender).await;
350 Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
351 })
352}
353
354trait SyncerRpcMethods {
355 async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)>;
356 async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError>;
357 fn block_range_ordered(
358 &self,
359 height_range: RangeInclusive<Height>,
360 ) -> FuturesOrdered<impl std::future::Future<Output = Result<Option<Arc<Block>>, BoxError>>>
361 {
362 let &Height(start_height) = height_range.start();
363 let &Height(end_height) = height_range.end();
364 let mut futs = FuturesOrdered::new();
365
366 for height in start_height..=end_height {
367 futs.push_back(self.get_block(height));
368 }
369
370 futs
371 }
372}
373
374impl SyncerRpcMethods for RpcRequestClient {
375 async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
376 self.json_result_from_call("getbestblockheightandhash", "[]")
377 .await
378 .map(|GetBlockHeightAndHash { height, hash }| (height, hash))
379 .ok()
380 }
381
382 async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError> {
383 match self
384 .json_result_from_call("getblock", format!(r#"["{}", 0]"#, height))
385 .await
386 {
387 Ok(HexData(raw_block)) => {
388 let block = raw_block.zcash_deserialize_into::<Block>()?;
389 Ok(Some(Arc::new(block)))
390 }
391 Err(err)
392 if err
393 .downcast_ref::<jsonrpsee_types::ErrorCode>()
394 .is_some_and(|err| {
395 let code: i32 = server::error::LegacyCode::InvalidParameter.into();
396 err.code() == code
397 }) =>
398 {
399 Ok(None)
400 }
401 Err(err) => Err(err),
402 }
403 }
404}