zebrad/components/
miner.rs

1//! Internal mining in Zebra.
2//!
3//! # TODO
4//! - pause mining if we have no peers, like `zcashd` does,
5//!   and add a developer config that mines regardless of how many peers we have.
6//!   <https://github.com/zcash/zcash/blob/6fdd9f1b81d3b228326c9826fa10696fc516444b/src/miner.cpp#L865-L880>
7//! - move common code into zebra-chain or zebra-node-services and remove the RPC dependency.
8
9use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
10
11use color_eyre::Report;
12use futures::{stream::FuturesUnordered, StreamExt};
13use thread_priority::{ThreadBuilder, ThreadPriority};
14use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
15use tower::Service;
16use tracing::{Instrument, Span};
17
18use zebra_chain::{
19    block::{self, Block, Height},
20    chain_sync_status::ChainSyncStatus,
21    chain_tip::ChainTip,
22    diagnostic::task::WaitForPanics,
23    parameters::{Network, NetworkUpgrade},
24    serialization::{AtLeastOne, ZcashSerialize},
25    shutdown::is_shutting_down,
26    work::equihash::{Solution, SolverCancelled},
27};
28use zebra_network::AddressBookPeers;
29use zebra_node_services::mempool;
30use zebra_rpc::{
31    config::mining::Config,
32    methods::{
33        hex_data::HexData,
34        types::get_block_template::{
35            self,
36            parameters::GetBlockTemplateCapability::{CoinbaseTxn, LongPoll},
37            proposal::proposal_block_from_template,
38            GetBlockTemplateRequestMode::Template,
39            TimeSource,
40        },
41        RpcImpl, RpcServer,
42    },
43};
44use zebra_state::WatchReceiver;
45
/// The minimum delay between two consecutive block template refreshes.
///
/// Used as a rate-limit when the chain (and therefore the template) is changing rapidly.
pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2);

/// The longest a solver waits for a new template after one of its mined blocks was accepted.
///
/// Keep this slightly longer than `BLOCK_TEMPLATE_REFRESH_LIMIT`, so there is enough time for the
/// next template to be generated.
pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3);

