1use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
10
11use color_eyre::Report;
12use futures::{stream::FuturesUnordered, StreamExt};
13use thread_priority::{ThreadBuilder, ThreadPriority};
14use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
15use tower::Service;
16use tracing::{Instrument, Span};
17
18use zebra_chain::{
19 block::{self, Block},
20 chain_sync_status::ChainSyncStatus,
21 chain_tip::ChainTip,
22 diagnostic::task::WaitForPanics,
23 serialization::{AtLeastOne, ZcashSerialize},
24 shutdown::is_shutting_down,
25 work::equihash::{Solution, SolverCancelled},
26};
27use zebra_network::AddressBookPeers;
28use zebra_node_services::mempool;
29use zebra_rpc::{
30 client::{
31 BlockTemplateTimeSource,
32 GetBlockTemplateCapability::{CoinbaseTxn, LongPoll},
33 GetBlockTemplateParameters,
34 GetBlockTemplateRequestMode::Template,
35 HexData,
36 },
37 methods::{RpcImpl, RpcServer},
38 proposal_block_from_template,
39};
40use zebra_state::WatchReceiver;
41
42use crate::components::metrics::Config;
43
/// How long the miner waits for a usable block template before trying again.
///
/// The template generator sleeps for this long after a failed `get_block_template` request,
/// and a solver sleeps for this long while no template has been published yet.
pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20);
46
/// The pause used to limit how often the miner moves on to a new block template.
///
/// The template generator sleeps for this long after processing each template, and a solver
/// sleeps for this long after a cancelled mining attempt, or after none of its solved blocks
/// were accepted.
pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2);
49
/// The longest a solver waits for the block template to change after it has successfully
/// submitted a mined block, before it starts mining again.
pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3);
55
56pub fn spawn_init<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>(
63 config: &Config,
64 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
65) -> JoinHandle<Result<(), Report>>
66where
68 Mempool: Service<
69 mempool::Request,
70 Response = mempool::Response,
71 Error = zebra_node_services::BoxError,
72 > + Clone
73 + Send
74 + Sync
75 + 'static,
76 Mempool::Future: Send,
77 State: Service<
78 zebra_state::Request,
79 Response = zebra_state::Response,
80 Error = zebra_state::BoxError,
81 > + Clone
82 + Send
83 + Sync
84 + 'static,
85 <State as Service<zebra_state::Request>>::Future: Send,
86 ReadState: Service<
87 zebra_state::ReadRequest,
88 Response = zebra_state::ReadResponse,
89 Error = zebra_state::BoxError,
90 > + Clone
91 + Send
92 + Sync
93 + 'static,
94 <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
95 Tip: ChainTip + Clone + Send + Sync + 'static,
96 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
97 + Clone
98 + Send
99 + Sync
100 + 'static,
101 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
102 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
103 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
104{
105 tokio::spawn(init(config.clone(), rpc).in_current_span())
107}
108
109pub async fn init<Mempool, State, ReadState, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
116 _config: Config,
117 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
118) -> Result<(), Report>
119where
120 Mempool: Service<
121 mempool::Request,
122 Response = mempool::Response,
123 Error = zebra_node_services::BoxError,
124 > + Clone
125 + Send
126 + Sync
127 + 'static,
128 Mempool::Future: Send,
129 State: Service<
130 zebra_state::Request,
131 Response = zebra_state::Response,
132 Error = zebra_state::BoxError,
133 > + Clone
134 + Send
135 + Sync
136 + 'static,
137 <State as Service<zebra_state::Request>>::Future: Send,
138 ReadState: Service<
139 zebra_state::ReadRequest,
140 Response = zebra_state::ReadResponse,
141 Error = zebra_state::BoxError,
142 > + Clone
143 + Send
144 + Sync
145 + 'static,
146 <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
147 Tip: ChainTip + Clone + Send + Sync + 'static,
148 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
149 + Clone
150 + Send
151 + Sync
152 + 'static,
153 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
154 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
155 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
156{
157 let configured_threads = 1;
159 let available_threads = available_parallelism()
161 .map(usize::from)
162 .unwrap_or(configured_threads);
163
164 let solver_count = min(configured_threads, available_threads);
166
167 info!(
168 ?solver_count,
169 "launching mining tasks with parallel solvers"
170 );
171
172 let (template_sender, template_receiver) = watch::channel(None);
173 let template_receiver = WatchReceiver::new(template_receiver);
174
175 let mut abort_handles = Vec::new();
178
179 let template_generator = tokio::task::spawn(
180 generate_block_templates(rpc.clone(), template_sender).in_current_span(),
181 );
182 abort_handles.push(template_generator.abort_handle());
183 let template_generator = template_generator.wait_for_panics();
184
185 let mut mining_solvers = FuturesUnordered::new();
186 for solver_id in 0..solver_count {
187 let solver_id = min(solver_id, usize::from(u8::MAX))
189 .try_into()
190 .expect("just limited to u8::MAX");
191
192 let solver = tokio::task::spawn(
193 run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
194 );
195 abort_handles.push(solver.abort_handle());
196
197 mining_solvers.push(solver.wait_for_panics());
198 }
199
200 let first_result;
204 select! {
205 result = template_generator => { first_result = result; }
206 result = mining_solvers.next() => {
207 first_result = result
208 .expect("stream never terminates because there is at least one solver task");
209 }
210 }
211
212 for abort_handle in abort_handles {
214 abort_handle.abort();
215 }
216
217 first_result
222}
223
224#[instrument(skip(rpc, template_sender))]
226pub async fn generate_block_templates<
227 Mempool,
228 State,
229 ReadState,
230 Tip,
231 BlockVerifierRouter,
232 SyncStatus,
233 AddressBook,
234>(
235 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
236 template_sender: watch::Sender<Option<Arc<Block>>>,
237) -> Result<(), Report>
238where
239 Mempool: Service<
240 mempool::Request,
241 Response = mempool::Response,
242 Error = zebra_node_services::BoxError,
243 > + Clone
244 + Send
245 + Sync
246 + 'static,
247 Mempool::Future: Send,
248 State: Service<
249 zebra_state::Request,
250 Response = zebra_state::Response,
251 Error = zebra_state::BoxError,
252 > + Clone
253 + Send
254 + Sync
255 + 'static,
256 <State as Service<zebra_state::Request>>::Future: Send,
257 ReadState: Service<
258 zebra_state::ReadRequest,
259 Response = zebra_state::ReadResponse,
260 Error = zebra_state::BoxError,
261 > + Clone
262 + Send
263 + Sync
264 + 'static,
265 <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
266 Tip: ChainTip + Clone + Send + Sync + 'static,
267 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
268 + Clone
269 + Send
270 + Sync
271 + 'static,
272 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
273 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
274 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
275{
276 let mut parameters =
278 GetBlockTemplateParameters::new(Template, None, vec![LongPoll, CoinbaseTxn], None, None);
279
280 while !template_sender.is_closed() && !is_shutting_down() {
282 let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
283
284 let Ok(template) = template else {
286 warn!(
287 ?BLOCK_TEMPLATE_WAIT_TIME,
288 ?template,
289 "waiting for a valid block template",
290 );
291
292 if !is_shutting_down() {
294 sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
295 }
296
297 continue;
298 };
299
300 let template = template
302 .try_into_template()
303 .expect("invalid RPC response: proposal in response to a template request");
304
305 info!(
306 height = ?template.height(),
307 transactions = ?template.transactions().len(),
308 "mining with an updated block template",
309 );
310
311 parameters = GetBlockTemplateParameters::new(
313 Template,
314 None,
315 vec![LongPoll, CoinbaseTxn],
316 Some(template.long_poll_id()),
317 None,
318 );
319
320 let block = proposal_block_from_template(
321 &template,
322 BlockTemplateTimeSource::CurTime,
323 rpc.network(),
324 )?;
325
326 template_sender.send_if_modified(|old_block| {
328 if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
329 return false;
330 }
331 *old_block = Some(Arc::new(block));
332 true
333 });
334
335 if !template_sender.is_closed() && !is_shutting_down() {
338 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
339 }
340 }
341
342 Ok(())
343}
344
/// Runs a single mining solver, identified by `solver_id`.
///
/// The solver repeatedly takes the latest block template from `template_receiver`, tries to
/// solve it with [`mine_a_block`], and submits every solved block using `rpc.submit_block`.
/// Solving is cancelled when the watched template changes to a block with a different header,
/// or when checking the channel for changes returns an error.
///
/// The loop ends, returning `Ok(())`, once `template_receiver.has_changed()` returns an error
/// or `is_shutting_down()` returns `true`.
///
/// # Errors
///
/// Returns an error if waiting for a template change fails after a block was successfully
/// submitted.
///
/// # Panics
///
/// Panics if the template has no coinbase height, or if a solved block can't be serialized.
#[instrument(skip(template_receiver, rpc))]
pub async fn run_mining_solver<
    Mempool,
    State,
    ReadState,
    Tip,
    BlockVerifierRouter,
    SyncStatus,
    AddressBook,
