zebra_rpc/
sync.rs
1use std::{net::SocketAddr, sync::Arc, time::Duration};
4
5use tokio::task::JoinHandle;
6use tonic::{Status, Streaming};
7use tower::BoxError;
8use zebra_chain::{block::Height, parameters::Network};
9use zebra_state::{
10 spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
11 LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock,
12 ValidateContextError, ZebraDb,
13};
14
15use zebra_chain::diagnostic::task::WaitForPanics;
16
17use crate::indexer::{indexer_client::IndexerClient, BlockAndHash, Empty};
18
/// How long to sleep before retrying after an attempt to subscribe to one of
/// the indexer RPC streams has failed.
const POLL_DELAY: Duration = Duration::from_secs(5);
21
/// State owned by the task that follows a node through its indexer RPC and
/// mirrors the blocks it receives into a local non-finalized state.
#[derive(Debug)]
pub struct TrustedChainSync {
    /// gRPC client for the indexer service; used to open the block streams.
    pub indexer_rpc_client: IndexerClient<tonic::transport::Channel>,
    /// Database handle that received blocks are committed against. It is
    /// refreshed with `spawn_try_catch_up_with_primary()` before being read.
    db: ZebraDb,
    /// Locally maintained non-finalized state that received blocks are committed to.
    non_finalized_state: NonFinalizedState,
    /// Sender used to publish the finalized tip and the best non-finalized tip.
    chain_tip_sender: ChainTipSender,
    /// Watch channel on which a clone of `non_finalized_state` is sent after
    /// every successful commit.
    non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
}
36
37impl TrustedChainSync {
38 pub async fn spawn(
43 indexer_rpc_address: SocketAddr,
44 db: ZebraDb,
45 non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
46 ) -> Result<(LatestChainTip, ChainTipChange, JoinHandle<()>), BoxError> {
47 let non_finalized_state = NonFinalizedState::new(&db.network());
48 let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
49 ChainTipSender::new(None, &db.network());
50 let mut indexer_rpc_client =
51 IndexerClient::connect(format!("http://{indexer_rpc_address}")).await?;
52 let mut finalized_chain_tip_sender = chain_tip_sender.finalized_sender();
53
54 let mut syncer = Self {
55 indexer_rpc_client: indexer_rpc_client.clone(),
56 db: db.clone(),
57 non_finalized_state,
58 chain_tip_sender,
59 non_finalized_state_sender,
60 };
61
62 tokio::spawn(async move {
64 let mut chain_tip_change_stream = None;
65
66 loop {
67 let Some(ref mut chain_tip_change) = chain_tip_change_stream else {
68 chain_tip_change_stream = match indexer_rpc_client
69 .chain_tip_change(Empty {})
70 .await
71 .map(|a| a.into_inner())
72 {
73 Ok(listener) => Some(listener),
74 Err(err) => {
75 tracing::warn!(
76 ?err,
77 "failed to subscribe to non-finalized state changes"
78 );
79 tokio::time::sleep(POLL_DELAY).await;
80 None
81 }
82 };
83
84 continue;
85 };
86
87 let message = match chain_tip_change.message().await {
88 Ok(Some(block_hash_and_height)) => block_hash_and_height,
89 Ok(None) => {
90 tracing::warn!("chain_tip_change stream ended unexpectedly");
91 chain_tip_change_stream = None;
92 continue;
93 }
94 Err(err) => {
95 tracing::warn!(?err, "error receiving chain tip change");
96 chain_tip_change_stream = None;
97 continue;
98 }
99 };
100
101 let Some((hash, _height)) = message.try_into_hash_and_height() else {
102 tracing::warn!("failed to convert message into a block hash and height");
103 continue;
104 };
105
106 if db.spawn_try_catch_up_with_primary().await.is_err() {
108 continue;
109 }
110
111 let Some(tip_block) = db.block(hash.into()) else {
114 return;
115 };
116
117 finalized_chain_tip_sender.set_finalized_tip(Some(
118 SemanticallyVerifiedBlock::with_hash(tip_block, hash).into(),
119 ));
120 }
121 });
122
123 let sync_task = tokio::spawn(async move {
124 syncer.sync().await;
125 });
126
127 Ok((latest_chain_tip, chain_tip_change, sync_task))
128 }
129
130 #[tracing::instrument(skip_all)]
136 async fn sync(&mut self) {
137 let mut non_finalized_blocks_listener = None;
138 self.try_catch_up_with_primary().await;
139 if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
140 self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
141 }
142
143 loop {
144 let Some(ref mut non_finalized_state_change) = non_finalized_blocks_listener else {
145 non_finalized_blocks_listener = match self
146 .subscribe_to_non_finalized_state_change()
147 .await
148 {
149 Ok(listener) => Some(listener),
150 Err(err) => {
151 tracing::warn!(?err, "failed to subscribe to non-finalized state changes");
152 tokio::time::sleep(POLL_DELAY).await;
153 None
154 }
155 };
156
157 continue;
158 };
159
160 let message = match non_finalized_state_change.message().await {
161 Ok(Some(block_and_hash)) => block_and_hash,
162 Ok(None) => {
163 tracing::warn!("non-finalized state change stream ended unexpectedly");
164 non_finalized_blocks_listener = None;
165 continue;
166 }
167 Err(err) => {
168 tracing::warn!(?err, "error receiving non-finalized state change");
169 non_finalized_blocks_listener = None;
170 continue;
171 }
172 };
173
174 let Some((block, hash)) = message.decode() else {
175 tracing::warn!("received malformed non-finalized state change message");
176 non_finalized_blocks_listener = None;
177 continue;
178 };
179
180 if self.non_finalized_state.any_chain_contains(&hash) {
181 tracing::info!(?hash, "non-finalized state already contains block");
182 continue;
183 }
184
185 let block = SemanticallyVerifiedBlock::with_hash(Arc::new(block), hash);
186 match self.try_commit(block.clone()).await {
187 Ok(()) => {
188 while self
189 .non_finalized_state
190 .root_height()
191 .expect("just successfully inserted a non-finalized block above")
192 <= self.db.finalized_tip_height().unwrap_or(Height::MIN)
193 {
194 tracing::trace!("finalizing block past the reorg limit");
195 self.non_finalized_state.finalize();
196 }
197
198 self.update_channels();
199 }
200 Err(error) => {
201 tracing::warn!(
202 ?error,
203 ?hash,
204 "failed to commit block to non-finalized state"
205 );
206
207 non_finalized_blocks_listener = None;
210 }
211 };
212 }
213 }
214
215 async fn try_commit(
216 &mut self,
217 block: SemanticallyVerifiedBlock,
218 ) -> Result<(), ValidateContextError> {
219 self.try_catch_up_with_primary().await;
220
221 if self.db.finalized_tip_hash() == block.block.header.previous_block_hash {
222 self.non_finalized_state.commit_new_chain(block, &self.db)
223 } else {
224 self.non_finalized_state.commit_block(block, &self.db)
225 }
226 }
227
228 async fn subscribe_to_non_finalized_state_change(
231 &mut self,
232 ) -> Result<Streaming<BlockAndHash>, Status> {
233 self.indexer_rpc_client
234 .clone()
235 .non_finalized_state_change(Empty {})
236 .await
237 .map(|a| a.into_inner())
238 }
239
240 async fn try_catch_up_with_primary(&self) {
242 let _ = self.db.spawn_try_catch_up_with_primary().await;
243 }
244
245 async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
247 let db = self.db.clone();
248 tokio::task::spawn_blocking(move || {
249 let (height, hash) = db.tip()?;
250 db.block(height.into())
251 .map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
252 .map(ChainTipBlock::from)
253 })
254 .wait_for_panics()
255 .await
256 }
257
258 fn update_channels(&mut self) {
261 let _ = self
263 .non_finalized_state_sender
264 .send(self.non_finalized_state.clone());
265
266 let best_chain = self.non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
267
268 let tip_block = best_chain
269 .tip_block()
270 .expect(
271 "unexpected empty chain: must commit at least one block before updating channels",
272 )
273 .clone();
274
275 self.chain_tip_sender
276 .set_best_non_finalized_tip(Some(tip_block.into()));
277 }
278}
279
280pub fn init_read_state_with_syncer(
289 config: zebra_state::Config,
290 network: &Network,
291 indexer_rpc_address: SocketAddr,
292) -> tokio::task::JoinHandle<
293 Result<
294 (
295 ReadStateService,
296 LatestChainTip,
297 ChainTipChange,
298 tokio::task::JoinHandle<()>,
299 ),
300 BoxError,
301 >,
302> {
303 let network = network.clone();
304 tokio::spawn(async move {
305 if config.ephemeral {
306 return Err("standalone read state service cannot be used with ephemeral state".into());
307 }
308
309 let (read_state, db, non_finalized_state_sender) =
310 spawn_init_read_only(config, &network).await?;
311 let (latest_chain_tip, chain_tip_change, sync_task) =
312 TrustedChainSync::spawn(indexer_rpc_address, db, non_finalized_state_sender).await?;
313 Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
314 })
315}