zebrad/components/
miner.rs

1//! Internal mining in Zebra.
2//!
3//! # TODO
4//! - pause mining if we have no peers, like `zcashd` does,
5//!   and add a developer config that mines regardless of how many peers we have.
6//!   <https://github.com/zcash/zcash/blob/6fdd9f1b81d3b228326c9826fa10696fc516444b/src/miner.cpp#L865-L880>
7//! - move common code into zebra-chain or zebra-node-services and remove the RPC dependency.
8
9use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
10
11use color_eyre::Report;
12use futures::{stream::FuturesUnordered, StreamExt};
13use thread_priority::{ThreadBuilder, ThreadPriority};
14use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
15use tower::Service;
16use tracing::{Instrument, Span};
17
18use zebra_chain::{
19    block::{self, Block},
20    chain_sync_status::ChainSyncStatus,
21    chain_tip::ChainTip,
22    diagnostic::task::WaitForPanics,
23    serialization::{AtLeastOne, ZcashSerialize},
24    shutdown::is_shutting_down,
25    work::equihash::{Solution, SolverCancelled},
26};
27use zebra_network::AddressBookPeers;
28use zebra_node_services::mempool;
29use zebra_rpc::{
30    client::{
31        BlockTemplateTimeSource,
32        GetBlockTemplateCapability::{CoinbaseTxn, LongPoll},
33        GetBlockTemplateParameters,
34        GetBlockTemplateRequestMode::Template,
35        HexData,
36    },
37    methods::{RpcImpl, RpcServer},
38    proposal_block_from_template,
39};
40use zebra_state::WatchReceiver;
41
42use crate::components::metrics::Config;
43
/// How long to wait before trying again when no usable block template is available.
pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_millis(20_000);

/// The minimum delay between block template refreshes, so a rapidly-changing chain doesn't
/// cause a flood of template updates.
pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_millis(2_000);