/// The delay before retrying when a block template is not available yet.
pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20);
57
58/// Initialize the miner based on its config, and spawn a task for it.
59///
60/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured
61/// mining thread.
62///
63/// See [`run_mining_solver()`] for more details.
64pub fn spawn_init<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>(
65    network: &Network,
66    config: &Config,
67    rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
68) -> JoinHandle<Result<(), Report>>
69// TODO: simplify or avoid repeating these generics (how?)
70where
71    Mempool: Service<
72            mempool::Request,
73            Response = mempool::Response,
74            Error = zebra_node_services::BoxError,
75        > + Clone
76        + Send
77        + Sync
78        + 'static,
79    Mempool::Future: Send,
80    State: Service<
81            zebra_state::ReadRequest,
82            Response = zebra_state::ReadResponse,
83            Error = zebra_state::BoxError,
84        > + Clone
85        + Send
86        + Sync
87        + 'static,
88    <State as Service<zebra_state::ReadRequest>>::Future: Send,
89    Tip: ChainTip + Clone + Send + Sync + 'static,
90    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
91        + Clone
92        + Send
93        + Sync
94        + 'static,
95    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
96    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
97    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
98{
99    let network = network.clone();
100    let config = config.clone();
101
102    // TODO: spawn an entirely new executor here, so mining is isolated from higher priority tasks.
103    tokio::spawn(init(network, config, rpc).in_current_span())
104}
105
106/// Initialize the miner based on its config.
107///
108/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured
109/// mining thread.
110///
111/// See [`run_mining_solver()`] for more details.
112pub async fn init<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
113    network: Network,
114    _config: Config,
115    rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
116) -> Result<(), Report>
117where
118    Mempool: Service<
119            mempool::Request,
120            Response = mempool::Response,
121            Error = zebra_node_services::BoxError,
122        > + Clone
123        + Send
124        + Sync
125        + 'static,
126    Mempool::Future: Send,
127    State: Service<
128            zebra_state::ReadRequest,
129            Response = zebra_state::ReadResponse,
130            Error = zebra_state::BoxError,
131        > + Clone
132        + Send
133        + Sync
134        + 'static,
135    <State as Service<zebra_state::ReadRequest>>::Future: Send,
136    Tip: ChainTip + Clone + Send + Sync + 'static,
137    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
138        + Clone
139        + Send
140        + Sync
141        + 'static,
142    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
143    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
144    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
145{
146    // TODO: change this to `config.internal_miner_threads` when internal miner feature is added back.
147    //       https://github.com/ZcashFoundation/zebra/issues/8183
148    let configured_threads = 1;
149    // If we can't detect the number of cores, use the configured number.
150    let available_threads = available_parallelism()
151        .map(usize::from)
152        .unwrap_or(configured_threads);
153
154    // Use the minimum of the configured and available threads.
155    let solver_count = min(configured_threads, available_threads);
156
157    info!(
158        ?solver_count,
159        "launching mining tasks with parallel solvers"
160    );
161
162    let (template_sender, template_receiver) = watch::channel(None);
163    let template_receiver = WatchReceiver::new(template_receiver);
164
165    // Spawn these tasks, to avoid blocked cooperative futures, and improve shutdown responsiveness.
166    // This is particularly important when there are a large number of solver threads.
167    let mut abort_handles = Vec::new();
168
169    let template_generator = tokio::task::spawn(
170        generate_block_templates(network, rpc.clone(), template_sender).in_current_span(),
171    );
172    abort_handles.push(template_generator.abort_handle());
173    let template_generator = template_generator.wait_for_panics();
174
175    let mut mining_solvers = FuturesUnordered::new();
176    for solver_id in 0..solver_count {
177        // Assume there are less than 256 cores. If there are more, only run 256 tasks.
178        let solver_id = min(solver_id, usize::from(u8::MAX))
179            .try_into()
180            .expect("just limited to u8::MAX");
181
182        let solver = tokio::task::spawn(
183            run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
184        );
185        abort_handles.push(solver.abort_handle());
186
187        mining_solvers.push(solver.wait_for_panics());
188    }
189
190    // These tasks run forever unless there is a fatal error or shutdown.
191    // When that happens, the first task to error returns, and the other JoinHandle futures are
192    // cancelled.
193    let first_result;
194    select! {
195        result = template_generator => { first_result = result; }
196        result = mining_solvers.next() => {
197            first_result = result
198                .expect("stream never terminates because there is at least one solver task");
199        }
200    }
201
202    // But the spawned async tasks keep running, so we need to abort them here.
203    for abort_handle in abort_handles {
204        abort_handle.abort();
205    }
206
207    // Any spawned blocking threads will keep running. When this task returns and drops the
208    // `template_sender`, it cancels all the spawned miner threads. This works because we've
209    // aborted the `template_generator` task, which owns the `template_sender`. (And it doesn't
210    // spawn any blocking threads.)
211    first_result
212}
213
214/// Generates block templates using `rpc`, and sends them to mining threads using `template_sender`.
215#[instrument(skip(rpc, template_sender, network))]
216pub async fn generate_block_templates<
217    Mempool,
218    State,
219    Tip,
220    BlockVerifierRouter,
221    SyncStatus,
222    AddressBook,
223>(
224    network: Network,
225    rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
226    template_sender: watch::Sender<Option<Arc<Block>>>,
227) -> Result<(), Report>
228where
229    Mempool: Service<
230            mempool::Request,
231            Response = mempool::Response,
232            Error = zebra_node_services::BoxError,
233        > + Clone
234        + Send
235        + Sync
236        + 'static,
237    Mempool::Future: Send,
238    State: Service<
239            zebra_state::ReadRequest,
240            Response = zebra_state::ReadResponse,
241            Error = zebra_state::BoxError,
242        > + Clone
243        + Send
244        + Sync
245        + 'static,
246    <State as Service<zebra_state::ReadRequest>>::Future: Send,
247    Tip: ChainTip + Clone + Send + Sync + 'static,
248    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
249        + Clone
250        + Send
251        + Sync
252        + 'static,
253    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
254    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
255    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
256{
257    // Pass the correct arguments, even if Zebra currently ignores them.
258    let mut parameters = get_block_template::JsonParameters {
259        mode: Template,
260        data: None,
261        capabilities: vec![LongPoll, CoinbaseTxn],
262        long_poll_id: None,
263        _work_id: None,
264    };
265
266    // Shut down the task when all the template receivers are dropped, or Zebra shuts down.
267    while !template_sender.is_closed() && !is_shutting_down() {
268        let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
269
270        // Wait for the chain to sync so we get a valid template.
271        let Ok(template) = template else {
272            warn!(
273                ?BLOCK_TEMPLATE_WAIT_TIME,
274                ?template,
275                "waiting for a valid block template",
276            );
277
278            // Skip the wait if we got an error because we are shutting down.
279            if !is_shutting_down() {
280                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
281            }
282
283            continue;
284        };
285
286        // Convert from RPC GetBlockTemplate to Block
287        let template = template
288            .try_into_template()
289            .expect("invalid RPC response: proposal in response to a template request");
290
291        info!(
292            height = ?template.height,
293            transactions = ?template.transactions.len(),
294            "mining with an updated block template",
295        );
296
297        // Tell the next get_block_template() call to wait until the template has changed.
298        parameters.long_poll_id = Some(template.long_poll_id);
299
300        let block = proposal_block_from_template(
301            &template,
302            TimeSource::CurTime,
303            NetworkUpgrade::current(&network, Height(template.height)),
304        )
305        .expect("unexpected invalid block template");
306
307        // If the template has actually changed, send an updated template.
308        template_sender.send_if_modified(|old_block| {
309            if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
310                return false;
311            }
312            *old_block = Some(Arc::new(block));
313            true
314        });
315
316        // If the blockchain is changing rapidly, limit how often we'll update the template.
317        // But if we're shutting down, do that immediately.
318        if !template_sender.is_closed() && !is_shutting_down() {
319            sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
320        }
321    }
322
323    Ok(())
324}
325
326/// Runs a single mining thread that gets blocks from the `template_receiver`, calculates equihash
327/// solutions with nonces based on `solver_id`, and submits valid blocks to Zebra's block validator.
328///
329/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core while running.
330/// It can run for minutes or hours if the network difficulty is high. Mining uses a thread with
331/// low CPU priority.
332#[instrument(skip(template_receiver, rpc))]
333pub async fn run_mining_solver<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
334    solver_id: u8,
335    mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
336    rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
337) -> Result<(), Report>
338where
339    Mempool: Service<
340            mempool::Request,
341            Response = mempool::Response,
342            Error = zebra_node_services::BoxError,
343        > + Clone
344        + Send
345        + Sync
346        + 'static,
347    Mempool::Future: Send,
348    State: Service<
349            zebra_state::ReadRequest,
350            Response = zebra_state::ReadResponse,
351            Error = zebra_state::BoxError,
352        > + Clone
353        + Send
354        + Sync
355        + 'static,
356    <State as Service<zebra_state::ReadRequest>>::Future: Send,
357    Tip: ChainTip + Clone + Send + Sync + 'static,
358    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
359        + Clone
360        + Send
361        + Sync
362        + 'static,
363    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
364    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
365    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
366{
367    // Shut down the task when the template sender is dropped, or Zebra shuts down.
368    while template_receiver.has_changed().is_ok() && !is_shutting_down() {
369        // Get the latest block template, and mark the current value as seen.
370        // We mark the value first to avoid missed updates.
371        template_receiver.mark_as_seen();
372        let template = template_receiver.cloned_watch_data();
373
374        let Some(template) = template else {
375            if solver_id == 0 {
376                info!(
377                    ?solver_id,
378                    ?BLOCK_TEMPLATE_WAIT_TIME,
379                    "solver waiting for initial block template"
380                );
381            } else {
382                debug!(
383                    ?solver_id,
384                    ?BLOCK_TEMPLATE_WAIT_TIME,
385                    "solver waiting for initial block template"
386                );
387            }
388
389            // Skip the wait if we didn't get a template because we are shutting down.
390            if !is_shutting_down() {
391                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
392            }
393
394            continue;
395        };
396
397        let height = template.coinbase_height().expect("template is valid");
398
399        // Set up the cancellation conditions for the miner.
400        let mut cancel_receiver = template_receiver.clone();
401        let old_header = *template.header;
402        let cancel_fn = move || match cancel_receiver.has_changed() {
403            // Guard against get_block_template() providing an identical header. This could happen
404            // if something irrelevant to the block data changes, the time was within 1 second, or
405            // there is a spurious channel change.
406            Ok(has_changed) => {
407                cancel_receiver.mark_as_seen();
408
409                // We only need to check header equality, because the block data is bound to the
410                // header.
411                if has_changed
412                    && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
413                {
414                    Err(SolverCancelled)
415                } else {
416                    Ok(())
417                }
418            }
419            // If the sender was dropped, we're likely shutting down, so cancel the solver.
420            Err(_sender_dropped) => Err(SolverCancelled),
421        };
422
423        // Mine at least one block using the equihash solver.
424        let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
425            // If the solver was cancelled, we're either shutting down, or we have a new template.
426            if solver_id == 0 {
427                info!(
428                    ?height,
429                    ?solver_id,
430                    new_template = ?template_receiver.has_changed(),
431                    shutting_down = ?is_shutting_down(),
432                    "solver cancelled: getting a new block template or shutting down"
433                );
434            } else {
435                debug!(
436                    ?height,
437                    ?solver_id,
438                    new_template = ?template_receiver.has_changed(),
439                    shutting_down = ?is_shutting_down(),
440                    "solver cancelled: getting a new block template or shutting down"
441                );
442            }
443
444            // If the blockchain is changing rapidly, limit how often we'll update the template.
445            // But if we're shutting down, do that immediately.
446            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
447                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
448            }
449
450            continue;
451        };
452
453        // Submit the newly mined blocks to the verifiers.
454        //
455        // TODO: if there is a new template (`cancel_fn().is_err()`), and
456        //       GetBlockTemplate.submit_old is false, return immediately, and skip submitting the
457        //       blocks.
458        let mut any_success = false;
459        for block in blocks {
460            let data = block
461                .zcash_serialize_to_vec()
462                .expect("serializing to Vec never fails");
463
464            match rpc.submit_block(HexData(data), None).await {
465                Ok(success) => {
466                    info!(
467                        ?height,
468                        hash = ?block.hash(),
469                        ?solver_id,
470                        ?success,
471                        "successfully mined a new block",
472                    );
473                    any_success = true;
474                }
475                Err(error) => info!(
476                    ?height,
477                    hash = ?block.hash(),
478                    ?solver_id,
479                    ?error,
480                    "validating a newly mined block failed, trying again",
481                ),
482            }
483        }
484
485        // Start re-mining quickly after a failed solution.
486        // If there's a new template, we'll use it, otherwise the existing one is ok.
487        if !any_success {
488            // If the blockchain is changing rapidly, limit how often we'll update the template.
489            // But if we're shutting down, do that immediately.
490            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
491                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
492            }
493            continue;
494        }
495
496        // Wait for the new block to verify, and the RPC task to pick up a new template.
497        // But don't wait too long, we could have mined on a fork.
498        tokio::select! {
499            shutdown_result = template_receiver.changed() => shutdown_result?,
500            _ = sleep(BLOCK_MINING_WAIT_TIME) => {}
501
502        }
503    }
504
505    Ok(())
506}
507
508/// Mines one or more blocks based on `template`. Calculates equihash solutions, checks difficulty,
509/// and returns as soon as it has at least one block. Uses a different nonce range for each
510/// `solver_id`.
511///
512/// If `cancel_fn()` returns an error, returns early with `Err(SolverCancelled)`.
513///
514/// See [`run_mining_solver()`] for more details.
515pub async fn mine_a_block<F>(
516    solver_id: u8,
517    template: Arc<Block>,
518    cancel_fn: F,
519) -> Result<AtLeastOne<Block>, SolverCancelled>
520where
521    F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
522{
523    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises:
524    // https://github.com/rust-lang/rust/issues/93610
525    let mut header = *template.header;
526
527    // Use a different nonce for each solver thread.
528    // Change both the first and last bytes, so we don't have to care if the nonces are incremented in
529    // big-endian or little-endian order. And we can see the thread that mined a block from the nonce.
530    *header.nonce.first_mut().unwrap() = solver_id;
531    *header.nonce.last_mut().unwrap() = solver_id;
532
533    // Mine one or more blocks using the solver, in a low-priority blocking thread.
534    let span = Span::current();
535    let solved_headers =
536        tokio::task::spawn_blocking(move || span.in_scope(move || {
537            let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
538                if let Err(error) = priority_result {
539                    info!(?error, "could not set miner to run at a low priority: running at default priority");
540                }
541
542                Solution::solve(header, cancel_fn)
543            }).expect("unable to spawn miner thread");
544
545            miner_thread_handle.wait_for_panics()
546        }))
547        .wait_for_panics()
548        .await?;
549
550    // Modify the template into solved blocks.
551
552    // TODO: Replace with Arc::unwrap_or_clone() when it stabilises
553    let block = (*template).clone();
554
555    let solved_blocks: Vec<Block> = solved_headers
556        .into_iter()
557        .map(|header| {
558            let mut block = block.clone();
559            block.header = Arc::new(header);
560            block
561        })
562        .collect();
563
564    Ok(solved_blocks
565        .try_into()
566        .expect("a 1:1 mapping of AtLeastOne produces at least one block"))
567}