1use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration};
4
5use futures::{stream::FuturesOrdered, StreamExt};
6use tokio::{sync::Mutex, task::JoinHandle};
7use tower::BoxError;
8use tracing::info;
9use zebra_chain::{
10 block::{self, Block, Height},
11 parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
12 serialization::ZcashDeserializeInto,
13};
14use zebra_node_services::rpc_client::RpcRequestClient;
15use zebra_state::{
16 spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
17 LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
18 MAX_BLOCK_REORG_HEIGHT,
19};
20
21use zebra_chain::diagnostic::task::WaitForPanics;
22
23use crate::{
24 indexer::{self, indexer_client::IndexerClient, BlockHashAndHeight, Empty},
25 methods::{hex_data::HexData, GetBlockHeightAndHash},
26 server,
27};
28
/// Pause used between retries in this module: it is slept after a failed
/// best-block RPC call in `TrustedChainSync::sync`, and between attempts to read
/// the next chain tip change in `TrustedChainSync::wait_for_tip_change`.
const POLL_DELAY: Duration = Duration::from_millis(200);
31
/// State held by the background task that follows a node's best chain through
/// its RPC interfaces and republishes it through chain tip and non-finalized
/// state channels.
#[derive(Debug)]
pub struct TrustedChainSync {
    /// JSON-RPC client used to request the best block height/hash and blocks by height.
    rpc_client: RpcRequestClient,
    /// Indexer client (over a tonic channel) used to subscribe to chain tip changes.
    pub indexer_rpc_client: IndexerClient<tonic::transport::Channel>,
    /// The current chain tip change stream, or `None` until a subscription succeeds.
    chain_tip_change: Option<Mutex<tonic::Streaming<indexer::BlockHashAndHeight>>>,
    /// Database handle used to read the finalized tip and to catch up with the primary instance.
    db: ZebraDb,
    /// Local non-finalized state that fetched blocks are committed to.
    non_finalized_state: NonFinalizedState,
    /// Sender used to publish the finalized tip and the best non-finalized tip.
    chain_tip_sender: ChainTipSender,
    /// Watch channel sender that receives a clone of `non_finalized_state` after each update.
    non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
}
50
51impl TrustedChainSync {
52 pub async fn spawn(
57 rpc_address: SocketAddr,
58 indexer_rpc_address: SocketAddr,
59 db: ZebraDb,
60 non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
61 ) -> Result<(LatestChainTip, ChainTipChange, JoinHandle<()>), BoxError> {
62 let rpc_client = RpcRequestClient::new(rpc_address);
63 let indexer_rpc_client =
64 IndexerClient::connect(format!("http://{indexer_rpc_address}")).await?;
65
66 let non_finalized_state = NonFinalizedState::new(&db.network());
67 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
68 ChainTipSender::new(None, &db.network());
69
70 let mut syncer = Self {
71 rpc_client,
72 indexer_rpc_client,
73 chain_tip_change: None,
74 db,
75 non_finalized_state,
76 chain_tip_sender,
77 non_finalized_state_sender,
78 };
79
80 let sync_task = tokio::spawn(async move {
81 syncer.sync().await;
82 });
83
84 Ok((latest_chain_tip, chain_tip_change, sync_task))
85 }
86
87 async fn sync(&mut self) {
93 let mut should_reset_non_finalized_state = false;
94 self.try_catch_up_with_primary().await;
95 if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
96 self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
97 }
98
99 loop {
100 tracing::info!(
101 ?should_reset_non_finalized_state,
102 "waiting for a chain tip change"
103 );
104
105 let (target_tip_hash, target_tip_height) = if !should_reset_non_finalized_state {
106 self.wait_for_tip_change().await
107 } else {
108 match self.rpc_client.get_best_block_height_and_hash().await {
109 Ok((height, hash)) => {
110 info!(
111 ?height,
112 ?hash,
113 "got best height and hash from jsonrpc after resetting non-finalized state"
114 );
115
116 self.try_catch_up_with_primary().await;
117 let block: ChainTipBlock = self.finalized_chain_tip_block().await.expect(
118 "should have genesis block after successful bestblockheightandhash response",
119 );
120
121 self.non_finalized_state =
122 NonFinalizedState::new(&self.non_finalized_state.network);
123
124 self.update_channels(block);
125
126 should_reset_non_finalized_state = false;
127 (hash, height)
128 }
129 Err(error) => {
130 tracing::warn!(?error, "failed to get best block height and hash");
131 tokio::time::sleep(POLL_DELAY).await;
133 continue;
134 }
135 }
136 };
137
138 info!(
139 ?target_tip_height,
140 ?target_tip_hash,
141 "got a chain tip change"
142 );
143
144 if self.is_finalized_tip_change(target_tip_hash).await {
145 let block = self
146 .finalized_chain_tip_block()
147 .await
148 .expect("should have genesis block after a chain tip change");
149
150 self.chain_tip_sender.set_finalized_tip(block);
151 continue;
152 }
153
154 should_reset_non_finalized_state =
155 self.sync_once(target_tip_hash, target_tip_height).await;
156
157 info!(?should_reset_non_finalized_state, "finished sync_once");
158 }
159 }
160
161 async fn sync_once(&mut self, target_tip_hash: block::Hash, target_tip_height: Height) -> bool {
164 let rpc_client = self.rpc_client.clone();
165
166 let (next_block_height, mut current_tip_hash) =
169 self.next_block_height_and_prev_hash().await;
170
171 info!(
172 ?next_block_height,
173 ?current_tip_hash,
174 "syncing non-finalized blocks from the best chain"
175 );
176
177 let mut block_futs = rpc_client.block_range_ordered(next_block_height..=target_tip_height);
178
179 loop {
180 let block = match block_futs.next().await {
181 Some(Ok(Some(block))) if block.header.previous_block_hash == current_tip_hash => {
182 SemanticallyVerifiedBlock::from(block)
183 }
184 Some(Ok(_)) | None => {
190 info!("mismatch between block hash and prev hash of next expected block");
191
192 break true;
193 }
194 Some(Err(err)) => {
197 tracing::warn!(
198 ?err,
199 "encountered an unexpected error while calling getblock method"
200 );
201
202 break false;
203 }
204 };
205
206 self.try_catch_up_with_primary().await;
213
214 let block_hash = block.hash;
215 let commit_result = if self.non_finalized_state.chain_count() == 0 {
216 self.non_finalized_state
217 .commit_new_chain(block.clone(), &self.db)
218 } else {
219 self.non_finalized_state
220 .commit_block(block.clone(), &self.db)
221 };
222
223 if let Err(error) = commit_result {
225 tracing::warn!(
226 ?error,
227 ?block_hash,
228 "failed to commit block to non-finalized state"
229 );
230
231 break false;
232 }
233
234 while self
237 .non_finalized_state
238 .best_chain_len()
239 .expect("just successfully inserted a non-finalized block above")
240 > MAX_BLOCK_REORG_HEIGHT
241 {
242 tracing::trace!("finalizing block past the reorg limit");
243 self.non_finalized_state.finalize();
244 }
245
246 self.update_channels(block);
247 current_tip_hash = block_hash;
248
249 if block_hash == target_tip_hash {
252 break false;
253 }
254 }
255 }
256
257 async fn wait_for_tip_change(&mut self) -> (block::Hash, block::Height) {
258 loop {
259 self.subscribe_to_chain_tip_change(false).await;
260
261 if let Some(stream) = self.chain_tip_change.as_mut() {
262 if let Some(block_hash_and_height) = stream
263 .lock()
264 .await
265 .message()
266 .await
267 .ok()
268 .flatten()
269 .and_then(BlockHashAndHeight::try_into_hash_and_height)
270 {
271 return block_hash_and_height;
272 }
273 }
274
275 tokio::time::sleep(POLL_DELAY).await;
280 }
281 }
282
283 async fn subscribe_to_chain_tip_change(&mut self, should_replace: bool) {
287 if !should_replace && self.chain_tip_change.is_some() {
288 return;
289 }
290
291 self.chain_tip_change = self
292 .indexer_rpc_client
293 .clone()
294 .chain_tip_change(Empty {})
295 .await
296 .map(|a| Mutex::new(a.into_inner()))
297 .ok()
298 .or(self.chain_tip_change.take());
299 }
300
301 async fn try_catch_up_with_primary(&self) {
303 let db = self.db.clone();
304 tokio::task::spawn_blocking(move || {
305 if let Err(catch_up_error) = db.try_catch_up_with_primary() {
306 tracing::warn!(?catch_up_error, "failed to catch up to primary");
307 }
308 })
309 .wait_for_panics()
310 .await
311 }
312
313 async fn is_finalized_tip_change(&self, target_tip_hash: block::Hash) -> bool {
318 self.non_finalized_state.chain_count() == 0 && {
319 let db = self.db.clone();
320 tokio::task::spawn_blocking(move || {
321 if let Err(catch_up_error) = db.try_catch_up_with_primary() {
322 tracing::warn!(?catch_up_error, "failed to catch up to primary");
323 }
324 db.contains_hash(target_tip_hash)
325 })
326 .wait_for_panics()
327 .await
328 }
329 }
330
331 async fn next_block_height_and_prev_hash(&self) -> (block::Height, block::Hash) {
333 if let Some(tip) = self.non_finalized_state.best_tip() {
334 Some(tip)
335 } else {
336 let db = self.db.clone();
337 tokio::task::spawn_blocking(move || db.tip())
338 .wait_for_panics()
339 .await
340 }
341 .map(|(current_tip_height, current_tip_hash)| {
342 (
343 current_tip_height.next().expect("should be valid height"),
344 current_tip_hash,
345 )
346 })
347 .unwrap_or((Height::MIN, GENESIS_PREVIOUS_BLOCK_HASH))
348 }
349
350 async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
352 let db = self.db.clone();
353 tokio::task::spawn_blocking(move || {
354 let (height, hash) = db.tip()?;
355 db.block(height.into())
356 .map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
357 .map(ChainTipBlock::from)
358 })
359 .wait_for_panics()
360 .await
361 }
362
363 fn update_channels(&mut self, best_tip: impl Into<ChainTipBlock>) {
365 let _ = self
367 .non_finalized_state_sender
368 .send(self.non_finalized_state.clone());
369 self.chain_tip_sender
370 .set_best_non_finalized_tip(Some(best_tip.into()));
371 }
372}
373
374pub fn init_read_state_with_syncer(
383 config: zebra_state::Config,
384 network: &Network,
385 rpc_address: SocketAddr,
386 indexer_rpc_address: SocketAddr,
387) -> tokio::task::JoinHandle<
388 Result<
389 (
390 ReadStateService,
391 LatestChainTip,
392 ChainTipChange,
393 tokio::task::JoinHandle<()>,
394 ),
395 BoxError,
396 >,
397> {
398 let network = network.clone();
399 tokio::spawn(async move {
400 if config.ephemeral {
401 return Err("standalone read state service cannot be used with ephemeral state".into());
402 }
403
404 let (read_state, db, non_finalized_state_sender) =
405 spawn_init_read_only(config, &network).await?;
406 let (latest_chain_tip, chain_tip_change, sync_task) = TrustedChainSync::spawn(
407 rpc_address,
408 indexer_rpc_address,
409 db,
410 non_finalized_state_sender,
411 )
412 .await?;
413 Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
414 })
415}
416
417trait SyncerRpcMethods {
418 async fn get_best_block_height_and_hash(
419 &self,
420 ) -> Result<(block::Height, block::Hash), BoxError>;
421 async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError>;
422 fn block_range_ordered(
423 &self,
424 height_range: RangeInclusive<Height>,
425 ) -> FuturesOrdered<impl std::future::Future<Output = Result<Option<Arc<Block>>, BoxError>>>
426 {
427 let &Height(start_height) = height_range.start();
428 let &Height(end_height) = height_range.end();
429 let mut futs = FuturesOrdered::new();
430
431 for height in start_height..=end_height {
432 futs.push_back(self.get_block(height));
433 }
434
435 futs
436 }
437}
438
439impl SyncerRpcMethods for RpcRequestClient {
440 async fn get_best_block_height_and_hash(
441 &self,
442 ) -> Result<(block::Height, block::Hash), BoxError> {
443 self.json_result_from_call("getbestblockheightandhash", "[]")
444 .await
445 .map(|GetBlockHeightAndHash { height, hash }| (height, hash))
446 }
447
448 async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError> {
449 match self
450 .json_result_from_call("getblock", format!(r#"["{}", 0]"#, height))
451 .await
452 {
453 Ok(HexData(raw_block)) => {
454 let block = raw_block.zcash_deserialize_into::<Block>()?;
455 Ok(Some(Arc::new(block)))
456 }
457 Err(err)
458 if err
459 .downcast_ref::<jsonrpsee_types::ErrorCode>()
460 .is_some_and(|err| {
461 let code: i32 = server::error::LegacyCode::InvalidParameter.into();
462 err.code() == code
463 }) =>
464 {
465 Ok(None)
466 }
467 Err(err) => Err(err),
468 }
469 }
470}