1use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration};
10
11use color_eyre::Report;
12use futures::{stream::FuturesUnordered, StreamExt};
13use thread_priority::{ThreadBuilder, ThreadPriority};
14use tokio::{select, sync::watch, task::JoinHandle, time::sleep};
15use tower::Service;
16use tracing::{Instrument, Span};
17
18use zebra_chain::{
19 block::{self, Block, Height},
20 chain_sync_status::ChainSyncStatus,
21 chain_tip::ChainTip,
22 diagnostic::task::WaitForPanics,
23 parameters::{Network, NetworkUpgrade},
24 serialization::{AtLeastOne, ZcashSerialize},
25 shutdown::is_shutting_down,
26 work::equihash::{Solution, SolverCancelled},
27};
28use zebra_network::AddressBookPeers;
29use zebra_node_services::mempool;
30use zebra_rpc::{
31 config::mining::Config,
32 methods::{
33 hex_data::HexData,
34 types::get_block_template::{
35 self,
36 parameters::GetBlockTemplateCapability::{CoinbaseTxn, LongPoll},
37 proposal::proposal_block_from_template,
38 GetBlockTemplateRequestMode::Template,
39 TimeSource,
40 },
41 RpcImpl, RpcServer,
42 },
43};
44use zebra_state::WatchReceiver;
45
46pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20);
48
49pub const BLOCK_TEMPLATE_REFRESH_LIMIT: Duration = Duration::from_secs(2);
51
52pub const BLOCK_MINING_WAIT_TIME: Duration = Duration::from_secs(3);
57
58pub fn spawn_init<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>(
65 network: &Network,
66 config: &Config,
67 rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
68) -> JoinHandle<Result<(), Report>>
69where
71 Mempool: Service<
72 mempool::Request,
73 Response = mempool::Response,
74 Error = zebra_node_services::BoxError,
75 > + Clone
76 + Send
77 + Sync
78 + 'static,
79 Mempool::Future: Send,
80 State: Service<
81 zebra_state::ReadRequest,
82 Response = zebra_state::ReadResponse,
83 Error = zebra_state::BoxError,
84 > + Clone
85 + Send
86 + Sync
87 + 'static,
88 <State as Service<zebra_state::ReadRequest>>::Future: Send,
89 Tip: ChainTip + Clone + Send + Sync + 'static,
90 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
91 + Clone
92 + Send
93 + Sync
94 + 'static,
95 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
96 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
97 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
98{
99 let network = network.clone();
100 let config = config.clone();
101
102 tokio::spawn(init(network, config, rpc).in_current_span())
104}
105
106pub async fn init<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
113 network: Network,
114 _config: Config,
115 rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
116) -> Result<(), Report>
117where
118 Mempool: Service<
119 mempool::Request,
120 Response = mempool::Response,
121 Error = zebra_node_services::BoxError,
122 > + Clone
123 + Send
124 + Sync
125 + 'static,
126 Mempool::Future: Send,
127 State: Service<
128 zebra_state::ReadRequest,
129 Response = zebra_state::ReadResponse,
130 Error = zebra_state::BoxError,
131 > + Clone
132 + Send
133 + Sync
134 + 'static,
135 <State as Service<zebra_state::ReadRequest>>::Future: Send,
136 Tip: ChainTip + Clone + Send + Sync + 'static,
137 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
138 + Clone
139 + Send
140 + Sync
141 + 'static,
142 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
143 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
144 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
145{
146 let configured_threads = 1;
149 let available_threads = available_parallelism()
151 .map(usize::from)
152 .unwrap_or(configured_threads);
153
154 let solver_count = min(configured_threads, available_threads);
156
157 info!(
158 ?solver_count,
159 "launching mining tasks with parallel solvers"
160 );
161
162 let (template_sender, template_receiver) = watch::channel(None);
163 let template_receiver = WatchReceiver::new(template_receiver);
164
165 let mut abort_handles = Vec::new();
168
169 let template_generator = tokio::task::spawn(
170 generate_block_templates(network, rpc.clone(), template_sender).in_current_span(),
171 );
172 abort_handles.push(template_generator.abort_handle());
173 let template_generator = template_generator.wait_for_panics();
174
175 let mut mining_solvers = FuturesUnordered::new();
176 for solver_id in 0..solver_count {
177 let solver_id = min(solver_id, usize::from(u8::MAX))
179 .try_into()
180 .expect("just limited to u8::MAX");
181
182 let solver = tokio::task::spawn(
183 run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(),
184 );
185 abort_handles.push(solver.abort_handle());
186
187 mining_solvers.push(solver.wait_for_panics());
188 }
189
190 let first_result;
194 select! {
195 result = template_generator => { first_result = result; }
196 result = mining_solvers.next() => {
197 first_result = result
198 .expect("stream never terminates because there is at least one solver task");
199 }
200 }
201
202 for abort_handle in abort_handles {
204 abort_handle.abort();
205 }
206
207 first_result
212}
213
214#[instrument(skip(rpc, template_sender, network))]
216pub async fn generate_block_templates<
217 Mempool,
218 State,
219 Tip,
220 BlockVerifierRouter,
221 SyncStatus,
222 AddressBook,
223>(
224 network: Network,
225 rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
226 template_sender: watch::Sender<Option<Arc<Block>>>,
227) -> Result<(), Report>
228where
229 Mempool: Service<
230 mempool::Request,
231 Response = mempool::Response,
232 Error = zebra_node_services::BoxError,
233 > + Clone
234 + Send
235 + Sync
236 + 'static,
237 Mempool::Future: Send,
238 State: Service<
239 zebra_state::ReadRequest,
240 Response = zebra_state::ReadResponse,
241 Error = zebra_state::BoxError,
242 > + Clone
243 + Send
244 + Sync
245 + 'static,
246 <State as Service<zebra_state::ReadRequest>>::Future: Send,
247 Tip: ChainTip + Clone + Send + Sync + 'static,
248 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
249 + Clone
250 + Send
251 + Sync
252 + 'static,
253 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
254 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
255 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
256{
257 let mut parameters = get_block_template::JsonParameters {
259 mode: Template,
260 data: None,
261 capabilities: vec![LongPoll, CoinbaseTxn],
262 long_poll_id: None,
263 _work_id: None,
264 };
265
266 while !template_sender.is_closed() && !is_shutting_down() {
268 let template: Result<_, _> = rpc.get_block_template(Some(parameters.clone())).await;
269
270 let Ok(template) = template else {
272 warn!(
273 ?BLOCK_TEMPLATE_WAIT_TIME,
274 ?template,
275 "waiting for a valid block template",
276 );
277
278 if !is_shutting_down() {
280 sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
281 }
282
283 continue;
284 };
285
286 let template = template
288 .try_into_template()
289 .expect("invalid RPC response: proposal in response to a template request");
290
291 info!(
292 height = ?template.height,
293 transactions = ?template.transactions.len(),
294 "mining with an updated block template",
295 );
296
297 parameters.long_poll_id = Some(template.long_poll_id);
299
300 let block = proposal_block_from_template(
301 &template,
302 TimeSource::CurTime,
303 NetworkUpgrade::current(&network, Height(template.height)),
304 )
305 .expect("unexpected invalid block template");
306
307 template_sender.send_if_modified(|old_block| {
309 if old_block.as_ref().map(|b| *b.header) == Some(*block.header) {
310 return false;
311 }
312 *old_block = Some(Arc::new(block));
313 true
314 });
315
316 if !template_sender.is_closed() && !is_shutting_down() {
319 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
320 }
321 }
322
323 Ok(())
324}
325
326#[instrument(skip(template_receiver, rpc))]
333pub async fn run_mining_solver<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
334 solver_id: u8,
335 mut template_receiver: WatchReceiver<Option<Arc<Block>>>,
336 rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
337) -> Result<(), Report>
338where
339 Mempool: Service<
340 mempool::Request,
341 Response = mempool::Response,
342 Error = zebra_node_services::BoxError,
343 > + Clone
344 + Send
345 + Sync
346 + 'static,
347 Mempool::Future: Send,
348 State: Service<
349 zebra_state::ReadRequest,
350 Response = zebra_state::ReadResponse,
351 Error = zebra_state::BoxError,
352 > + Clone
353 + Send
354 + Sync
355 + 'static,
356 <State as Service<zebra_state::ReadRequest>>::Future: Send,
357 Tip: ChainTip + Clone + Send + Sync + 'static,
358 BlockVerifierRouter: Service<zebra_consensus::Request, Response = block::Hash, Error = zebra_consensus::BoxError>
359 + Clone
360 + Send
361 + Sync
362 + 'static,
363 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
364 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
365 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
366{
367 while template_receiver.has_changed().is_ok() && !is_shutting_down() {
369 template_receiver.mark_as_seen();
372 let template = template_receiver.cloned_watch_data();
373
374 let Some(template) = template else {
375 if solver_id == 0 {
376 info!(
377 ?solver_id,
378 ?BLOCK_TEMPLATE_WAIT_TIME,
379 "solver waiting for initial block template"
380 );
381 } else {
382 debug!(
383 ?solver_id,
384 ?BLOCK_TEMPLATE_WAIT_TIME,
385 "solver waiting for initial block template"
386 );
387 }
388
389 if !is_shutting_down() {
391 sleep(BLOCK_TEMPLATE_WAIT_TIME).await;
392 }
393
394 continue;
395 };
396
397 let height = template.coinbase_height().expect("template is valid");
398
399 let mut cancel_receiver = template_receiver.clone();
401 let old_header = *template.header;
402 let cancel_fn = move || match cancel_receiver.has_changed() {
403 Ok(has_changed) => {
407 cancel_receiver.mark_as_seen();
408
409 if has_changed
412 && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header)
413 {
414 Err(SolverCancelled)
415 } else {
416 Ok(())
417 }
418 }
419 Err(_sender_dropped) => Err(SolverCancelled),
421 };
422
423 let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else {
425 if solver_id == 0 {
427 info!(
428 ?height,
429 ?solver_id,
430 new_template = ?template_receiver.has_changed(),
431 shutting_down = ?is_shutting_down(),
432 "solver cancelled: getting a new block template or shutting down"
433 );
434 } else {
435 debug!(
436 ?height,
437 ?solver_id,
438 new_template = ?template_receiver.has_changed(),
439 shutting_down = ?is_shutting_down(),
440 "solver cancelled: getting a new block template or shutting down"
441 );
442 }
443
444 if template_receiver.has_changed().is_ok() && !is_shutting_down() {
447 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
448 }
449
450 continue;
451 };
452
453 let mut any_success = false;
459 for block in blocks {
460 let data = block
461 .zcash_serialize_to_vec()
462 .expect("serializing to Vec never fails");
463
464 match rpc.submit_block(HexData(data), None).await {
465 Ok(success) => {
466 info!(
467 ?height,
468 hash = ?block.hash(),
469 ?solver_id,
470 ?success,
471 "successfully mined a new block",
472 );
473 any_success = true;
474 }
475 Err(error) => info!(
476 ?height,
477 hash = ?block.hash(),
478 ?solver_id,
479 ?error,
480 "validating a newly mined block failed, trying again",
481 ),
482 }
483 }
484
485 if !any_success {
488 if template_receiver.has_changed().is_ok() && !is_shutting_down() {
491 sleep(BLOCK_TEMPLATE_REFRESH_LIMIT).await;
492 }
493 continue;
494 }
495
496 tokio::select! {
499 shutdown_result = template_receiver.changed() => shutdown_result?,
500 _ = sleep(BLOCK_MINING_WAIT_TIME) => {}
501
502 }
503 }
504
505 Ok(())
506}
507
508pub async fn mine_a_block<F>(
516 solver_id: u8,
517 template: Arc<Block>,
518 cancel_fn: F,
519) -> Result<AtLeastOne<Block>, SolverCancelled>
520where
521 F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static,
522{
523 let mut header = *template.header;
526
527 *header.nonce.first_mut().unwrap() = solver_id;
531 *header.nonce.last_mut().unwrap() = solver_id;
532
533 let span = Span::current();
535 let solved_headers =
536 tokio::task::spawn_blocking(move || span.in_scope(move || {
537 let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| {
538 if let Err(error) = priority_result {
539 info!(?error, "could not set miner to run at a low priority: running at default priority");
540 }
541
542 Solution::solve(header, cancel_fn)
543 }).expect("unable to spawn miner thread");
544
545 miner_thread_handle.wait_for_panics()
546 }))
547 .wait_for_panics()
548 .await?;
549
550 let block = (*template).clone();
554
555 let solved_blocks: Vec<Block> = solved_headers
556 .into_iter()
557 .map(|header| {
558 let mut block = block.clone();
559 block.header = Arc::new(header);
560 block
561 })
562 .collect();
563
564 Ok(solved_blocks
565 .try_into()
566 .expect("a 1:1 mapping of AtLeastOne produces at least one block"))
567}