>(
    solver_id: u8,
    mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
    rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
) -> Result<(), Report>
where
    Mempool: Service<
            mempool::Request,
            Response = mempool::Response,
            Error = zebra_node_services::BoxError,
        > + Clone
        + Send
        + Sync
        + 'static,
    Mempool::Future: Send,
    State: Service<
            zebra_state::Request,
            Response = zebra_state::Response,
            Error = zebra_state::BoxError,
        > + Clone
        + Send
        + Sync
        + 'static,
    <State as Service<zebra_state::Request>>::Future: Send,
    ReadState: Service<
            zebra_state::ReadRequest,
            Response = zebra_state::ReadResponse,
            Error = zebra_state::BoxError,
        > + Clone
        + Send
        + Sync
        + 'static,
    <ReadState as Service<zebra_state::ReadRequest>>::Future: Send,
    Tip: ChainTip + Clone + Send + Sync + 'static,
    BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
        + Clone
        + Send
        + Sync
        + 'static,
    <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
    SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
    AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
    // Keep mining until checking the template channel fails, or a shutdown is in progress.
    while template_receiver.has_changed().is_ok() && !is_shutting_down() {
        // The current value is marked as seen *before* it is read.
        // NOTE(review): presumably this ordering avoids missing an update that arrives
        // between the two calls — confirm against `WatchReceiver`.
        template_receiver.mark_as_seen();
        let template = template_receiver.cloned_watch_data();

        // No template has been published yet: log, wait, and check again.
        let Some(template) = template else {
            // Only solver 0 logs at info level; the other solvers log the same message at
            // debug level.
            if solver_id == 0 {
                info!(
                    ?solver_id,
                    ?BLOCK_TEMPLATE_WAIT_TIME,
                    "solver waiting for initial block template"
                );
            } else {
                debug!(
                    ?solver_id,
                    ?BLOCK_TEMPLATE_WAIT_TIME,
                    "solver waiting for initial block template"
                );
            }

            // Skip the wait when shutting down.
            if !is_shutting_down() {
                sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
            }

            continue;
        };

        let height = template.coinbase_height().expect("template is valid");

        // The cancel function gets its own receiver clone and a copy of the header being
        // mined, because it is moved into `mine_a_block`.
        let mut cancel_receiver = template_receiver.clone();
        let old_header = *template.header;
        let cancel_fn = move || match cancel_receiver.has_changed() {
            Ok(has_changed) => {
                cancel_receiver.mark_as_seen();

                // Cancel only if the channel reports a change *and* the new block's header
                // differs from the header being mined.
                if has_changed
                    && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
                {
                    Err(SolverCancelled)
                } else {
                    Ok(())
                }
            }
            // An error from `has_changed()` also cancels the solver.
            Err(_sender_dropped) => Err(SolverCancelled),
        };

        // If mining returns an error, log it and go back to fetch the latest template.
        let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
            // Same log level split as above: info for solver 0, debug for the rest.
            if solver_id == 0 {
                info!(
                    ?height,
                    ?solver_id,
                    new_template = ?template_receiver.has_changed(),
                    shutting_down = ?is_shutting_down(),
                    "solver cancelled: getting a new block template or shutting down"
                );
            } else {
                debug!(
                    ?height,
                    ?solver_id,
                    new_template = ?template_receiver.has_changed(),
                    shutting_down = ?is_shutting_down(),
                    "solver cancelled: getting a new block template or shutting down"
                );
            }

            // Pause before retrying, unless the loop is about to exit anyway.
            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
            }

            continue;
        };

        // Submit every solved block, and remember whether at least one was accepted.
        let mut any_success = false;
        for block in blocks {
            let data = block
                .zcash_serialize_to_vec()
                .expect("serializing to Vec never fails");

            match rpc.submit_block(HexData(data), None).await {
                Ok(success) => {
                    info!(
                        ?height,
                        hash = ?block.hash(),
                        ?solver_id,
                        ?success,
                        "successfully mined a new block",
                    );
                    any_success = true;
                }
                // A failed submission is only logged; the remaining blocks are still tried.
                Err(error) => info!(
                    ?height,
                    hash = ?block.hash(),
                    ?solver_id,
                    ?error,
                    "validating a newly mined block failed, trying again",
                ),
            }
        }

        // None of the blocks were accepted: pause briefly (unless the loop is about to exit),
        // then start mining again.
        if !any_success {
            if template_receiver.has_changed().is_ok() && !is_shutting_down() {
                sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
            }
            continue;
        }

        // After a successful submission, wait for the template to change, but for no longer
        // than `BLOCK_MINING_WAIT_TIME`. An error from `changed()` is returned to the caller.
        tokio::select! {
            shutdown_result = template_receiver.changed() => shutdown_result?,
            _ = sleep(BLOCK_MINING_WAIT_TIME) => {}

        }
    }

    Ok(())
}
543
544pub async fn mine_a_block<F>(
552 solver_id: u8,
553 template: Arc<Block>,
554 cancel_fn: F,
555) -> Result<AtLeastOne<Block>, SolverCancelled>
556where
557 F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
558{
559 let mut header = *template.header;
562
563 *header.nonce.first_mut().unwrap() = solver_id;
567 *header.nonce.last_mut().unwrap() = solver_id;
568
569 let span = Span::current();
571 let solved_headers =
572 tokio::task::spawn_blocking(move || span.in_scope(move || {
573 let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
574 if let Err(error) = priority_result {
575 info!(?error, "could not set miner to run at a low priority: running at default priority");
576 }
577
578 Solution::solve(header, cancel_fn)
579 }).expect("unable to spawn miner thread");
580
581 miner_thread_handle.wait_for_panics()
582 }))
583 .wait_for_panics()
584 .await?;
585
586 let block = (*template).clone();
590
591 let solved_blocks: Vec<Block> = solved_headers
592 .into_iter()
593 .map(|header| {
594 let mut block = block.clone();
595 block.header = Arc::new(header);
596 block
597 })
598 .collect();
599
600 Ok(solved_blocks
601 .try_into()
602 .expect("a 1:1 mapping of AtLeastOne produces at least one block"))
603}