zebrad/components/
miner.rs

1//! Internal mining in Zebra.
2//!
3//! # TODO
4//! - pause mining if we have no peers, like `zcashd` does,
5//!   and add a developer config that mines regardless of how many peers we have.
6//!   <https://github.com/zcash/zcash/blob/6fdd9f1b81d3b228326c9826fa10696fc516444b/src/miner.cpp#L865-L880>
7//! - move common code into zebra-chain or zebra-node-services and remove the RPC dependency.
8
9use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
10
11use color_eyre::Report;
12use futures::{stream::FuturesUnordered, StreamExt};
13use thread_priority::{ThreadBuilder, ThreadPriority};
14use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
15use tower::Service;
16use tracing::{Instrument, Span};
17
18use zebra_chain::{
19    block::{self, Block, Height},
20    chain_sync_status::ChainSyncStatus,
21    chain_tip::ChainTip,
22    diagnostic::task::WaitForPanics,
23    parameters::{Network, NetworkUpgrade},
24    serialization::{AtLeastOne, ZcashSerialize},
25    shutdown::is_shutting_down,
26    work::equihash::{Solution, SolverCancelled},
27};
28use zebra_network::AddressBookPeers;
29use zebra_node_services::mempool;
30use zebra_rpc::{
31    config::mining::Config,
32    methods::{
33        get_block_template_rpcs::get_block_template::{
34            self, proposal::TimeSource, proposal_block_from_template,
35            GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*,
36        },
37        hex_data::HexData,
38        GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer,
39    },
40};
41use zebra_state::WatchReceiver;
42
/// The amount of time we wait between block template retries.
///
/// This is also how long each solver waits between checks while it has no initial template.
pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20);

/// A rate-limit for block template refreshes.
pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2);

/// How long we wait after mining a block, before expecting a new template.
///
/// This should be slightly longer than [`BLOCK_TEMPLATE_REFRESH_LIMIT`] to allow for template
/// generation.
pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3);

