1use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
10
11use color_eyre::Report;
12use futures::{stream::FuturesUnordered, StreamExt};
13use thread_priority::{ThreadBuilder, ThreadPriority};
14use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
15use tower::Service;
16use tracing::{Instrument, Span};
17
18use zebra_chain::{
19 block::{self, Block, Height},
20 chain_sync_status::ChainSyncStatus,
21 chain_tip::ChainTip,
22 diagnostic::task::WaitForPanics,
23 parameters::{Network, NetworkUpgrade},
24 serialization::{AtLeastOne, ZcashSerialize},
25 shutdown::is_shutting_down,
26 work::equihash::{Solution, SolverCancelled},
27};
28use zebra_network::AddressBookPeers;
29use zebra_node_services::mempool;
30use zebra_rpc::{
31 config::mining::Config,
32 methods::{
33 get_block_template_rpcs::get_block_template::{
34 self, proposal::TimeSource, proposal_block_from_template,
35 GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*,
36 },
37 hex_data::HexData,
38 GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer,
39 },
40};
41use zebra_state::WatchReceiver;
42
43pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20);
45
46pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2);
48
49pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3);
54
55pub fn spawn_init<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
62 network: &Network,
63 config: &Config,
64 rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
65) -> JoinHandle<Result<(), Report>>
66where
68 Mempool: Service<
69 mempool::Request,
70 Response = mempool::Response,
71 Error = zebra_node_services::BoxError,
72 > + Clone
73 + Send
74 + Sync
75 + 'static,
76 Mempool::Future: Send,
77 State: Service<
78 zebra_state::ReadRequest,
79 Response = zebra_state::ReadResponse,
80 Error = zebra_state::BoxError,
81 > + Clone
82 + Send
83 + Sync
84 + 'static,
85 <State as Service<zebra_state::ReadRequest>>::Future: Send,
86 Tip: ChainTip + Clone + Send + Sync + 'static,
87 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
88 + Clone
89 + Send
90 + Sync
91 + 'static,
92 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
93 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
94 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
95{
96 let network = network.clone();
97 let config = config.clone();
98
99 tokio::spawn(init(network, config, rpc).in_current_span())
101}
102
103pub async fn init<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
110 network: Network,
111 _config: Config,
112 rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
113) -> Result<(), Report>
114where
115 Mempool: Service<
116 mempool::Request,
117 Response = mempool::Response,
118 Error = zebra_node_services::BoxError,
119 > + Clone
120 + Send
121 + Sync
122 + 'static,
123 Mempool::Future: Send,
124 State: Service<
125 zebra_state::ReadRequest,
126 Response = zebra_state::ReadResponse,
127 Error = zebra_state::BoxError,
128 > + Clone
129 + Send
130 + Sync
131 + 'static,
132 <State as Service<zebra_state::ReadRequest>>::Future: Send,
133 Tip: ChainTip + Clone + Send + Sync + 'static,
134 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
135 + Clone
136 + Send
137 + Sync
138 + 'static,
139 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
140 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
141 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
142{
143 let configured_threads = 1;
146 let available_threads = available_parallelism()
148 .map(usize::from)
149 .unwrap_or(configured_threads);
150
151 let solver_count = min(configured_threads, available_threads);
153
154 info!(
155 ?solver_count,
156 "launching mining tasks with parallel solvers"
157 );
158
159 let (template_sender, template_receiver) = watch::channel(None);
160 let template_receiver = WatchReceiver::new(template_receiver);
161
162 let mut abort_handles = Vec::new();
165
166 let template_generator = tokio::task::spawn(
167 generate_block_templates(network, rpc.clone(), template_sender).in_current_span(),
168 );
169 abort_handles.push(template_generator.abort_handle());
170 let template_generator = template_generator.wait_for_panics();
171
172 let mut mining_solvers = FuturesUnordered::new();
173 for solver_id in 0..solver_count {
174 let solver_id = min(solver_id, usize::from(u8::MAX))
176 .try_into()
177 .expect("just limited to u8::MAX");
178
179 let solver = tokio::task::spawn(
180 run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
181 );
182 abort_handles.push(solver.abort_handle());
183
184 mining_solvers.push(solver.wait_for_panics());
185 }
186
187 let first_result;
191 select! {
192 result = template_generator => { first_result = result; }
193 result = mining_solvers.next() => {
194 first_result = result
195 .expect("stream never terminates because there is at least one solver task");
196 }
197 }
198
199 for abort_handle in abort_handles {
201 abort_handle.abort();
202 }
203
204 first_result
209}
210
211#[instrument(skip(rpc, template_sender))]
213pub async fn generate_block_templates<
214 Mempool,
215 State,
216 Tip,
217 BlockVerifierRouter,
218 SyncStatus,
219 AddressBook,
220>(
221 network: Network,
222 rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
223 template_sender: watch::Sender<Option<Arc<Block>>>,
224) -> Result<(), Report>
225where
226 Mempool: Service<
227 mempool::Request,
228 Response = mempool::Response,
229 Error = zebra_node_services::BoxError,
230 > + Clone
231 + Send
232 + Sync
233 + 'static,
234 Mempool::Future: Send,
235 State: Service<
236 zebra_state::ReadRequest,
237 Response = zebra_state::ReadResponse,
238 Error = zebra_state::BoxError,
239 > + Clone
240 + Send
241 + Sync
242 + 'static,
243 <State as Service<zebra_state::ReadRequest>>::Future: Send,
244 Tip: ChainTip + Clone + Send + Sync + 'static,
245 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
246 + Clone
247 + Send
248 + Sync
249 + 'static,
250 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
251 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
252 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
253{
254 let mut parameters = get_block_template::JsonParameters {
256 mode: Template,
257 data: None,
258 capabilities: vec![LongPoll, CoinbaseTxn],
259 long_poll_id: None,
260 _work_id: None,
261 };
262
263 while !template_sender.is_closed() && !is_shutting_down() {
265 let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
266
267 let Ok(template) = template else {
269 info!(
270 ?BLOCK_TEMPLATE_WAIT_TIME,
271 "waiting for a valid block template",
272 );
273
274 if !is_shutting_down() {
276 sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
277 }
278
279 continue;
280 };
281
282 let template = template
284 .try_into_template()
285 .expect("invalid RPC response: proposal in response to a template request");
286
287 info!(
288 height = ?template.height,
289 transactions = ?template.transactions.len(),
290 "mining with an updated block template",
291 );
292
293 parameters.long_poll_id = Some(template.long_poll_id);
295
296 let block = proposal_block_from_template(
297 &template,
298 TimeSource::CurTime,
299 NetworkUpgrade::current(&network, Height(template.height)),
300 )
301 .expect("unexpected invalid block template");
302
303 template_sender.send_if_modified(|old_block| {
305 if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
306 return false;
307 }
308 *old_block = Some(Arc::new(block));
309 true
310 });
311
312 if !template_sender.is_closed() && !is_shutting_down() {
315 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
316 }
317 }
318
319 Ok(())
320}
321
322#[instrument(skip(template_receiver, rpc))]
329pub async fn run_mining_solver<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
330 solver_id: u8,
331 mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
332 rpc: GetBlockTemplateRpcImpl<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>,
333) -> Result<(), Report>
334where
335 Mempool: Service<
336 mempool::Request,
337 Response = mempool::Response,
338 Error = zebra_node_services::BoxError,
339 > + Clone
340 + Send
341 + Sync
342 + 'static,
343 Mempool::Future: Send,
344 State: Service<
345 zebra_state::ReadRequest,
346 Response = zebra_state::ReadResponse,
347 Error = zebra_state::BoxError,
348 > + Clone
349 + Send
350 + Sync
351 + 'static,
352 <State as Service<zebra_state::ReadRequest>>::Future: Send,
353 Tip: ChainTip + Clone + Send + Sync + 'static,
354 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
355 + Clone
356 + Send
357 + Sync
358 + 'static,
359 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
360 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
361 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
362{
363 while template_receiver.has_changed().is_ok() && !is_shutting_down() {
365 template_receiver.mark_as_seen();
368 let template = template_receiver.cloned_watch_data();
369
370 let Some(template) = template else {
371 if solver_id == 0 {
372 info!(
373 ?solver_id,
374 ?BLOCK_TEMPLATE_WAIT_TIME,
375 "solver waiting for initial block template"
376 );
377 } else {
378 debug!(
379 ?solver_id,
380 ?BLOCK_TEMPLATE_WAIT_TIME,
381 "solver waiting for initial block template"
382 );
383 }
384
385 if !is_shutting_down() {
387 sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
388 }
389
390 continue;
391 };
392
393 let height = template.coinbase_height().expect("template is valid");
394
395 let mut cancel_receiver = template_receiver.clone();
397 let old_header = *template.header;
398 let cancel_fn = move || match cancel_receiver.has_changed() {
399 Ok(has_changed) => {
403 cancel_receiver.mark_as_seen();
404
405 if has_changed
408 && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
409 {
410 Err(SolverCancelled)
411 } else {
412 Ok(())
413 }
414 }
415 Err(_sender_dropped) => Err(SolverCancelled),
417 };
418
419 let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
421 if solver_id == 0 {
423 info!(
424 ?height,
425 ?solver_id,
426 new_template = ?template_receiver.has_changed(),
427 shutting_down = ?is_shutting_down(),
428 "solver cancelled: getting a new block template or shutting down"
429 );
430 } else {
431 debug!(
432 ?height,
433 ?solver_id,
434 new_template = ?template_receiver.has_changed(),
435 shutting_down = ?is_shutting_down(),
436 "solver cancelled: getting a new block template or shutting down"
437 );
438 }
439
440 if template_receiver.has_changed().is_ok() && !is_shutting_down() {
443 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
444 }
445
446 continue;
447 };
448
449 let mut any_success = false;
455 for block in blocks {
456 let data = block
457 .zcash_serialize_to_vec()
458 .expect("serializing to Vec never fails");
459
460 match rpc.submit_block(HexData(data), None).await {
461 Ok(success) => {
462 info!(
463 ?height,
464 hash = ?block.hash(),
465 ?solver_id,
466 ?success,
467 "successfully mined a new block",
468 );
469 any_success = true;
470 }
471 Err(error) => info!(
472 ?height,
473 hash = ?block.hash(),
474 ?solver_id,
475 ?error,
476 "validating a newly mined block failed, trying again",
477 ),
478 }
479 }
480
481 if !any_success {
484 if template_receiver.has_changed().is_ok() && !is_shutting_down() {
487 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
488 }
489 continue;
490 }
491
492 tokio::select! {
495 shutdown_result = template_receiver.changed() => shutdown_result?,
496 _ = sleep(BLOCK_MINING_WAIT_TIME) => {}
497
498 }
499 }
500
501 Ok(())
502}
503
504pub async fn mine_a_block<F>(
512 solver_id: u8,
513 template: Arc<Block>,
514 cancel_fn: F,
515) -> Result<AtLeastOne<Block>, SolverCancelled>
516where
517 F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
518{
519 let mut header = *template.header;
522
523 *header.nonce.first_mut().unwrap() = solver_id;
527 *header.nonce.last_mut().unwrap() = solver_id;
528
529 let span = Span::current();
531 let solved_headers =
532 tokio::task::spawn_blocking(move || span.in_scope(move || {
533 let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
534 if let Err(error) = priority_result {
535 info!(?error, "could not set miner to run at a low priority: running at default priority");
536 }
537
538 Solution::solve(header, cancel_fn)
539 }).expect("unable to spawn miner thread");
540
541 miner_thread_handle.wait_for_panics()
542 }))
543 .wait_for_panics()
544 .await?;
545
546 let block = (*template).clone();
550
551 let solved_blocks: Vec<Block> = solved_headers
552 .into_iter()
553 .map(|header| {
554 let mut block = block.clone();
555 block.header = Arc::new(header);
556 block
557 })
558 .collect();
559
560 Ok(solved_blocks
561 .try_into()
562 .expect("a 1:1 mapping of AtLeastOne produces at least one block"))
563}