/// How long a solver waits after mining a block before it expects a new template.
///
/// Kept slightly above [`BLOCK_TEMPLATE_REFRESH_LIMIT`], leaving time for the next template to be
/// generated.
pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_millis(3_000);
55
56/// Initialize the miner based on its config, and spawn a task for it.
57///
58/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured
59/// mining thread.
60///
61/// See [`run_mining_solver()`] for more details.
62pub fn spawn_init<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>(
63    config: &Config,
64    rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
65) -> JoinHandle<Result<(), Report>>
66// TODO: simplify or avoid repeating these generics (how?)
67where
68    Mempool: Service<
69            mempool::Request,
70            Response = mempool::Response,
71            Error = zebra_node_services::BoxError,
72        > + Clone
73        + Send
74        + Sync
75        + 'static,
76    Mempool::Future: Send,
77    State: Service<
78            zebra_state::Request,
79            Response = zebra_state::Response,
80            Error = zebra_state::BoxError,
81        > + Clone
82        + Send
83        + Sync
84        + 'static,
85    <State as Service<zebra_state::Request>>::Future: Send,
86    ReadState: Service<
87            zebra_state::ReadRequest,
88            Response = zebra_state::ReadResponse,
89            Error = zebra_state::BoxError,
90        > + Clone
91        + Send
92        + Sync
93        + 'static,
94    <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
95    Tip: ChainTip + Clone + Send + Sync + 'static,
96    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
97        + Clone
98        + Send
99        + Sync
100        + 'static,
101    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
102    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
103    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
104{
105    // TODO: spawn an entirely new executor here, so mining is isolated from higher priority tasks.
106    tokio::spawn(init(config.clone(), rpc).in_current_span())
107}
108
109/// Initialize the miner based on its config.
110///
111/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured
112/// mining thread.
113///
114/// See [`run_mining_solver()`] for more details.
115pub async fn init<Mempool, State, ReadState, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
116    _config: Config,
117    rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
118) -> Result<(), Report>
119where
120    Mempool: Service<
121            mempool::Request,
122            Response = mempool::Response,
123            Error = zebra_node_services::BoxError,
124        > + Clone
125        + Send
126        + Sync
127        + 'static,
128    Mempool::Future: Send,
129    State: Service<
130            zebra_state::Request,
131            Response = zebra_state::Response,
132            Error = zebra_state::BoxError,
133        > + Clone
134        + Send
135        + Sync
136        + 'static,
137    <State as Service<zebra_state::Request>>::Future: Send,
138    ReadState: Service<
139            zebra_state::ReadRequest,
140            Response = zebra_state::ReadResponse,
141            Error = zebra_state::BoxError,
142        > + Clone
143        + Send
144        + Sync
145        + 'static,
146    <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
147    Tip: ChainTip + Clone + Send + Sync + 'static,
148    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
149        + Clone
150        + Send
151        + Sync
152        + 'static,
153    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
154    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
155    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
156{
157    // TODO: change this to `config.internal_miner_threads` once mining tasks are cancelled when the best tip changes (#8797)
158    let configured_threads = 1;
159    // If we can't detect the number of cores, use the configured number.
160    let available_threads = available_parallelism()
161        .map(usize::from)
162        .unwrap_or(configured_threads);
163
164    // Use the minimum of the configured and available threads.
165    let solver_count = min(configured_threads, available_threads);
166
167    info!(
168        ?solver_count,
169        "launching mining tasks with parallel solvers"
170    );
171
172    let (template_sender, template_receiver) = watch::channel(None);
173    let template_receiver = WatchReceiver::new(template_receiver);
174
175    // Spawn these tasks, to avoid blocked cooperative futures, and improve shutdown responsiveness.
176    // This is particularly important when there are a large number of solver threads.
177    let mut abort_handles = Vec::new();
178
179    let template_generator = tokio::task::spawn(
180        generate_block_templates(rpc.clone(), template_sender).in_current_span(),
181    );
182    abort_handles.push(template_generator.abort_handle());
183    let template_generator = template_generator.wait_for_panics();
184
185    let mut mining_solvers = FuturesUnordered::new();
186    for solver_id in 0..solver_count {
187        // Assume there are less than 256 cores. If there are more, only run 256 tasks.
188        let solver_id = min(solver_id, usize::from(u8::MAX))
189            .try_into()
190            .expect("just limited to u8::MAX");
191
192        let solver = tokio::task::spawn(
193            run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
194        );
195        abort_handles.push(solver.abort_handle());
196
197        mining_solvers.push(solver.wait_for_panics());
198    }
199
200    // These tasks run forever unless there is a fatal error or shutdown.
201    // When that happens, the first task to error returns, and the other JoinHandle futures are
202    // cancelled.
203    let first_result;
204    select! {
205        result = template_generator => { first_result = result; }
206        result = mining_solvers.next() => {
207            first_result = result
208                .expect("stream never terminates because there is at least one solver task");
209        }
210    }
211
212    // But the spawned async tasks keep running, so we need to abort them here.
213    for abort_handle in abort_handles {
214        abort_handle.abort();
215    }
216
217    // Any spawned blocking threads will keep running. When this task returns and drops the
218    // `template_sender`, it cancels all the spawned miner threads. This works because we've
219    // aborted the `template_generator` task, which owns the `template_sender`. (And it doesn't
220    // spawn any blocking threads.)
221    first_result
222}
223
224/// Generates block templates using `rpc`, and sends them to mining threads using `template_sender`.
225#[instrument(skip(rpc, template_sender))]
226pub async fn generate_block_templates<
227    Mempool,
228    State,
229    ReadState,
230    Tip,
231    BlockVerifierRouter,
232    SyncStatus,
233    AddressBook,
234>(
235    rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
236    template_sender: watch::Sender<Option<Arc<Block>>>,
237) -> Result<(), Report>
238where
239    Mempool: Service<
240            mempool::Request,
241            Response = mempool::Response,
242            Error = zebra_node_services::BoxError,
243        > + Clone
244        + Send
245        + Sync
246        + 'static,
247    Mempool::Future: Send,
248    State: Service<
249            zebra_state::Request,
250            Response = zebra_state::Response,
251            Error = zebra_state::BoxError,
252        > + Clone
253        + Send
254        + Sync
255        + 'static,
256    <State as Service<zebra_state::Request>>::Future: Send,
257    ReadState: Service<
258            zebra_state::ReadRequest,
259            Response = zebra_state::ReadResponse,
260            Error = zebra_state::BoxError,
261        > + Clone
262        + Send
263        + Sync
264        + 'static,
265    <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
266    Tip: ChainTip + Clone + Send + Sync + 'static,
267    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
268        + Clone
269        + Send
270        + Sync
271        + 'static,
272    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
273    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
274    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
275{
276    // Pass the correct arguments, even if Zebra currently ignores them.
277    let mut parameters =
278        GetBlockTemplateParameters::new(Template, None, vec![LongPoll, CoinbaseTxn], None, None);
279
280    // Shut down the task when all the template receivers are dropped, or Zebra shuts down.
281    while !template_sender.is_closed() && !is_shutting_down() {
282        let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
283
284        // Wait for the chain to sync so we get a valid template.
285        let Ok(template) = template else {
286            warn!(
287                ?BLOCK_TEMPLATE_WAIT_TIME,
288                ?template,
289                "waiting for a valid block template",
290            );
291
292            // Skip the wait if we got an error because we are shutting down.
293            if !is_shutting_down() {
294                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
295            }
296
297            continue;
298        };
299
300        // Convert from RPC GetBlockTemplate to Block
301        let template = template
302            .try_into_template()
303            .expect("invalid RPC response: proposal in response to a template request");
304
305        info!(
306            height = ?template.height(),
307            transactions = ?template.transactions().len(),
308            "mining with an updated block template",
309        );
310
311        // Tell the next get_block_template() call to wait until the template has changed.
312        parameters = GetBlockTemplateParameters::new(
313            Template,
314            None,
315            vec![LongPoll, CoinbaseTxn],
316            Some(template.long_poll_id()),
317            None,
318        );
319
320        let block = proposal_block_from_template(
321            &template,
322            BlockTemplateTimeSource::CurTime,
323            rpc.network(),
324        )?;
325
326        // If the template has actually changed, send an updated template.
327        template_sender.send_if_modified(|old_block| {
328            if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
329                return false;
330            }
331            *old_block = Some(Arc::new(block));
332            true
333        });
334
335        // If the blockchain is changing rapidly, limit how often we'll update the template.
336        // But if we're shutting down, do that immediately.
337        if !template_sender.is_closed() && !is_shutting_down() {
338            sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
339        }
340    }
341
342    Ok(())
343}
344
345/// Runs a single mining thread that gets blocks from the `template_receiver`, calculates equihash
346/// solutions with nonces based on `solver_id`, and submits valid blocks to Zebra's block validator.
347///
348/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core while running.
349/// It can run for minutes or hours if the network difficulty is high. Mining uses a thread with
350/// low CPU priority.
351#[instrument(skip(template_receiver, rpc))]
352pub async fn run_mining_solver<
353    Mempool,
354    State,
355    ReadState,
356    Tip,
357    BlockVerifierRouter,
358    SyncStatus,
359    AddressBook,
360>(
361    solver_id: u8,
362    mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
363    rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
364) -> Result<(), Report>
365where
366    Mempool: Service<
367            mempool::Request,
368            Response = mempool::Response,
369            Error = zebra_node_services::BoxError,
370        > + Clone
371        + Send
372        + Sync
373        + 'static,
374    Mempool::Future: Send,
375    State: Service<
376            zebra_state::Request,
377            Response = zebra_state::Response,
378            Error = zebra_state::BoxError,
379        > + Clone
380        + Send
381        + Sync
382        + 'static,
383    <State as Service<zebra_state::Request>>::Future: Send,
384    ReadState: Service<
385            zebra_state::ReadRequest,
386            Response = zebra_state::ReadResponse,
387            Error = zebra_state::BoxError,
388        > + Clone
389        + Send
390        + Sync
391        + 'static,
392    <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
393    Tip: ChainTip + Clone + Send + Sync + 'static,
394    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
395        + Clone
396        + Send
397        + Sync
398        + 'static,
399    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
400    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
401    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
402{
403    // Shut down the task when the template sender is dropped, or Zebra shuts down.
404    while template_receiver.has_changed().is_ok() && !is_shutting_down() {
405        // Get the latest block template, and mark the current value as seen.
406        // We mark the value first to avoid missed updates.
407        template_receiver.mark_as_seen();
408        let template = template_receiver.cloned_watch_data();
409
410        let Some(template) = template else {
411            if solver_id == 0 {
412                info!(
413                    ?solver_id,
414                    ?BLOCK_TEMPLATE_WAIT_TIME,
415                    "solver waiting for initial block template"
416                );
417            } else {
418                debug!(
419                    ?solver_id,
420                    ?BLOCK_TEMPLATE_WAIT_TIME,
421                    "solver waiting for initial block template"
422                );
423            }
424
425            // Skip the wait if we didn't get a template because we are shutting down.
426            if !is_shutting_down() {
427                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
428            }
429
430            continue;
431        };
432
433        let height = template.coinbase_height().expect("template is valid");
434
435        // Set up the cancellation conditions for the miner.
436        let mut cancel_receiver = template_receiver.clone();
437        let old_header = *template.header;
438        let cancel_fn = move || match cancel_receiver.has_changed() {
439            // Guard against get_block_template() providing an identical header. This could happen
440            // if something irrelevant to the block data changes, the time was within 1 second, or
441            // there is a spurious channel change.
442            Ok(has_changed) => {
443                cancel_receiver.mark_as_seen();
444
445                // We only need to check header equality, because the block data is bound to the
446                // header.
447                if has_changed
448                    && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
449                {
450                    Err(SolverCancelled)
451                } else {
452                    Ok(())
453                }
454            }
455            // If the sender was dropped, we're likely shutting down, so cancel the solver.
456            Err(_sender_dropped) => Err(SolverCancelled),
457        };
458
459        // Mine at least one block using the equihash solver.
460        let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
461            // If the solver was cancelled, we're either shutting down, or we have a new template.
462            if solver_id == 0 {
463                info!(
464                    ?height,
465                    ?solver_id,
466                    new_template = ?template_receiver.has_changed(),
467                    shutting_down = ?is_shutting_down(),
468                    "solver cancelled: getting a new block template or shutting down"
469                );
470            } else {
471                debug!(
472                    ?height,
473                    ?solver_id,
474                    new_template = ?template_receiver.has_changed(),
475                    shutting_down = ?is_shutting_down(),
476                    "solver cancelled: getting a new block template or shutting down"
477                );
478            }
479
480            // If the blockchain is changing rapidly, limit how often we'll update the template.
481            // But if we're shutting down, do that immediately.
482            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
483                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
484            }
485
486            continue;
487        };
488
489        // Submit the newly mined blocks to the verifiers.
490        //
491        // TODO: if there is a new template (`cancel_fn().is_err()`), and
492        //       GetBlockTemplate.submit_old is false, return immediately, and skip submitting the
493        //       blocks.
494        let mut any_success = false;
495        for block in blocks {
496            let data = block
497                .zcash_serialize_to_vec()
498                .expect("serializing to Vec never fails");
499
500            match rpc.submit_block(HexData(data), None).await {
501                Ok(success) => {
502                    info!(
503                        ?height,
504                        hash = ?block.hash(),
505                        ?solver_id,
506                        ?success,
507                        "successfully mined a new block",
508                    );
509                    any_success = true;
510                }
511                Err(error) => info!(
512                    ?height,
513                    hash = ?block.hash(),
514                    ?solver_id,
515                    ?error,
516                    "validating a newly mined block failed, trying again",
517                ),
518            }
519        }
520
521        // Start re-mining quickly after a failed solution.
522        // If there's a new template, we'll use it, otherwise the existing one is ok.
523        if !any_success {
524            // If the blockchain is changing rapidly, limit how often we'll update the template.
525            // But if we're shutting down, do that immediately.
526            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
527                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
528            }
529            continue;
530        }
531
532        // Wait for the new block to verify, and the RPC task to pick up a new template.
533        // But don't wait too long, we could have mined on a fork.
534        tokio::select! {
535            shutdown_result = template_receiver.changed() => shutdown_result?,
536            _ = sleep(BLOCK_MINING_WAIT_TIME) => {}
537
538        }
539    }
540
541    Ok(())
542}
543
544/// Mines one or more blocks based on `template`. Calculates equihash solutions, checks difficulty,
545/// and returns as soon as it has at least one block. Uses a different nonce range for each
546/// `solver_id`.
547///
548/// If `cancel_fn()` returns an error, returns early with `Err(SolverCancelled)`.
549///
550/// See [`run_mining_solver()`] for more details.
551pub async fn mine_a_block<F>(
552    solver_id: u8,
553    template: Arc<Block>,
554    cancel_fn: F,
555) -> Result<AtLeastOne<Block>, SolverCancelled>
556where
557    F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
558{
559    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
560    // https://github.com/rust-lang/rust/issues/93610
561    let mut header = *template.header;
562
563    // Use a different nonce for each solver thread.
564    // Change both the first and last bytes, so we don't have to care if the nonces are incremented in
565    // big-endian or little-endian order. And we can see the thread that mined a block from the nonce.
566    *header.nonce.first_mut().unwrap() = solver_id;
567    *header.nonce.last_mut().unwrap() = solver_id;
568
569    // Mine one or more blocks using the solver, in a low-priority blocking thread.
570    let span = Span::current();
571    let solved_headers =
572        tokio::task::spawn_blocking(move || span.in_scope(move || {
573            let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
574                if let Err(error) = priority_result {
575                    info!(?error, "could not set miner to run at a low priority: running at default priority");
576                }
577
578                Solution::solve(header, cancel_fn)
579            }).expect("unable to spawn miner thread");
580
581            miner_thread_handle.wait_for_panics()
582        }))
583        .wait_for_panics()
584        .await?;
585
586    // Modify the template into solved blocks.
587
588    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises
589    let block = (*template).clone();
590
591    let solved_blocks: Vec<Block> = solved_headers
592        .into_iter()
593        .map(|header| {
594            let mut block = block.clone();
595            block.header = Arc::new(header);
596            block
597        })
598        .collect();
599
600    Ok(solved_blocks
601        .try_into()
602        .expect("a 1:1 mapping of AtLeastOne produces at least one block"))
603}