zebra_state/service/
write.rs

1//! Writing blocks to the finalized and non-finalized states.
2
3use std::sync::Arc;
4
5use indexmap::IndexMap;
6use tokio::sync::{
7    mpsc::{UnboundedReceiver, UnboundedSender},
8    oneshot, watch,
9};
10
11use tracing::Span;
12use zebra_chain::{
13    block::{self, Height},
14    transparent::EXTRA_ZEBRA_COINBASE_DATA,
15};
16
17use crate::{
18    constants::MAX_BLOCK_REORG_HEIGHT,
19    service::{
20        check,
21        finalized_state::{FinalizedState, ZebraDb},
22        non_finalized_state::NonFinalizedState,
23        queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified},
24        BoxError, ChainTipBlock, ChainTipSender, CloneError,
25    },
26    CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock,
27};
28
29// These types are used in doc links
30#[allow(unused_imports)]
31use crate::service::{
32    chain_tip::{ChainTipChange, LatestChainTip},
33    non_finalized_state::Chain,
34};
35
36/// The maximum size of the parent error map.
37///
38/// We allow enough space for multiple concurrent chain forks with errors.
39const PARENT_ERROR_MAP_LIMIT: usize = MAX_BLOCK_REORG_HEIGHT as usize * 2;
40
41/// Run contextual validation on the prepared block and add it to the
42/// non-finalized state if it is contextually valid.
43#[tracing::instrument(
44    level = "debug",
45    skip(finalized_state, non_finalized_state, prepared),
46    fields(
47        height = ?prepared.height,
48        hash = %prepared.hash,
49        chains = non_finalized_state.chain_count()
50    )
51)]
52pub(crate) fn validate_and_commit_non_finalized(
53    finalized_state: &ZebraDb,
54    non_finalized_state: &mut NonFinalizedState,
55    prepared: SemanticallyVerifiedBlock,
56) -> Result<(), CommitSemanticallyVerifiedError> {
57    check::initial_contextual_validity(finalized_state, non_finalized_state, &prepared)?;
58    let parent_hash = prepared.block.header.previous_block_hash;
59
60    if finalized_state.finalized_tip_hash() == parent_hash {
61        non_finalized_state.commit_new_chain(prepared, finalized_state)?;
62    } else {
63        non_finalized_state.commit_block(prepared, finalized_state)?;
64    }
65
66    Ok(())
67}
68
69/// Update the [`LatestChainTip`], [`ChainTipChange`], and `non_finalized_state_sender`
70/// channels with the latest non-finalized [`ChainTipBlock`] and
71/// [`Chain`].
72///
73/// `last_zebra_mined_log_height` is used to rate-limit logging.
74///
75/// Returns the latest non-finalized chain tip height.
76///
77/// # Panics
78///
79/// If the `non_finalized_state` is empty.
80#[instrument(
81    level = "debug",
82    skip(
83        non_finalized_state,
84        chain_tip_sender,
85        non_finalized_state_sender,
86        last_zebra_mined_log_height
87    ),
88    fields(chains = non_finalized_state.chain_count())
89)]
90fn update_latest_chain_channels(
91    non_finalized_state: &NonFinalizedState,
92    chain_tip_sender: &mut ChainTipSender,
93    non_finalized_state_sender: &watch::Sender<NonFinalizedState>,
94    last_zebra_mined_log_height: &mut Option<Height>,
95) -> block::Height {
96    let best_chain = non_finalized_state.best_chain().expect("unexpected empty non-finalized state: must commit at least one block before updating channels");
97
98    let tip_block = best_chain
99        .tip_block()
100        .expect("unexpected empty chain: must commit at least one block before updating channels")
101        .clone();
102    let tip_block = ChainTipBlock::from(tip_block);
103
104    log_if_mined_by_zebra(&tip_block, last_zebra_mined_log_height);
105
106    let tip_block_height = tip_block.height;
107
108    // If the final receiver was just dropped, ignore the error.
109    let _ = non_finalized_state_sender.send(non_finalized_state.clone());
110
111    chain_tip_sender.set_best_non_finalized_tip(tip_block);
112
113    tip_block_height
114}
115
/// A worker task that reads, validates, and writes blocks to the
/// `finalized_state` or `non_finalized_state`.
///
/// The task is driven by [`WriteBlockWorkerTask::run`], which first drains the
/// finalized block channel, then processes non-finalized write messages.
struct WriteBlockWorkerTask {
    /// Receives checkpoint-verified blocks to commit to the finalized state.
    finalized_block_write_receiver: UnboundedReceiver<QueuedCheckpointVerified>,
    /// Receives commit, invalidate, and reconsider requests for the non-finalized state.
    non_finalized_block_write_receiver: UnboundedReceiver<NonFinalizedWriteMessage>,
    /// The finalized state that this task writes to.
    finalized_state: FinalizedState,
    /// The non-finalized state that this task writes to.
    non_finalized_state: NonFinalizedState,
    /// Sends the finalized tip hash when committing a finalized block fails,
    /// so the receiver can reset its block queue.
    invalid_block_reset_sender: UnboundedSender<block::Hash>,
    /// Updated with the latest finalized or best non-finalized tip block.
    chain_tip_sender: ChainTipSender,
    /// Publishes a clone of the non-finalized state after it changes.
    non_finalized_state_sender: watch::Sender<NonFinalizedState>,
}
127
/// The message type for the non-finalized block write task channel.
pub enum NonFinalizedWriteMessage {
    /// A newly downloaded and semantically verified block prepared for
    /// contextual validation and insertion into the non-finalized state.
    Commit(QueuedSemanticallyVerified),
    /// The hash of a block that should be invalidated and removed from
    /// the non-finalized state, if present.
    Invalidate {
        /// The hash of the block to invalidate.
        hash: block::Hash,
        /// The channel that receives the result of the invalidation.
        rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
    },
    /// The hash of a block that was previously invalidated but should be
    /// reconsidered and reinserted into the non-finalized state.
    Reconsider {
        /// The hash of the block to reconsider.
        hash: block::Hash,
        /// The channel that receives the result of the reconsideration.
        rsp_tx: oneshot::Sender<Result<Vec<block::Hash>, BoxError>>,
    },
}
146
147impl From<QueuedSemanticallyVerified> for NonFinalizedWriteMessage {
148    fn from(block: QueuedSemanticallyVerified) -> Self {
149        NonFinalizedWriteMessage::Commit(block)
150    }
151}
152
/// A worker with a task that reads, validates, and writes blocks to the
/// `finalized_state` or `non_finalized_state` and channels for sending
/// it blocks.
///
/// This type holds the sending halves of the channels;
/// the block write task holds the receiving halves.
#[derive(Clone, Debug)]
pub(super) struct BlockWriteSender {
    /// A channel to send [`NonFinalizedWriteMessage`]s to the `block_write_task`,
    /// so blocks can be written to, invalidated in, or reconsidered by
    /// the [`NonFinalizedState`].
    pub non_finalized: Option<tokio::sync::mpsc::UnboundedSender<NonFinalizedWriteMessage>>,