// Enforce the ordering documented on `BLOCK_MINING_WAIT_TIME` at compile time, so a future edit
// to either constant can't silently break it. (`Duration` comparisons aren't `const`, but
// `as_nanos()` is.)
const _: () = assert!(BLOCK_MINING_WAIT_TIME.as_nanos() > BLOCK_TEMPLATE_REFRESH_LIMIT.as_nanos());
54
55/// Initialize the miner based on its config, and spawn a task for it.
56///
57/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured
58/// mining thread.
59///
60/// See [`run_mining_solver()`] for more details.
61pub fn spawn_init<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
62    network: &Network,
63    config: &Config,
64    rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
65) -> JoinHandle<Result<(), Report>>
66// TODO: simplify or avoid repeating these generics (how?)
67where
68    Mempool: Service<
69            mempool::Request,
70            Response = mempool::Response,
71            Error = zebra_node_services::BoxError,
72        > + Clone
73        + Send
74        + Sync
75        + 'static,
76    Mempool::Future: Send,
77    State: Service<
78            zebra_state::ReadRequest,
79            Response = zebra_state::ReadResponse,
80            Error = zebra_state::BoxError,
81        > + Clone
82        + Send
83        + Sync
84        + 'static,
85    <State as Service<zebra_state::ReadRequest>>::Future: Send,
86    Tip: ChainTip + Clone + Send + Sync + 'static,
87    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
88        + Clone
89        + Send
90        + Sync
91        + 'static,
92    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
93    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
94    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
95{
96    let network = network.clone();
97    let config = config.clone();
98
99    // TODO: spawn an entirely new executor here, so mining is isolated from higher priority tasks.
100    tokio::spawn(init(network, config, rpc).in_current_span())
101}
102
103/// Initialize the miner based on its config.
104///
105/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured
106/// mining thread.
107///
108/// See [`run_mining_solver()`] for more details.
109pub async fn init<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
110    network: Network,
111    _config: Config,
112    rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
113) -> Result<(), Report>
114where
115    Mempool: Service<
116            mempool::Request,
117            Response = mempool::Response,
118            Error = zebra_node_services::BoxError,
119        > + Clone
120        + Send
121        + Sync
122        + 'static,
123    Mempool::Future: Send,
124    State: Service<
125            zebra_state::ReadRequest,
126            Response = zebra_state::ReadResponse,
127            Error = zebra_state::BoxError,
128        > + Clone
129        + Send
130        + Sync
131        + 'static,
132    <State as Service<zebra_state::ReadRequest>>::Future: Send,
133    Tip: ChainTip + Clone + Send + Sync + 'static,
134    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
135        + Clone
136        + Send
137        + Sync
138        + 'static,
139    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
140    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
141    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
142{
143    // TODO: change this to `config.internal_miner_threads` when internal miner feature is added back.
144    //       https://github.com/ZcashFoundation/zebra/issues/8183
145    let configured_threads = 1;
146    // If we can't detect the number of cores, use the configured number.
147    let available_threads = available_parallelism()
148        .map(usize::from)
149        .unwrap_or(configured_threads);
150
151    // Use the minimum of the configured and available threads.
152    let solver_count = min(configured_threads, available_threads);
153
154    info!(
155        ?solver_count,
156        "launching mining tasks with parallel solvers"
157    );
158
159    let (template_sender, template_receiver) = watch::channel(None);
160    let template_receiver = WatchReceiver::new(template_receiver);
161
162    // Spawn these tasks, to avoid blocked cooperative futures, and improve shutdown responsiveness.
163    // This is particularly important when there are a large number of solver threads.
164    let mut abort_handles = Vec::new();
165
166    let template_generator = tokio::task::spawn(
167        generate_block_templates(network, rpc.clone(), template_sender).in_current_span(),
168    );
169    abort_handles.push(template_generator.abort_handle());
170    let template_generator = template_generator.wait_for_panics();
171
172    let mut mining_solvers = FuturesUnordered::new();
173    for solver_id in 0..solver_count {
174        // Assume there are less than 256 cores. If there are more, only run 256 tasks.
175        let solver_id = min(solver_id, usize::from(u8::MAX))
176            .try_into()
177            .expect("just limited to u8::MAX");
178
179        let solver = tokio::task::spawn(
180            run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
181        );
182        abort_handles.push(solver.abort_handle());
183
184        mining_solvers.push(solver.wait_for_panics());
185    }
186
187    // These tasks run forever unless there is a fatal error or shutdown.
188    // When that happens, the first task to error returns, and the other JoinHandle futures are
189    // cancelled.
190    let first_result;
191    select! {
192        result = template_generator => { first_result = result; }
193        result = mining_solvers.next() => {
194            first_result = result
195                .expect("stream never terminates because there is at least one solver task");
196        }
197    }
198
199    // But the spawned async tasks keep running, so we need to abort them here.
200    for abort_handle in abort_handles {
201        abort_handle.abort();
202    }
203
204    // Any spawned blocking threads will keep running. When this task returns and drops the
205    // `template_sender`, it cancels all the spawned miner threads. This works because we've
206    // aborted the `template_generator` task, which owns the `template_sender`. (And it doesn't
207    // spawn any blocking threads.)
208    first_result
209}
210
211/// Generates block templates using `rpc`, and sends them to mining threads using `template_sender`.
212#[instrument(skip(rpc, template_sender))]
213pub async fn generate_block_templates<
214    Mempool,
215    State,
216    Tip,
217    BlockVerifierRouter,
218    SyncStatus,
219    AddressBook,
220>(
221    network: Network,
222    rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
223    template_sender: watch::Sender<Option<Arc<Block>>>,
224) -> Result<(), Report>
225where
226    Mempool: Service<
227            mempool::Request,
228            Response = mempool::Response,
229            Error = zebra_node_services::BoxError,
230        > + Clone
231        + Send
232        + Sync
233        + 'static,
234    Mempool::Future: Send,
235    State: Service<
236            zebra_state::ReadRequest,
237            Response = zebra_state::ReadResponse,
238            Error = zebra_state::BoxError,
239        > + Clone
240        + Send
241        + Sync
242        + 'static,
243    <State as Service<zebra_state::ReadRequest>>::Future: Send,
244    Tip: ChainTip + Clone + Send + Sync + 'static,
245    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
246        + Clone
247        + Send
248        + Sync
249        + 'static,
250    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
251    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
252    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
253{
254    // Pass the correct arguments, even if Zebra currently ignores them.
255    let mut parameters = get_block_template::JsonParameters {
256        mode: Template,
257        data: None,
258        capabilities: vec![LongPoll, CoinbaseTxn],
259        long_poll_id: None,
260        _work_id: None,
261    };
262
263    // Shut down the task when all the template receivers are dropped, or Zebra shuts down.
264    while !template_sender.is_closed() && !is_shutting_down() {
265        let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
266
267        // Wait for the chain to sync so we get a valid template.
268        let Ok(template) = template else {
269            info!(
270                ?BLOCK_TEMPLATE_WAIT_TIME,
271                "waiting for a valid block template",
272            );
273
274            // Skip the wait if we got an error because we are shutting down.
275            if !is_shutting_down() {
276                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
277            }
278
279            continue;
280        };
281
282        // Convert from RPC GetBlockTemplate to Block
283        let template = template
284            .try_into_template()
285            .expect("invalid RPC response: proposal in response to a template request");
286
287        info!(
288            height = ?template.height,
289            transactions = ?template.transactions.len(),
290            "mining with an updated block template",
291        );
292
293        // Tell the next get_block_template() call to wait until the template has changed.
294        parameters.long_poll_id = Some(template.long_poll_id);
295
296        let block = proposal_block_from_template(
297            &template,
298            TimeSource::CurTime,
299            NetworkUpgrade::current(&network, Height(template.height)),
300        )
301        .expect("unexpected invalid block template");
302
303        // If the template has actually changed, send an updated template.
304        template_sender.send_if_modified(|old_block| {
305            if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
306                return false;
307            }
308            *old_block = Some(Arc::new(block));
309            true
310        });
311
312        // If the blockchain is changing rapidly, limit how often we'll update the template.
313        // But if we're shutting down, do that immediately.
314        if !template_sender.is_closed() && !is_shutting_down() {
315            sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
316        }
317    }
318
319    Ok(())
320}
321
322/// Runs a single mining thread that gets blocks from the `template_receiver`, calculates equihash
323/// solutions with nonces based on `solver_id`, and submits valid blocks to Zebra's block validator.
324///
325/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core while running.
326/// It can run for minutes or hours if the network difficulty is high. Mining uses a thread with
327/// low CPU priority.
328#[instrument(skip(template_receiver, rpc))]
329pub async fn run_mining_solver<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
330    solver_id: u8,
331    mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
332    rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
333) -> Result<(), Report>
334where
335    Mempool: Service<
336            mempool::Request,
337            Response = mempool::Response,
338            Error = zebra_node_services::BoxError,
339        > + Clone
340        + Send
341        + Sync
342        + 'static,
343    Mempool::Future: Send,
344    State: Service<
345            zebra_state::ReadRequest,
346            Response = zebra_state::ReadResponse,
347            Error = zebra_state::BoxError,
348        > + Clone
349        + Send
350        + Sync
351        + 'static,
352    <State as Service<zebra_state::ReadRequest>>::Future: Send,
353    Tip: ChainTip + Clone + Send + Sync + 'static,
354    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
355        + Clone
356        + Send
357        + Sync
358        + 'static,
359    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
360    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
361    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
362{
363    // Shut down the task when the template sender is dropped, or Zebra shuts down.
364    while template_receiver.has_changed().is_ok() && !is_shutting_down() {
365        // Get the latest block template, and mark the current value as seen.
366        // We mark the value first to avoid missed updates.
367        template_receiver.mark_as_seen();
368        let template = template_receiver.cloned_watch_data();
369
370        let Some(template) = template else {
371            if solver_id == 0 {
372                info!(
373                    ?solver_id,
374                    ?BLOCK_TEMPLATE_WAIT_TIME,
375                    "solver waiting for initial block template"
376                );
377            } else {
378                debug!(
379                    ?solver_id,
380                    ?BLOCK_TEMPLATE_WAIT_TIME,
381                    "solver waiting for initial block template"
382                );
383            }
384
385            // Skip the wait if we didn't get a template because we are shutting down.
386            if !is_shutting_down() {
387                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
388            }
389
390            continue;
391        };
392
393        let height = template.coinbase_height().expect("template is valid");
394
395        // Set up the cancellation conditions for the miner.
396        let mut cancel_receiver = template_receiver.clone();
397        let old_header = *template.header;
398        let cancel_fn = move || match cancel_receiver.has_changed() {
399            // Guard against get_block_template() providing an identical header. This could happen
400            // if something irrelevant to the block data changes, the time was within 1 second, or
401            // there is a spurious channel change.
402            Ok(has_changed) => {
403                cancel_receiver.mark_as_seen();
404
405                // We only need to check header equality, because the block data is bound to the
406                // header.
407                if has_changed
408                    && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
409                {
410                    Err(SolverCancelled)
411                } else {
412                    Ok(())
413                }
414            }
415            // If the sender was dropped, we're likely shutting down, so cancel the solver.
416            Err(_sender_dropped) => Err(SolverCancelled),
417        };
418
419        // Mine at least one block using the equihash solver.
420        let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
421            // If the solver was cancelled, we're either shutting down, or we have a new template.
422            if solver_id == 0 {
423                info!(
424                    ?height,
425                    ?solver_id,
426                    new_template = ?template_receiver.has_changed(),
427                    shutting_down = ?is_shutting_down(),
428                    "solver cancelled: getting a new block template or shutting down"
429                );
430            } else {
431                debug!(
432                    ?height,
433                    ?solver_id,
434                    new_template = ?template_receiver.has_changed(),
435                    shutting_down = ?is_shutting_down(),
436                    "solver cancelled: getting a new block template or shutting down"
437                );
438            }
439
440            // If the blockchain is changing rapidly, limit how often we'll update the template.
441            // But if we're shutting down, do that immediately.
442            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
443                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
444            }
445
446            continue;
447        };
448
449        // Submit the newly mined blocks to the verifiers.
450        //
451        // TODO: if there is a new template (`cancel_fn().is_err()`), and
452        //       GetBlockTemplate.submit_old is false, return immediately, and skip submitting the
453        //       blocks.
454        let mut any_success = false;
455        for block in blocks {
456            let data = block
457                .zcash_serialize_to_vec()
458                .expect("serializing to Vec never fails");
459
460            match rpc.submit_block(HexData(data), None).await {
461                Ok(success) => {
462                    info!(
463                        ?height,
464                        hash = ?block.hash(),
465                        ?solver_id,
466                        ?success,
467                        "successfully mined a new block",
468                    );
469                    any_success = true;
470                }
471                Err(error) => info!(
472                    ?height,
473                    hash = ?block.hash(),
474                    ?solver_id,
475                    ?error,
476                    "validating a newly mined block failed, trying again",
477                ),
478            }
479        }
480
481        // Start re-mining quickly after a failed solution.
482        // If there's a new template, we'll use it, otherwise the existing one is ok.
483        if !any_success {
484            // If the blockchain is changing rapidly, limit how often we'll update the template.
485            // But if we're shutting down, do that immediately.
486            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
487                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
488            }
489            continue;
490        }
491
492        // Wait for the new block to verify, and the RPC task to pick up a new template.
493        // But don't wait too long, we could have mined on a fork.
494        tokio::select! {
495            shutdown_result = template_receiver.changed() => shutdown_result?,
496            _ = sleep(BLOCK_MINING_WAIT_TIME) => {}
497
498        }
499    }
500
501    Ok(())
502}
503
504/// Mines one or more blocks based on `template`. Calculates equihash solutions, checks difficulty,
505/// and returns as soon as it has at least one block. Uses a different nonce range for each
506/// `solver_id`.
507///
508/// If `cancel_fn()` returns an error, returns early with `Err(SolverCancelled)`.
509///
510/// See [`run_mining_solver()`] for more details.
511pub async fn mine_a_block<F>(
512    solver_id: u8,
513    template: Arc<Block>,
514    cancel_fn: F,
515) -> Result<AtLeastOne<Block>, SolverCancelled>
516where
517    F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
518{
519    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
520    // https://github.com/rust-lang/rust/issues/93610
521    let mut header = *template.header;
522
523    // Use a different nonce for each solver thread.
524    // Change both the first and last bytes, so we don't have to care if the nonces are incremented in
525    // big-endian or little-endian order. And we can see the thread that mined a block from the nonce.
526    *header.nonce.first_mut().unwrap() = solver_id;
527    *header.nonce.last_mut().unwrap() = solver_id;
528
529    // Mine one or more blocks using the solver, in a low-priority blocking thread.
530    let span = Span::current();
531    let solved_headers =
532        tokio::task::spawn_blocking(move || span.in_scope(move || {
533            let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
534                if let Err(error) = priority_result {
535                    info!(?error, "could not set miner to run at a low priority: running at default priority");
536                }
537
538                Solution::solve(header, cancel_fn)
539            }).expect("unable to spawn miner thread");
540
541            miner_thread_handle.wait_for_panics()
542        }))
543        .wait_for_panics()
544        .await?;
545
546    // Modify the template into solved blocks.
547
548    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises
549    let block = (*template).clone();
550
551    let solved_blocks: Vec<Block> = solved_headers
552        .into_iter()
553        .map(|header| {
554            let mut block = block.clone();
555            block.header = Arc::new(header);
556            block
557        })
558        .collect();
559
560    Ok(solved_blocks
561        .try_into()
562        .expect("a 1:1 mapping of AtLeastOne produces at least one block"))
563}