1use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{
7 mpsc::{UnboundedReceiver, UnboundedSender},
8 oneshot, watch,
9};
10
11use tracing::Span;
12use zebra_chain::{
13 block::{self, Height},
14 transparent::EXTRA_ZEBRA_COINBASE_DATA,
15};
16
17use crate::{
18 constants::MAX_BLOCK_REORG_HEIGHT,
19 service::{
20 check,
21 finalized_state::{FinalizedState, ZebraDb},
22 non_finalized_state::NonFinalizedState,
23 queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24 BoxError, ChainTipBlock, ChainTipSender,
25 },
26 CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
27};
28
29#[allow(unused_imports)]
31use crate::service::{
32 chain_tip::{ChainTipChange, LatestChainTip},
33 non_finalized_state::Chain,
34};
35
/// The maximum number of entries kept in the parent error map used by
/// [`WriteBlockWorkerTask::run`] to reject the children of rejected blocks.
///
/// Set to twice [`MAX_BLOCK_REORG_HEIGHT`]. When the map grows past this limit,
/// its oldest entry is removed.
const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41#[tracing::instrument(
44 level = "debug",
45 skip(finalized_state, non_finalized_state, prepared),
46 fields(
47 height = ?prepared.height,
48 hash = %prepared.hash,
49 chains = non_finalized_state.chain_count()
50 )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53 finalized_state: &ZebraDb,
54 non_finalized_state: &mut NonFinalizedState,
55 prepared: SemanticallyVerifiedBlock,
56) -> Result<(), CommitSemanticallyVerifiedError> {
57 check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58 let parent_hash = prepared.block.header.previous_block_hash;
59
60 if !non_finalized_state.any_chain_contains(&parent_hash)
61 && finalized_state.finalized_tip_hash() == parent_hash
62 {
63 non_finalized_state.commit_new_chain(prepared, finalized_state)?;
64 } else {
65 non_finalized_state.commit_block(prepared, finalized_state)?;
66 }
67
68 Ok(())
69}
70
71#[instrument(
83 level = "debug",
84 skip(
85 non_finalized_state,
86 chain_tip_sender,
87 non_finalized_state_sender,
88 last_zebra_mined_log_height
89 ),
90 fields(chains = non_finalized_state.chain_count())
91)]
92fn update_latest_chain_channels(
93 non_finalized_state: &NonFinalizedState,
94 chain_tip_sender: &mut ChainTipSender,
95 non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
96 last_zebra_mined_log_height: &mut Option<Height>,
97) -> block::Height {
98 let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
99
100 let tip_block = best_chain
101 .tip_block()
102 .expect("unexpected empty chain: must commit at least one block before updating channels")
103 .clone();
104 let tip_block = ChainTipBlock::from(tip_block);
105
106 log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
107
108 let tip_block_height = tip_block.height;
109
110 let _ = non_finalized_state_sender.send(non_finalized_state.clone());
112
113 chain_tip_sender.set_best_non_finalized_tip(tip_block);
114
115 tip_block_height
116}
117
/// The state owned by the block write thread started by [`BlockWriteSender::spawn`].
///
/// [`WriteBlockWorkerTask::run`] consumes this struct and processes both receivers.
struct WriteBlockWorkerTask {
    /// Receives checkpoint-verified blocks to commit to the finalized state.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Receives commit, invalidate, and reconsider messages for the non-finalized state.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that blocks are committed to.
    finalized_state: FinalizedState,
    /// The non-finalized state that this task validates blocks against and updates.
    non_finalized_state: NonFinalizedState,
    /// Sends the finalized tip hash when committing a finalized block fails,
    /// so the receiver can reset its block queue.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Receives the latest finalized and best non-finalized tip blocks.
    chain_tip_sender: ChainTipSender,
    /// Receives a clone of the non-finalized state each time the chain channels are updated.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
}
129
/// A message sent to the block write task on the non-finalized block write channel.
pub enum NonFinalizedWriteMessage {
    /// A queued semantically verified block, with its response channel,
    /// to validate and commit to the non-finalized state.
    Commit(QueuedSemanticallyVerified),
    /// A request to invalidate a block in the non-finalized state.
    Invalidate {
        /// The hash of the block to invalidate.
        hash: block::Hash,
        /// The channel that receives the result of `invalidate_block`.
        rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
    },
    /// A request to reconsider a block in the non-finalized state.
    Reconsider {
        /// The hash of the block to reconsider.
        hash: block::Hash,
        /// The channel that receives the result of `reconsider_block`.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, BoxError>>,
    },
}
148
149impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
150 fn from(block: QueuedSemanticallyVerified) -> Self {
151 NonFinalizedWriteMessage::Commit(block)
152 }
153}
154
/// The sending halves of the channels read by the block write task.
///
/// Created by [`BlockWriteSender::spawn`], which also starts the task's thread.
#[derive(Clone, Debug)]
pub(super) struct BlockWriteSender {
    /// Sends [`NonFinalizedWriteMessage`]s to the block write task.
    pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,

