1use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{
7 mpsc::{UnboundedReceiver, UnboundedSender},
8 oneshot, watch,
9};
10
11use tracing::Span;
12use zebra_chain::{
13 block::{self, Height},
14 transparent::EXTRA_ZEBRA_COINBASE_DATA,
15};
16
17use crate::{
18 constants::MAX_BLOCK_REORG_HEIGHT,
19 service::{
20 check,
21 finalized_state::{FinalizedState, ZebraDb},
22 non_finalized_state::NonFinalizedState,
23 queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24 BoxError, ChainTipBlock, ChainTipSender, CloneError,
25 },
26 CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
27};
28
29#[allow(unused_imports)]
31use crate::service::{
32 chain_tip::{ChainTipChange, LatestChainTip},
33 non_finalized_state::Chain,
34};
35
/// The maximum number of entries kept in the parent error map built by
/// `WriteBlockWorkerTask::run`: twice [`MAX_BLOCK_REORG_HEIGHT`].
///
/// When the map grows past this limit, `run` removes its oldest entry.
const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41#[tracing::instrument(
44 level = "debug",
45 skip(finalized_state, non_finalized_state, prepared),
46 fields(
47 height = ?prepared.height,
48 hash = %prepared.hash,
49 chains = non_finalized_state.chain_count()
50 )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53 finalized_state: &ZebraDb,
54 non_finalized_state: &mut NonFinalizedState,
55 prepared: SemanticallyVerifiedBlock,
56) -> Result<(), CommitSemanticallyVerifiedError> {
57 check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58 let parent_hash = prepared.block.header.previous_block_hash;
59
60 if finalized_state.finalized_tip_hash() == parent_hash {
61 non_finalized_state.commit_new_chain(prepared, finalized_state)?;
62 } else {
63 non_finalized_state.commit_block(prepared, finalized_state)?;
64 }
65
66 Ok(())
67}
68
69#[instrument(
81 level = "debug",
82 skip(
83 non_finalized_state,
84 chain_tip_sender,
85 non_finalized_state_sender,
86 last_zebra_mined_log_height
87 ),
88 fields(chains = non_finalized_state.chain_count())
89)]
90fn update_latest_chain_channels(
91 non_finalized_state: &NonFinalizedState,
92 chain_tip_sender: &mut ChainTipSender,
93 non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
94 last_zebra_mined_log_height: &mut Option<Height>,
95) -> block::Height {
96 let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
97
98 let tip_block = best_chain
99 .tip_block()
100 .expect("unexpected empty chain: must commit at least one block before updating channels")
101 .clone();
102 let tip_block = ChainTipBlock::from(tip_block);
103
104 log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
105
106 let tip_block_height = tip_block.height;
107
108 let _ = non_finalized_state_sender.send(non_finalized_state.clone());
110
111 chain_tip_sender.set_best_non_finalized_tip(tip_block);
112
113 tip_block_height
114}
115
/// The state owned by the block write worker thread that `BlockWriteSender::spawn` starts.
struct WriteBlockWorkerTask {
    /// Receives checkpoint-verified blocks; drained by the first loop of `run`.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Receives non-finalized write messages; drained by the second loop of `run`.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that blocks are committed to.
    finalized_state: FinalizedState,
    /// The non-finalized state that semantically verified blocks are committed to.
    non_finalized_state: NonFinalizedState,
    /// Sends the finalized tip hash when committing a block to the finalized state fails.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Receives the new tip after each successful commit.
    chain_tip_sender: ChainTipSender,
    /// Receives a clone of the non-finalized state whenever the chain channels are updated.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
}
127
/// A message sent to the block write task to change the non-finalized state.
pub enum NonFinalizedWriteMessage {
    /// A queued semantically verified block to validate and commit, together with its
    /// response channel.
    Commit(QueuedSemanticallyVerified),
    /// A request to invalidate a block in the non-finalized state.
    Invalidate {
        /// The hash passed to `invalidate_block`.
        hash: block::Hash,
        /// Receives the result of `invalidate_block`.
        rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
    },
    /// A request to reconsider a block in the non-finalized state.
    Reconsider {
        /// The hash passed to `reconsider_block`.
        hash: block::Hash,
        /// Receives the result of `reconsider_block`, with its error boxed.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, BoxError>>,
    },
}
146
147impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
148 fn from(block: QueuedSemanticallyVerified) -> Self {
149 NonFinalizedWriteMessage::Commit(block)
150 }
151}
152
153#[derive(Clone, Debug)]
157pub(super) struct BlockWriteSender {
158 pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,
161
162 pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
168}
169
170impl BlockWriteSender {
171 #[instrument(
173 level = "debug",
174 skip_all,
175 fields(
176 network = %non_finalized_state.network
177 )
178 )]
179 pub fn spawn(
180 finalized_state: FinalizedState,
181 non_finalized_state: NonFinalizedState,
182 chain_tip_sender: ChainTipSender,
183 non_finalized_state_sender: watch::Sender<NonFinalizedState>,
184 ) -> (
185 Self,
186 tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
187 Option<Arc<std::thread::JoinHandle<()>>>,
188 ) {
189 let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
192 tokio::sync::mpsc::unbounded_channel();
193 let (finalized_block_write_sender, finalized_block_write_receiver) =
194 tokio::sync::mpsc::unbounded_channel();
195 let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
196 tokio::sync::mpsc::unbounded_channel();
197
198 let span = Span::current();
199 let task = std::thread::spawn(move || {
200 span.in_scope(|| {
201 WriteBlockWorkerTask {
202 finalized_block_write_receiver,
203 non_finalized_block_write_receiver,
204 finalized_state,
205 non_finalized_state,
206 invalid_block_reset_sender,
207 chain_tip_sender,
208 non_finalized_state_sender,
209 }
210 .run()
211 })
212 });
213
214 (
215 Self {
216 non_finalized: Some(non_finalized_block_write_sender),
217 finalized: Some(finalized_block_write_sender),
218 },
219 invalid_block_write_reset_receiver,
220 Some(Arc::new(task)),
221 )
222 }
223}
224
225impl WriteBlockWorkerTask {
226 #[instrument(
230 level = "debug",
231 skip(self),
232 fields(
233 network = %self.non_finalized_state.network
234 )
235 )]
236 pub fn run(mut self) {
237 let Self {
238 finalized_block_write_receiver,
239 non_finalized_block_write_receiver,
240 finalized_state,
241 non_finalized_state,
242 invalid_block_reset_sender,
243 chain_tip_sender,
244 non_finalized_state_sender,
245 } = &mut self;
246
247 let mut last_zebra_mined_log_height = None;
248 let mut prev_finalized_note_commitment_trees = None;
249
250 while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
253 if invalid_block_reset_sender.is_closed() {
256 info!("StateService closed the block reset channel. Is Zebra shutting down?");
257 return;
258 }
259
260 let next_valid_height = finalized_state
267 .db
268 .finalized_tip_height()
269 .map(|height| (height + 1).expect("committed heights are valid"))
270 .unwrap_or(Height(0));
271
272 if ordered_block.0.height != next_valid_height {
273 debug!(
274 ?next_valid_height,
275 invalid_height = ?ordered_block.0.height,
276 invalid_hash = ?ordered_block.0.hash,
277 "got a block that was the wrong height. \
278 Assuming a parent block failed, and dropping this block",
279 );
280
281 std::mem::drop(ordered_block);
283 continue;
284 }
285
286 match finalized_state
288 .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
289 {
290 Ok((finalized, note_commitment_trees)) => {
291 let tip_block = ChainTipBlock::from(finalized);
292 prev_finalized_note_commitment_trees = Some(note_commitment_trees);
293
294 log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
295
296 chain_tip_sender.set_finalized_tip(tip_block);
297 }
298 Err(error) => {
299 let finalized_tip = finalized_state.db.tip();
300
301 info!(
305 ?error,
306 last_valid_height = ?finalized_tip.map(|tip| tip.0),
307 last_valid_hash = ?finalized_tip.map(|tip| tip.1),
308 "committing a block to the finalized state failed, resetting state queue",
309 );
310
311 let send_result =
312 invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
313
314 if send_result.is_err() {
315 info!(
316 "StateService closed the block reset channel. Is Zebra shutting down?"
317 );
318 return;
319 }
320 }
321 }
322 }
323
324 if invalid_block_reset_sender.is_closed() {
327 info!("StateService closed the block reset channel. Is Zebra shutting down?");
328 return;
329 }
330
331 let mut parent_error_map: IndexMap<block::Hash, CloneError> = IndexMap::new();
333
334 while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
335 let queued_child_and_rsp_tx = match msg {
336 NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
337 NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
338 tracing::info!(?hash, "invalidating a block in the non-finalized state");
339 let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
340 None
341 }
342 NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
343 tracing::info!(?hash, "reconsidering a block in the non-finalized state");
344 let _ = rsp_tx.send(
345 non_finalized_state
346 .reconsider_block(hash, &finalized_state.db)
347 .map_err(BoxError::from),
348 );
349 None
350 }
351 };
352
353 let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
354 update_latest_chain_channels(
355 non_finalized_state,
356 chain_tip_sender,
357 non_finalized_state_sender,
358 &mut last_zebra_mined_log_height,
359 );
360 continue;
361 };
362
363 let child_hash = queued_child.hash;
364 let parent_hash = queued_child.block.header.previous_block_hash;
365 let parent_error = parent_error_map.get(&parent_hash);
366
367 let result;
368
369 if let Some(parent_error) = parent_error {
375 tracing::trace!(
376 ?child_hash,
377 ?parent_error,
378 "rejecting queued child due to parent error"
379 );
380 result = Err(parent_error.clone());
381 } else {
382 tracing::trace!(?child_hash, "validating queued child");
383 result = validate_and_commit_non_finalized(
384 &finalized_state.db,
385 non_finalized_state,
386 queued_child,
387 )
388 .map_err(CloneError::from);
389 }
390
391 if let Err(ref error) = result {
396 let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
398
399 parent_error_map.insert(child_hash, error.clone());
401
402 if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
404 parent_error_map.shift_remove_index(0);
406 }
407
408 continue;
410 }
411
412 let tip_block_height = update_latest_chain_channels(
419 non_finalized_state,
420 chain_tip_sender,
421 non_finalized_state_sender,
422 &mut last_zebra_mined_log_height,
423 );
424
425 let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
427
428 while non_finalized_state
429 .best_chain_len()
430 .expect("just successfully inserted a non-finalized block above")
431 > MAX_BLOCK_REORG_HEIGHT
432 {
433 tracing::trace!("finalizing block past the reorg limit");
434 let contextually_verified_with_trees = non_finalized_state.finalize();
435 prev_finalized_note_commitment_trees = finalized_state
436 .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
437 .expect(
438 "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
439 ).1.into();
440 }
441
442 metrics::counter!("state.full_verifier.committed.block.count").increment(1);
446 metrics::counter!("zcash.chain.verified.block.total").increment(1);
447
448 metrics::gauge!("state.full_verifier.committed.block.height")
449 .set(tip_block_height.0 as f64);
450
451 metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
455
456 tracing::trace!("finished processing queued block");
457 }
458
459 finalized_state.db.shutdown(true);
462 std::mem::drop(self.finalized_state);
463 }
464}
465
466fn log_if_mined_by_zebra(
471 tip_block: &ChainTipBlock,
472 last_zebra_mined_log_height: &mut Option<Height>,
473) {
474 const LOG_RATE_LIMIT: u32 = 1000;
476
477 let height = tip_block.height.0;
478
479 if let Some(last_height) = last_zebra_mined_log_height {
480 if height < last_height.0 + LOG_RATE_LIMIT {
481 return;
483 }
484 };
485
486 let coinbase_data = tip_block.transactions[0].inputs()[0]
488 .extra_coinbase_data()
489 .expect("valid blocks must start with a coinbase input")
490 .clone();
491
492 if coinbase_data
493 .as_ref()
494 .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
495 {
496 let text = String::from_utf8_lossy(coinbase_data.as_ref());
497
498 *last_zebra_mined_log_height = Some(Height(height));
499
500 if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
502 info!(
503 %text,
504 %height,
505 hash = %tip_block.hash,
506 "looks like this block was mined by Zebra!"
507 );
508 } else {
509 let text = text.replace(
515 |c: char| {
516 !EXTRA_ZEBRA_COINBASE_DATA
517 .to_ascii_lowercase()
518 .contains(c.to_ascii_lowercase())
519 },
520 "?",
521 );
522 let data = hex::encode(coinbase_data.as_ref());
523
524 info!(
525 %text,
526 %data,
527 %height,
528 hash = %tip_block.hash,
529 "looks like this block was mined by Zebra!"
530 );
531 }
532 }
533}