zebra_state/service/
write.rs

1//! Writing blocks to the finalized and non-finalized states.
2
3use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{
7    mpsc::{UnboundedReceiver, UnboundedSender},
8    oneshot, watch,
9};
10
11use tracing::Span;
12use zebra_chain::{
13    block::{self, Height},
14    transparent::EXTRA_ZEBRA_COINBASE_DATA,
15};
16
17use crate::{
18    constants::MAX_BLOCK_REORG_HEIGHT,
19    service::{
20        check,
21        finalized_state::{FinalizedState, ZebraDb},
22        non_finalized_state::NonFinalizedState,
23        queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24        BoxError, ChainTipBlock, ChainTipSender,
25    },
26    CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
27};
28
29// These types are used in doc links
30#[allow(unused_imports)]
31use crate::service::{
32    chain_tip::{ChainTipChange, LatestChainTip},
33    non_finalized_state::Chain,
34};
35
36/// The maximum size of the parent error map.
37///
38/// We allow enough space for multiple concurrent chain forks with errors.
39const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41/// Run contextual validation on the prepared block and add it to the
42/// non-finalized state if it is contextually valid.
43#[tracing::instrument(
44    level = "debug",
45    skip(finalized_state, non_finalized_state, prepared),
46    fields(
47        height = ?prepared.height,
48        hash = %prepared.hash,
49        chains = non_finalized_state.chain_count()
50    )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53    finalized_state: &ZebraDb,
54    non_finalized_state: &mut NonFinalizedState,
55    prepared: SemanticallyVerifiedBlock,
56) -> Result<(), CommitSemanticallyVerifiedError> {
57    check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58    let parent_hash = prepared.block.header.previous_block_hash;
59
60    if !non_finalized_state.any_chain_contains(&parent_hash)
61        && finalized_state.finalized_tip_hash() == parent_hash
62    {
63        non_finalized_state.commit_new_chain(prepared, finalized_state)?;
64    } else {
65        non_finalized_state.commit_block(prepared, finalized_state)?;
66    }
67
68    Ok(())
69}
70
71/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender`
72/// channels with the latest non-finalized [`ChainTipBlock`] and
73/// [`Chain`].
74///
75/// `last_zebra_mined_log_height` is used to rate-limit logging.
76///
77/// Returns the latest non-finalized chain tip height.
78///
79/// # Panics
80///
81/// If the `non_finalized_state` is empty.
82#[instrument(
83    level = "debug",
84    skip(
85        non_finalized_state,
86        chain_tip_sender,
87        non_finalized_state_sender,
88        last_zebra_mined_log_height
89    ),
90    fields(chains = non_finalized_state.chain_count())
91)]
92fn update_latest_chain_channels(
93    non_finalized_state: &NonFinalizedState,
94    chain_tip_sender: &mut ChainTipSender,
95    non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
96    last_zebra_mined_log_height: &mut Option<Height>,
97) -> block::Height {
98    let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
99
100    let tip_block = best_chain
101        .tip_block()
102        .expect("unexpected empty chain: must commit at least one block before updating channels")
103        .clone();
104    let tip_block = ChainTipBlock::from(tip_block);
105
106    log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
107
108    let tip_block_height = tip_block.height;
109
110    // If the final receiver was just dropped, ignore the error.
111    let _ = non_finalized_state_sender.send(non_finalized_state.clone());
112
113    chain_tip_sender.set_best_non_finalized_tip(tip_block);
114
115    tip_block_height
116}
117
/// A worker task that reads, validates, and writes blocks to the
/// `finalized_state` or `non_finalized_state`.
///
/// Constructed by [`BlockWriteSender::spawn`], which runs it on a new thread.
struct WriteBlockWorkerTask {
    /// Receives checkpoint-verified blocks, which are committed to the finalized state
    /// in height order.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Receives commit, invalidate, and reconsider requests for the non-finalized state.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that checkpoint-verified blocks, and non-finalized blocks past
    /// the reorg limit, are committed to.
    finalized_state: FinalizedState,
    /// The in-memory chains of contextually validated, not-yet-finalized blocks.
    non_finalized_state: NonFinalizedState,
    /// Sends the finalized tip hash after a finalized block commit fails, so the state
    /// queue can be reset. The worker also treats this channel being closed as a sign
    /// that the state service is shutting down.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Publishes the latest finalized and best non-finalized chain tips.
    chain_tip_sender: ChainTipSender,
    /// Publishes a copy of the non-finalized state after it is updated.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
}
129
/// The message type for the non-finalized block write task channel.
///
/// Each message is handled in order by the block write task, which updates the
/// chain tip and non-finalized state channels afterwards.
pub enum NonFinalizedWriteMessage {
    /// A newly downloaded and semantically verified block prepared for
    /// contextual validation and insertion into the non-finalized state.
    Commit(QueuedSemanticallyVerified),
    /// The hash of a block that should be invalidated and removed from
    /// the non-finalized state, if present.
    Invalidate {
        /// The hash of the block to invalidate.
        hash: block::Hash,
        /// Receives the result of invalidating the block in the non-finalized state.
        rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
    },
    /// The hash of a block that was previously invalidated but should be
    /// reconsidered and reinserted into the non-finalized state.
    Reconsider {
        /// The hash of the previously invalidated block to reconsider.
        hash: block::Hash,
        /// Receives the result of reconsidering the block, with any error converted
        /// into a `BoxError`.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, BoxError>>,
    },
}
148
149impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
150    fn from(block: QueuedSemanticallyVerified) -> Self {
151        NonFinalizedWriteMessage::Commit(block)
152    }
153}
154
/// The sending halves of the channels used to submit work to the block write task,
/// which reads, validates, and writes blocks to the `finalized_state` or
/// `non_finalized_state`.
///
/// Created by [`BlockWriteSender::spawn`], which also starts the task.
#[derive(Clone, Debug)]
pub(super) struct BlockWriteSender {
    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`NonFinalizedState`].
    ///
    /// This channel also carries invalidate and reconsider requests,
    /// see [`NonFinalizedWriteMessage`].
    pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,

    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`FinalizedState`].
    ///
    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
    /// and the lowest semantically verified block arrives.
    ///
    /// `None` from the start if `spawn` was told not to use the finalized block write sender.
    pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
}
171
172impl BlockWriteSender {
173    /// Creates a new [`BlockWriteSender`] with the given receivers and states.
174    #[instrument(
175        level = "debug",
176        skip_all,
177        fields(
178            network = %non_finalized_state.network
179        )
180    )]
181    pub fn spawn(
182        finalized_state: FinalizedState,
183        non_finalized_state: NonFinalizedState,
184        chain_tip_sender: ChainTipSender,
185        non_finalized_state_sender: watch::Sender<NonFinalizedState>,
186        should_use_finalized_block_write_sender: bool,
187    ) -> (
188        Self,
189        tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
190        Option<Arc<std::thread::JoinHandle<()>>>,
191    ) {
192        // Security: The number of blocks in these channels is limited by
193        //           the syncer and inbound lookahead limits.
194        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
195            tokio::sync::mpsc::unbounded_channel();
196        let (finalized_block_write_sender, finalized_block_write_receiver) =
197            tokio::sync::mpsc::unbounded_channel();
198        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
199            tokio::sync::mpsc::unbounded_channel();
200
201        let span = Span::current();
202        let task = std::thread::spawn(move || {
203            span.in_scope(|| {
204                WriteBlockWorkerTask {
205                    finalized_block_write_receiver,
206                    non_finalized_block_write_receiver,
207                    finalized_state,
208                    non_finalized_state,
209                    invalid_block_reset_sender,
210                    chain_tip_sender,
211                    non_finalized_state_sender,
212                }
213                .run()
214            })
215        });
216
217        (
218            Self {
219                non_finalized: Some(non_finalized_block_write_sender),
220                finalized: Some(finalized_block_write_sender)
221                    .filter(|_| should_use_finalized_block_write_sender),
222            },
223            invalid_block_write_reset_receiver,
224            Some(Arc::new(task)),
225        )
226    }
227}
228
229impl WriteBlockWorkerTask {
230    /// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`,
231    /// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and
232    /// `non_finalized_state_sender`.
233    #[instrument(
234        level = "debug",
235        skip(self),
236        fields(
237            network = %self.non_finalized_state.network
238        )
239    )]
240    pub fn run(mut self) {
241        let Self {
242            finalized_block_write_receiver,
243            non_finalized_block_write_receiver,
244            finalized_state,
245            non_finalized_state,
246            invalid_block_reset_sender,
247            chain_tip_sender,
248            non_finalized_state_sender,
249        } = &mut self;
250
251        let mut last_zebra_mined_log_height = None;
252        let mut prev_finalized_note_commitment_trees = None;
253
254        // Write all the finalized blocks sent by the state,
255        // until the state closes the finalized block channel's sender.
256        while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
257            // TODO: split these checks into separate functions
258
259            if invalid_block_reset_sender.is_closed() {
260                info!("StateService closed the block reset channel. Is Zebra shutting down?");
261                return;
262            }
263
264            // Discard any children of invalid blocks in the channel
265            //
266            // `commit_finalized()` requires blocks in height order.
267            // So if there has been a block commit error,
268            // we need to drop all the descendants of that block,
269            // until we receive a block at the required next height.
270            let next_valid_height = finalized_state
271                .db
272                .finalized_tip_height()
273                .map(|height| (height + 1).expect("committed heights are valid"))
274                .unwrap_or(Height(0));
275
276            if ordered_block.0.height != next_valid_height {
277                debug!(
278                    ?next_valid_height,
279                    invalid_height = ?ordered_block.0.height,
280                    invalid_hash = ?ordered_block.0.hash,
281                    "got a block that was the wrong height. \
282                     Assuming a parent block failed, and dropping this block",
283                );
284
285                // We don't want to send a reset here, because it could overwrite a valid sent hash
286                std::mem::drop(ordered_block);
287                continue;
288            }
289
290            // Try committing the block
291            match finalized_state
292                .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
293            {
294                Ok((finalized, note_commitment_trees)) => {
295                    let tip_block = ChainTipBlock::from(finalized);
296                    prev_finalized_note_commitment_trees = Some(note_commitment_trees);
297
298                    log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
299
300                    chain_tip_sender.set_finalized_tip(tip_block);
301                }
302                Err(error) => {
303                    let finalized_tip = finalized_state.db.tip();
304
305                    // The last block in the queue failed, so we can't commit the next block.
306                    // Instead, we need to reset the state queue,
307                    // and discard any children of the invalid block in the channel.
308                    info!(
309                        ?error,
310                        last_valid_height = ?finalized_tip.map(|tip| tip.0),
311                        last_valid_hash = ?finalized_tip.map(|tip| tip.1),
312                        "committing a block to the finalized state failed, resetting state queue",
313                    );
314
315                    let send_result =
316                        invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
317
318                    if send_result.is_err() {
319                        info!(
320                            "StateService closed the block reset channel. Is Zebra shutting down?"
321                        );
322                        return;
323                    }
324                }
325            }
326        }
327
328        // Do this check even if the channel got closed before any finalized blocks were sent.
329        // This can happen if we're past the finalized tip.
330        if invalid_block_reset_sender.is_closed() {
331            info!("StateService closed the block reset channel. Is Zebra shutting down?");
332            return;
333        }
334
335        // Save any errors to propagate down to queued child blocks
336        let mut parent_error_map: IndexMap<block::Hash, CommitSemanticallyVerifiedError> =
337            IndexMap::new();
338
339        while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
340            let queued_child_and_rsp_tx = match msg {
341                NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
342                NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
343                    tracing::info!(?hash, "invalidating a block in the non-finalized state");
344                    let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
345                    None
346                }
347                NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
348                    tracing::info!(?hash, "reconsidering a block in the non-finalized state");
349                    let _ = rsp_tx.send(
350                        non_finalized_state
351                            .reconsider_block(hash, &finalized_state.db)
352                            .map_err(BoxError::from),
353                    );
354                    None
355                }
356            };
357
358            let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
359                update_latest_chain_channels(
360                    non_finalized_state,
361                    chain_tip_sender,
362                    non_finalized_state_sender,
363                    &mut last_zebra_mined_log_height,
364                );
365                continue;
366            };
367
368            let child_hash = queued_child.hash;
369            let parent_hash = queued_child.block.header.previous_block_hash;
370            let parent_error = parent_error_map.get(&parent_hash);
371
372            let result;
373
374            // If the parent block was marked as rejected, also reject all its children.
375            //
376            // At this point, we know that all the block's descendants
377            // are invalid, because we checked all the consensus rules before
378            // committing the failing ancestor block to the non-finalized state.
379            if let Some(parent_error) = parent_error {
380                result = Err(parent_error.clone());
381            } else {
382                tracing::trace!(?child_hash, "validating queued child");
383                result = validate_and_commit_non_finalized(
384                    &finalized_state.db,
385                    non_finalized_state,
386                    queued_child,
387                )
388            }
389
390            // TODO: fix the test timing bugs that require the result to be sent
391            //       after `update_latest_chain_channels()`,
392            //       and send the result on rsp_tx here
393
394            if let Err(ref error) = result {
395                // Update the caller with the error.
396                let _ = rsp_tx.send(result.clone().map(|()| child_hash));
397
398                // If the block is invalid, mark any descendant blocks as rejected.
399                parent_error_map.insert(child_hash, error.clone());
400
401                // Make sure the error map doesn't get too big.
402                if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
403                    // We only add one hash at a time, so we only need to remove one extra here.
404                    parent_error_map.shift_remove_index(0);
405                }
406
407                // Skip the things we only need to do for successfully committed blocks
408                continue;
409            }
410
411            // Committing blocks to the finalized state keeps the same chain,
412            // so we can update the chain seen by the rest of the application now.
413            //
414            // TODO: if this causes state request errors due to chain conflicts,
415            //       fix the `service::read` bugs,
416            //       or do the channel update after the finalized state commit
417            let tip_block_height = update_latest_chain_channels(
418                non_finalized_state,
419                chain_tip_sender,
420                non_finalized_state_sender,
421                &mut last_zebra_mined_log_height,
422            );
423
424            // Update the caller with the result.
425            let _ = rsp_tx.send(result.clone().map(|()| child_hash));
426
427            while non_finalized_state
428                .best_chain_len()
429                .expect("just successfully inserted a non-finalized block above")
430                > MAX_BLOCK_REORG_HEIGHT
431            {
432                tracing::trace!("finalizing block past the reorg limit");
433                let contextually_verified_with_trees = non_finalized_state.finalize();
434                prev_finalized_note_commitment_trees = finalized_state
435                            .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
436                            .expect(
437                                "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
438                            ).1.into();
439            }
440
441            // Update the metrics if semantic and contextual validation passes
442            //
443            // TODO: split this out into a function?
444            metrics::counter!("state.full_verifier.committed.block.count").increment(1);
445            metrics::counter!("zcash.chain.verified.block.total").increment(1);
446
447            metrics::gauge!("state.full_verifier.committed.block.height")
448                .set(tip_block_height.0 as f64);
449
450            // This height gauge is updated for both fully verified and checkpoint blocks.
451            // These updates can't conflict, because this block write task makes sure that blocks
452            // are committed in order.
453            metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
454
455            tracing::trace!("finished processing queued block");
456        }
457
458        // We're finished receiving non-finalized blocks from the state, and
459        // done writing to the finalized state, so we can force it to shut down.
460        finalized_state.db.shutdown(true);
461        std::mem::drop(self.finalized_state);
462    }
463}
464
465/// Log a message if this block was mined by Zebra.
466///
467/// Does not detect early Zebra blocks, and blocks with custom coinbase transactions.
468/// Rate-limited to every 1000 blocks using `last_zebra_mined_log_height`.
469fn log_if_mined_by_zebra(
470    tip_block: &ChainTipBlock,
471    last_zebra_mined_log_height: &mut Option<Height>,
472) {
473    // This logs at most every 2-3 checkpoints, which seems fine.
474    const LOG_RATE_LIMIT: u32 = 1000;
475
476    let height = tip_block.height.0;
477
478    if let Some(last_height) = last_zebra_mined_log_height {
479        if height < last_height.0 + LOG_RATE_LIMIT {
480            // If we logged in the last 1000 blocks, don't log anything now.
481            return;
482        }
483    };
484
485    // This code is rate-limited, so we can do expensive transformations here.
486    let coinbase_data = tip_block.transactions[0].inputs()[0]
487        .extra_coinbase_data()
488        .expect("valid blocks must start with a coinbase input")
489        .clone();
490
491    if coinbase_data
492        .as_ref()
493        .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
494    {
495        let text = String::from_utf8_lossy(coinbase_data.as_ref());
496
497        *last_zebra_mined_log_height = Some(Height(height));
498
499        // No need for hex-encoded data if it's exactly what we expected.
500        if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
501            info!(
502                %text,
503                %height,
504                hash = %tip_block.hash,
505                "looks like this block was mined by Zebra!"
506            );
507        } else {
508            // # Security
509            //
510            // Use the extra data as an allow-list, replacing unknown characters.
511            // This makes sure control characters and harmful messages don't get logged
512            // to the terminal.
513            let text = text.replace(
514                |c: char| {
515                    !EXTRA_ZEBRA_COINBASE_DATA
516                        .to_ascii_lowercase()
517                        .contains(c.to_ascii_lowercase())
518                },
519                "?",
520            );
521            let data = hex::encode(coinbase_data.as_ref());
522
523            info!(
524                %text,
525                %data,
526                %height,
527                hash = %tip_block.hash,
528                "looks like this block was mined by Zebra!"
529            );
530        }
531    }
532}