    /// Sends checkpoint-verified blocks to the block write task.
    ///
    /// `spawn` sets this to `None` when it is told not to use the finalized block write sender.
    pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
}
171
172impl BlockWriteSender {
173 #[instrument(
175 level = "debug",
176 skip_all,
177 fields(
178 network = %non_finalized_state.network
179 )
180 )]
181 pub fn spawn(
182 finalized_state: FinalizedState,
183 non_finalized_state: NonFinalizedState,
184 chain_tip_sender: ChainTipSender,
185 non_finalized_state_sender: watch::Sender<NonFinalizedState>,
186 should_use_finalized_block_write_sender: bool,
187 ) -> (
188 Self,
189 tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
190 Option<Arc<std::thread::JoinHandle<()>>>,
191 ) {
192 let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
195 tokio::sync::mpsc::unbounded_channel();
196 let (finalized_block_write_sender, finalized_block_write_receiver) =
197 tokio::sync::mpsc::unbounded_channel();
198 let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
199 tokio::sync::mpsc::unbounded_channel();
200
201 let span = Span::current();
202 let task = std::thread::spawn(move || {
203 span.in_scope(|| {
204 WriteBlockWorkerTask {
205 finalized_block_write_receiver,
206 non_finalized_block_write_receiver,
207 finalized_state,
208 non_finalized_state,
209 invalid_block_reset_sender,
210 chain_tip_sender,
211 non_finalized_state_sender,
212 }
213 .run()
214 })
215 });
216
217 (
218 Self {
219 non_finalized: Some(non_finalized_block_write_sender),
220 finalized: Some(finalized_block_write_sender)
221 .filter(|_| should_use_finalized_block_write_sender),
222 },
223 invalid_block_write_reset_receiver,
224 Some(Arc::new(task)),
225 )
226 }
227}
228
229impl WriteBlockWorkerTask {
230 #[instrument(
234 level = "debug",
235 skip(self),
236 fields(
237 network = %self.non_finalized_state.network
238 )
239 )]
240 pub fn run(mut self) {
241 let Self {
242 finalized_block_write_receiver,
243 non_finalized_block_write_receiver,
244 finalized_state,
245 non_finalized_state,
246 invalid_block_reset_sender,
247 chain_tip_sender,
248 non_finalized_state_sender,
249 } = &mut self;
250
251 let mut last_zebra_mined_log_height = None;
252 let mut prev_finalized_note_commitment_trees = None;
253
254 while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
257 if invalid_block_reset_sender.is_closed() {
260 info!("StateService closed the block reset channel. Is Zebra shutting down?");
261 return;
262 }
263
264 let next_valid_height = finalized_state
271 .db
272 .finalized_tip_height()
273 .map(|height| (height + 1).expect("committed heights are valid"))
274 .unwrap_or(Height(0));
275
276 if ordered_block.0.height != next_valid_height {
277 debug!(
278 ?next_valid_height,
279 invalid_height = ?ordered_block.0.height,
280 invalid_hash = ?ordered_block.0.hash,
281 "got a block that was the wrong height. \
282 Assuming a parent block failed, and dropping this block",
283 );
284
285 std::mem::drop(ordered_block);
287 continue;
288 }
289
290 match finalized_state
292 .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
293 {
294 Ok((finalized, note_commitment_trees)) => {
295 let tip_block = ChainTipBlock::from(finalized);
296 prev_finalized_note_commitment_trees = Some(note_commitment_trees);
297
298 log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
299
300 chain_tip_sender.set_finalized_tip(tip_block);
301 }
302 Err(error) => {
303 let finalized_tip = finalized_state.db.tip();
304
305 info!(
309 ?error,
310 last_valid_height = ?finalized_tip.map(|tip| tip.0),
311 last_valid_hash = ?finalized_tip.map(|tip| tip.1),
312 "committing a block to the finalized state failed, resetting state queue",
313 );
314
315 let send_result =
316 invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
317
318 if send_result.is_err() {
319 info!(
320 "StateService closed the block reset channel. Is Zebra shutting down?"
321 );
322 return;
323 }
324 }
325 }
326 }
327
328 if invalid_block_reset_sender.is_closed() {
331 info!("StateService closed the block reset channel. Is Zebra shutting down?");
332 return;
333 }
334
335 let mut parent_error_map: IndexMap<block::Hash, CommitSemanticallyVerifiedError> =
337 IndexMap::new();
338
339 while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
340 let queued_child_and_rsp_tx = match msg {
341 NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
342 NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
343 tracing::info!(?hash, "invalidating a block in the non-finalized state");
344 let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
345 None
346 }
347 NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
348 tracing::info!(?hash, "reconsidering a block in the non-finalized state");
349 let _ = rsp_tx.send(
350 non_finalized_state
351 .reconsider_block(hash, &finalized_state.db)
352 .map_err(BoxError::from),
353 );
354 None
355 }
356 };
357
358 let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
359 update_latest_chain_channels(
360 non_finalized_state,
361 chain_tip_sender,
362 non_finalized_state_sender,
363 &mut last_zebra_mined_log_height,
364 );
365 continue;
366 };
367
368 let child_hash = queued_child.hash;
369 let parent_hash = queued_child.block.header.previous_block_hash;
370 let parent_error = parent_error_map.get(&parent_hash);
371
372 let result;
373
374 if let Some(parent_error) = parent_error {
380 result = Err(parent_error.clone());
381 } else {
382 tracing::trace!(?child_hash, "validating queued child");
383 result = validate_and_commit_non_finalized(
384 &finalized_state.db,
385 non_finalized_state,
386 queued_child,
387 )
388 }
389
390 if let Err(ref error) = result {
395 let _ = rsp_tx.send(result.clone().map(|()| child_hash));
397
398 parent_error_map.insert(child_hash, error.clone());
400
401 if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
403 parent_error_map.shift_remove_index(0);
405 }
406
407 continue;
409 }
410
411 let tip_block_height = update_latest_chain_channels(
418 non_finalized_state,
419 chain_tip_sender,
420 non_finalized_state_sender,
421 &mut last_zebra_mined_log_height,
422 );
423
424 let _ = rsp_tx.send(result.clone().map(|()| child_hash));
426
427 while non_finalized_state
428 .best_chain_len()
429 .expect("just successfully inserted a non-finalized block above")
430 > MAX_BLOCK_REORG_HEIGHT
431 {
432 tracing::trace!("finalizing block past the reorg limit");
433 let contextually_verified_with_trees = non_finalized_state.finalize();
434 prev_finalized_note_commitment_trees = finalized_state
435 .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
436 .expect(
437 "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
438 ).1.into();
439 }
440
441 metrics::counter!("state.full_verifier.committed.block.count").increment(1);
445 metrics::counter!("zcash.chain.verified.block.total").increment(1);
446
447 metrics::gauge!("state.full_verifier.committed.block.height")
448 .set(tip_block_height.0 as f64);
449
450 metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
454
455 tracing::trace!("finished processing queued block");
456 }
457
458 finalized_state.db.shutdown(true);
461 std::mem::drop(self.finalized_state);
462 }
463}
464
465fn log_if_mined_by_zebra(
470 tip_block: &ChainTipBlock,
471 last_zebra_mined_log_height: &mut Option<Height>,
472) {
473 const LOG_RATE_LIMIT: u32 = 1000;
475
476 let height = tip_block.height.0;
477
478 if let Some(last_height) = last_zebra_mined_log_height {
479 if height < last_height.0 + LOG_RATE_LIMIT {
480 return;
482 }
483 };
484
485 let coinbase_data = tip_block.transactions[0].inputs()[0]
487 .extra_coinbase_data()
488 .expect("valid blocks must start with a coinbase input")
489 .clone();
490
491 if coinbase_data
492 .as_ref()
493 .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
494 {
495 let text = String::from_utf8_lossy(coinbase_data.as_ref());
496
497 *last_zebra_mined_log_height = Some(Height(height));
498
499 if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
501 info!(
502 %text,
503 %height,
504 hash = %tip_block.hash,
505 "looks like this block was mined by Zebra!"
506 );
507 } else {
508 let text = text.replace(
514 |c: char| {
515 !EXTRA_ZEBRA_COINBASE_DATA
516 .to_ascii_lowercase()
517 .contains(c.to_ascii_lowercase())
518 },
519 "?",
520 );
521 let data = hex::encode(coinbase_data.as_ref());
522
523 info!(
524 %text,
525 %data,
526 %height,
527 hash = %tip_block.hash,
528 "looks like this block was mined by Zebra!"
529 );
530 }
531 }
532}