    /// A channel to send blocks to the `block_write_task`,
    /// so they can be written to the [`FinalizedState`].
    ///
    /// This sender is dropped after the state has finished sending all the checkpointed blocks,
    /// and the lowest semantically verified block arrives.
    pub finalized: Option<tokio::sync::mpsc::UnboundedSender<QueuedCheckpointVerified>>,
}
169
170impl BlockWriteSender {
171    /// Creates a new [`BlockWriteSender`] with the given receivers and states.
172    #[instrument(
173        level = "debug",
174        skip_all,
175        fields(
176            network = %non_finalized_state.network
177        )
178    )]
179    pub fn spawn(
180        finalized_state: FinalizedState,
181        non_finalized_state: NonFinalizedState,
182        chain_tip_sender: ChainTipSender,
183        non_finalized_state_sender: watch::Sender<NonFinalizedState>,
184    ) -> (
185        Self,
186        tokio::sync::mpsc::UnboundedReceiver<block::Hash>,
187        Option<Arc<std::thread::JoinHandle<()>>>,
188    ) {
189        // Security: The number of blocks in these channels is limited by
190        //           the syncer and inbound lookahead limits.
191        let (non_finalized_block_write_sender, non_finalized_block_write_receiver) =
192            tokio::sync::mpsc::unbounded_channel();
193        let (finalized_block_write_sender, finalized_block_write_receiver) =
194            tokio::sync::mpsc::unbounded_channel();
195        let (invalid_block_reset_sender, invalid_block_write_reset_receiver) =
196            tokio::sync::mpsc::unbounded_channel();
197
198        let span = Span::current();
199        let task = std::thread::spawn(move || {
200            span.in_scope(|| {
201                WriteBlockWorkerTask {
202                    finalized_block_write_receiver,
203                    non_finalized_block_write_receiver,
204                    finalized_state,
205                    non_finalized_state,
206                    invalid_block_reset_sender,
207                    chain_tip_sender,
208                    non_finalized_state_sender,
209                }
210                .run()
211            })
212        });
213
214        (
215            Self {
216                non_finalized: Some(non_finalized_block_write_sender),
217                finalized: Some(finalized_block_write_sender),
218            },
219            invalid_block_write_reset_receiver,
220            Some(Arc::new(task)),
221        )
222    }
223}
224
225impl WriteBlockWorkerTask {
226    /// Reads blocks from the channels, writes them to the `finalized_state` or `non_finalized_state`,
227    /// sends any errors on the `invalid_block_reset_sender`, then updates the `chain_tip_sender` and
228    /// `non_finalized_state_sender`.
229    #[instrument(
230        level = "debug",
231        skip(self),
232        fields(
233            network = %self.non_finalized_state.network
234        )
235    )]
236    pub fn run(mut self) {
237        let Self {
238            finalized_block_write_receiver,
239            non_finalized_block_write_receiver,
240            finalized_state,
241            non_finalized_state,
242            invalid_block_reset_sender,
243            chain_tip_sender,
244            non_finalized_state_sender,
245        } = &mut self;
246
247        let mut last_zebra_mined_log_height = None;
248        let mut prev_finalized_note_commitment_trees = None;
249
250        // Write all the finalized blocks sent by the state,
251        // until the state closes the finalized block channel's sender.
252        while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
253            // TODO: split these checks into separate functions
254
255            if invalid_block_reset_sender.is_closed() {
256                info!("StateService closed the block reset channel. Is Zebra shutting down?");
257                return;
258            }
259
260            // Discard any children of invalid blocks in the channel
261            //
262            // `commit_finalized()` requires blocks in height order.
263            // So if there has been a block commit error,
264            // we need to drop all the descendants of that block,
265            // until we receive a block at the required next height.
266            let next_valid_height = finalized_state
267                .db
268                .finalized_tip_height()
269                .map(|height| (height + 1).expect("committed heights are valid"))
270                .unwrap_or(Height(0));
271
272            if ordered_block.0.height != next_valid_height {
273                debug!(
274                    ?next_valid_height,
275                    invalid_height = ?ordered_block.0.height,
276                    invalid_hash = ?ordered_block.0.hash,
277                    "got a block that was the wrong height. \
278                     Assuming a parent block failed, and dropping this block",
279                );
280
281                // We don't want to send a reset here, because it could overwrite a valid sent hash
282                std::mem::drop(ordered_block);
283                continue;
284            }
285
286            // Try committing the block
287            match finalized_state
288                .commit_finalized(ordered_block, prev_finalized_note_commitment_trees.take())
289            {
290                Ok((finalized, note_commitment_trees)) => {
291                    let tip_block = ChainTipBlock::from(finalized);
292                    prev_finalized_note_commitment_trees = Some(note_commitment_trees);
293
294                    log_if_mined_by_zebra(&tip_block, &mut last_zebra_mined_log_height);
295
296                    chain_tip_sender.set_finalized_tip(tip_block);
297                }
298                Err(error) => {
299                    let finalized_tip = finalized_state.db.tip();
300
301                    // The last block in the queue failed, so we can't commit the next block.
302                    // Instead, we need to reset the state queue,
303                    // and discard any children of the invalid block in the channel.
304                    info!(
305                        ?error,
306                        last_valid_height = ?finalized_tip.map(|tip| tip.0),
307                        last_valid_hash = ?finalized_tip.map(|tip| tip.1),
308                        "committing a block to the finalized state failed, resetting state queue",
309                    );
310
311                    let send_result =
312                        invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
313
314                    if send_result.is_err() {
315                        info!(
316                            "StateService closed the block reset channel. Is Zebra shutting down?"
317                        );
318                        return;
319                    }
320                }
321            }
322        }
323
324        // Do this check even if the channel got closed before any finalized blocks were sent.
325        // This can happen if we're past the finalized tip.
326        if invalid_block_reset_sender.is_closed() {
327            info!("StateService closed the block reset channel. Is Zebra shutting down?");
328            return;
329        }
330
331        // Save any errors to propagate down to queued child blocks
332        let mut parent_error_map: IndexMap<block::Hash, CloneError> = IndexMap::new();
333
334        while let Some(msg) = non_finalized_block_write_receiver.blocking_recv() {
335            let queued_child_and_rsp_tx = match msg {
336                NonFinalizedWriteMessage::Commit(queued_child) => Some(queued_child),
337                NonFinalizedWriteMessage::Invalidate { hash, rsp_tx } => {
338                    tracing::info!(?hash, "invalidating a block in the non-finalized state");
339                    let _ = rsp_tx.send(non_finalized_state.invalidate_block(hash));
340                    None
341                }
342                NonFinalizedWriteMessage::Reconsider { hash, rsp_tx } => {
343                    tracing::info!(?hash, "reconsidering a block in the non-finalized state");
344                    let _ = rsp_tx.send(
345                        non_finalized_state
346                            .reconsider_block(hash, &finalized_state.db)
347                            .map_err(BoxError::from),
348                    );
349                    None
350                }
351            };
352
353            let Some((queued_child, rsp_tx)) = queued_child_and_rsp_tx else {
354                update_latest_chain_channels(
355                    non_finalized_state,
356                    chain_tip_sender,
357                    non_finalized_state_sender,
358                    &mut last_zebra_mined_log_height,
359                );
360                continue;
361            };
362
363            let child_hash = queued_child.hash;
364            let parent_hash = queued_child.block.header.previous_block_hash;
365            let parent_error = parent_error_map.get(&parent_hash);
366
367            let result;
368
369            // If the parent block was marked as rejected, also reject all its children.
370            //
371            // At this point, we know that all the block's descendants
372            // are invalid, because we checked all the consensus rules before
373            // committing the failing ancestor block to the non-finalized state.
374            if let Some(parent_error) = parent_error {
375                tracing::trace!(
376                    ?child_hash,
377                    ?parent_error,
378                    "rejecting queued child due to parent error"
379                );
380                result = Err(parent_error.clone());
381            } else {
382                tracing::trace!(?child_hash, "validating queued child");
383                result = validate_and_commit_non_finalized(
384                    &finalized_state.db,
385                    non_finalized_state,
386                    queued_child,
387                )
388                .map_err(CloneError::from);
389            }
390
391            // TODO: fix the test timing bugs that require the result to be sent
392            //       after `update_latest_chain_channels()`,
393            //       and send the result on rsp_tx here
394
395            if let Err(ref error) = result {
396                // Update the caller with the error.
397                let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
398
399                // If the block is invalid, mark any descendant blocks as rejected.
400                parent_error_map.insert(child_hash, error.clone());
401
402                // Make sure the error map doesn't get too big.
403                if parent_error_map.len() > PARENT_ERROR_MAP_LIMIT {
404                    // We only add one hash at a time, so we only need to remove one extra here.
405                    parent_error_map.shift_remove_index(0);
406                }
407
408                // Skip the things we only need to do for successfully committed blocks
409                continue;
410            }
411
412            // Committing blocks to the finalized state keeps the same chain,
413            // so we can update the chain seen by the rest of the application now.
414            //
415            // TODO: if this causes state request errors due to chain conflicts,
416            //       fix the `service::read` bugs,
417            //       or do the channel update after the finalized state commit
418            let tip_block_height = update_latest_chain_channels(
419                non_finalized_state,
420                chain_tip_sender,
421                non_finalized_state_sender,
422                &mut last_zebra_mined_log_height,
423            );
424
425            // Update the caller with the result.
426            let _ = rsp_tx.send(result.clone().map(|()| child_hash).map_err(BoxError::from));
427
428            while non_finalized_state
429                .best_chain_len()
430                .expect("just successfully inserted a non-finalized block above")
431                > MAX_BLOCK_REORG_HEIGHT
432            {
433                tracing::trace!("finalizing block past the reorg limit");
434                let contextually_verified_with_trees = non_finalized_state.finalize();
435                prev_finalized_note_commitment_trees = finalized_state
436                            .commit_finalized_direct(contextually_verified_with_trees, prev_finalized_note_commitment_trees.take(), "commit contextually-verified request")
437                            .expect(
438                                "unexpected finalized block commit error: note commitment and history trees were already checked by the non-finalized state",
439                            ).1.into();
440            }
441
442            // Update the metrics if semantic and contextual validation passes
443            //
444            // TODO: split this out into a function?
445            metrics::counter!("state.full_verifier.committed.block.count").increment(1);
446            metrics::counter!("zcash.chain.verified.block.total").increment(1);
447
448            metrics::gauge!("state.full_verifier.committed.block.height")
449                .set(tip_block_height.0 as f64);
450
451            // This height gauge is updated for both fully verified and checkpoint blocks.
452            // These updates can't conflict, because this block write task makes sure that blocks
453            // are committed in order.
454            metrics::gauge!("zcash.chain.verified.block.height").set(tip_block_height.0 as f64);
455
456            tracing::trace!("finished processing queued block");
457        }
458
459        // We're finished receiving non-finalized blocks from the state, and
460        // done writing to the finalized state, so we can force it to shut down.
461        finalized_state.db.shutdown(true);
462        std::mem::drop(self.finalized_state);
463    }
464}
465
466/// Log a message if this block was mined by Zebra.
467///
468/// Does not detect early Zebra blocks, and blocks with custom coinbase transactions.
469/// Rate-limited to every 1000 blocks using `last_zebra_mined_log_height`.
470fn log_if_mined_by_zebra(
471    tip_block: &ChainTipBlock,
472    last_zebra_mined_log_height: &mut Option<Height>,
473) {
474    // This logs at most every 2-3 checkpoints, which seems fine.
475    const LOG_RATE_LIMIT: u32 = 1000;
476
477    let height = tip_block.height.0;
478
479    if let Some(last_height) = last_zebra_mined_log_height {
480        if height < last_height.0 + LOG_RATE_LIMIT {
481            // If we logged in the last 1000 blocks, don't log anything now.
482            return;
483        }
484    };
485
486    // This code is rate-limited, so we can do expensive transformations here.
487    let coinbase_data = tip_block.transactions[0].inputs()[0]
488        .extra_coinbase_data()
489        .expect("valid blocks must start with a coinbase input")
490        .clone();
491
492    if coinbase_data
493        .as_ref()
494        .starts_with(EXTRA_ZEBRA_COINBASE_DATA.as_bytes())
495    {
496        let text = String::from_utf8_lossy(coinbase_data.as_ref());
497
498        *last_zebra_mined_log_height = Some(Height(height));
499
500        // No need for hex-encoded data if it's exactly what we expected.
501        if coinbase_data.as_ref() == EXTRA_ZEBRA_COINBASE_DATA.as_bytes() {
502            info!(
503                %text,
504                %height,
505                hash = %tip_block.hash,
506                "looks like this block was mined by Zebra!"
507            );
508        } else {
509            // # Security
510            //
511            // Use the extra data as an allow-list, replacing unknown characters.
512            // This makes sure control characters and harmful messages don't get logged
513            // to the terminal.
514            let text = text.replace(
515                |c: char| {
516                    !EXTRA_ZEBRA_COINBASE_DATA
517                        .to_ascii_lowercase()
518                        .contains(c.to_ascii_lowercase())
519                },
520                "?",
521            );
522            let data = hex::encode(coinbase_data.as_ref());
523
524            info!(
525                %text,
526                %data,
527                %height,
528                hash = %tip_block.hash,
529                "looks like this block was mined by Zebra!"
530            );
531        }
532    }
533}