1use indexmap::IndexMap;
4use tokio::sync::{
5 mpsc::{UnboundedReceiver, UnboundedSender},
6 watch,
7};
8
9use zebra_chain::{
10 block::{self, Height},
11 transparent::EXTRA_ZEBRA_COINBASE_DATA,
12};
13
14use crate::{
15 constants::MAX_BLOCK_REORG_HEIGHT,
16 service::{
17 check,
18 finalized_state::{FinalizedState, ZebraDb},
19 non_finalized_state::NonFinalizedState,
20 queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
21 BoxError, ChainTipBlock, ChainTipSender, CloneError,
22 },
23 CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
24};
25
26#[allow(unused_imports)]
28use crate::service::{
29 chain_tip::{ChainTipChange, LatestChainTip},
30 non_finalized_state::Chain,
31};
32
33const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
37
38#[tracing::instrument(
41 level = "debug",
42 skip(finalized_state, non_finalized_state, prepared),
43 fields(
44 height = ?prepared.height,
45 hash = %prepared.hash,
46 chains = non_finalized_state.chain_count()
47 )
48)]
49pub(crate) fn validate_and_commit_non_finalized(
50 finalized_state: &ZebraDb,
51 non_finalized_state: &mut NonFinalizedState,
52 prepared: SemanticallyVerifiedBlock,
53) -> Result<(), CommitSemanticallyVerifiedError> {
54 check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
55 let parent_hash = prepared.block.header.previous_block_hash;
56
57 if finalized_state.finalized_tip_hash() == parent_hash {
58 non_finalized_state.commit_new_chain(prepared, finalized_state)?;
59 } else {
60 non_finalized_state.commit_block(prepared, finalized_state)?;
61 }
62
63 Ok(())
64}
65
66#[instrument(
78 level = "debug",
79 skip(
80 non_finalized_state,
81 chain_tip_sender,
82 non_finalized_state_sender,
83 last_zebra_mined_log_height
84 ),
85 fields(chains = non_finalized_state.chain_count())
86)]
87fn update_latest_chain_channels(
88 non_finalized_state: &NonFinalizedState,
89 chain_tip_sender: &mut ChainTipSender,
90 non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
91 last_zebra_mined_log_height: &mut Option<Height>,
92) -> block::Height {
93 let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
94
95 let tip_block = best_chain
96 .tip_block()
97 .expect("unexpected empty chain: must commit at least one block before updating channels")
98 .clone();
99 let tip_block = ChainTipBlock::from(tip_block);
100
101 log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
102
103 let tip_block_height = tip_block.height;
104
105 let _ = non_finalized_state_sender.send(non_finalized_state.clone());
107
108 chain_tip_sender.set_best_non_finalized_tip(tip_block);
109
110 tip_block_height
111}
112
113#[allow(clippy::too_many_arguments)]
118#[instrument(
119 level = "debug",
120 skip(
121 finalized_block_write_receiver,
122 non_finalized_block_write_receiver,
123 finalized_state,
124 non_finalized_state,
125 invalid_block_reset_sender,
126 chain_tip_sender,
127 non_finalized_state_sender,
128 ),
129 fields(
130 network = %non_finalized_state.network
131 )
132)]
133pub fn write_blocks_from_channels(
134 mut finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
135 mut non_finalized_block_write_receiver: UnboundedReceiver<QueuedSemanticallyVerified>,
136 mut finalized_state: FinalizedState,
137 mut non_finalized_state: NonFinalizedState,
138 invalid_block_reset_sender: UnboundedSender<block::Hash>,
139 mut chain_tip_sender: ChainTipSender,
140 non_finalized_state_sender: watch::Sender<NonFinalizedState>,
141) {
142 let mut last_zebra_mined_log_height = None;
143 let mut prev_finalized_note_commitment_trees = None;
144
145 while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
148 if invalid_block_reset_sender.is_closed() {
151 info!("StateService closed the block reset channel. Is Zebra shutting down?");
152 return;
153 }
154
155 let next_valid_height = finalized_state
162 .db
163 .finalized_tip_height()
164 .map(|height| (height + 1).expect("committed heights are valid"))
165 .unwrap_or(Height(0));
166
167 if ordered_block.0.height != next_valid_height {
168 debug!(
169 ?next_valid_height,
170 invalid_height = ?ordered_block.0.height,
171 invalid_hash = ?ordered_block.0.hash,
172 "got a block that was the wrong height. \
173 Assuming a parent block failed, and dropping this block",
174 );
175
176 std::mem::drop(ordered_block);
178 continue;
179 }
180
181 match finalized_state
183 .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
184 {
185 Ok((finalized, note_commitment_trees)) => {
186 let tip_block = ChainTipBlock::from(finalized);
187 prev_finalized_note_commitment_trees = Some(note_commitment_trees);
188
189 log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
190
191 chain_tip_sender.set_finalized_tip(tip_block);
192 }
193 Err(error) => {
194 let finalized_tip = finalized_state.db.tip();
195
196 info!(
200 ?error,
201 last_valid_height = ?finalized_tip.map(|tip| tip.0),
202 last_valid_hash = ?finalized_tip.map(|tip| tip.1),
203 "committing a block to the finalized state failed, resetting state queue",
204 );
205
206 let send_result =
207 invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
208
209 if send_result.is_err() {
210 info!("StateService closed the block reset channel. Is Zebra shutting down?");
211 return;
212 }
213 }
214 }
215 }
216
217 if invalid_block_reset_sender.is_closed() {
220 info!("StateService closed the block reset channel. Is Zebra shutting down?");
221 return;
222 }
223
224 let mut parent_error_map: IndexMap<block::Hash, CloneError> = IndexMap::new();
226
227 while let Some((queued_child, rsp_tx)) = non_finalized_block_write_receiver.blocking_recv() {
228 let child_hash = queued_child.hash;
229 let parent_hash = queued_child.block.header.previous_block_hash;
230 let parent_error = parent_error_map.get(&parent_hash);
231
232 let result;
233
234 if let Some(parent_error) = parent_error {
240 tracing::trace!(
241 ?child_hash,
242 ?parent_error,
243 "rejecting queued child due to parent error"
244 );
245 result = Err(parent_error.clone());
246 } else {
247 tracing::trace!(?child_hash, "validating queued child");
248 result = validate_and_commit_non_finalized(
249 &finalized_state.db,
250 &mut non_finalized_state,
251 queued_child,
252 )
253 .map_err(CloneError::from);
254 }
255
256 if let Err(ref error) = result {
261 let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
263
264 parent_error_map.insert(child_hash, error.clone());
266
267 if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
269 parent_error_map.shift_remove_index(0);
271 }
272
273 continue;
275 }
276
277 let tip_block_height = update_latest_chain_channels(
284 &non_finalized_state,
285 &mut chain_tip_sender,
286 &non_finalized_state_sender,
287 &mut last_zebra_mined_log_height,
288 );
289
290 let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
292
293 while non_finalized_state
294 .best_chain_len()
295 .expect("just successfully inserted a non-finalized block above")
296 > MAX_BLOCK_REORG_HEIGHT
297 {
298 tracing::trace!("finalizing block past the reorg limit");
299 let contextually_verified_with_trees = non_finalized_state.finalize();
300 prev_finalized_note_commitment_trees = finalized_state
301 .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
302 .expect(
303 "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
304 ).1.into();
305 }
306
307 metrics::counter!("state.full_verifier.committed.block.count").increment(1);
311 metrics::counter!("zcash.chain.verified.block.total").increment(1);
312
313 metrics::gauge!("state.full_verifier.committed.block.height")
314 .set(tip_block_height.0 as f64);
315
316 metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
320
321 tracing::trace!("finished processing queued block");
322 }
323
324 finalized_state.db.shutdown(true);
327 std::mem::drop(finalized_state);
328}
329
330fn log_if_mined_by_zebra(
335 tip_block: &ChainTipBlock,
336 last_zebra_mined_log_height: &mut Option<Height>,
337) {
338 const LOG_RATE_LIMIT: u32 = 1000;
340
341 let height = tip_block.height.0;
342
343 if let Some(last_height) = last_zebra_mined_log_height {
344 if height < last_height.0 + LOG_RATE_LIMIT {
345 return;
347 }
348 };
349
350 let coinbase_data = tip_block.transactions[0].inputs()[0]
352 .extra_coinbase_data()
353 .expect("valid blocks must start with a coinbase input")
354 .clone();
355
356 if coinbase_data
357 .as_ref()
358 .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
359 {
360 let text = String::from_utf8_lossy(coinbase_data.as_ref());
361
362 *last_zebra_mined_log_height = Some(Height(height));
363
364 if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
366 info!(
367 %text,
368 %height,
369 hash = %tip_block.hash,
370 "looks like this block was mined by Zebra!"
371 );
372 } else {
373 let text = text.replace(
379 |c: char| {
380 !EXTRA_ZEBRA_COINBASE_DATA
381 .to_ascii_lowercase()
382 .contains(c.to_ascii_lowercase())
383 },
384 "?",
385 );
386 let data = hex::encode(coinbase_data.as_ref());
387
388 info!(
389 %text,
390 %data,
391 %height,
392 hash = %tip_block.hash,
393 "looks like this block was mined by Zebra!"
394 );
395 }
396 }
397}