1use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::{router::BackgroundTaskHandles, ParameterCheckpoint};
87use zebra_rpc::{methods::RpcImpl, server::RpcServer, SubmitBlockChannel};
88
89use crate::{
90 application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
91 components::{
92 inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
93 mempool::{self, Mempool},
94 sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
95 tokio::{RuntimeRun, TokioComponent},
96 ChainSync, Inbound,
97 },
98 config::ZebradConfig,
99 prelude::*,
100};
101
102#[cfg(feature = "internal-miner")]
103use crate::components;
104
// Command that starts the zebrad node: its `Runnable` impl drives `StartCmd::start`
// to completion on the application's Tokio runtime.
//
// NOTE: these are plain `//` comments on purpose. clap's derive turns `///` doc
// comments into help text, so a doc comment here would change the CLI output.
#[derive(Command, Debug, Default, clap::Parser)]
pub struct StartCmd {
    // Tracing filter directives given on the command line. When at least one is
    // present they are joined with "," and replace `tracing.filter` from the loaded
    // config (see `override_config`).
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
112
113impl StartCmd {
114 async fn start(&self) -> Result<(), Report> {
115 let config = APPLICATION.config();
116 let is_regtest = config.network.network.is_regtest();
117
118 let config = if is_regtest {
119 Arc::new(ZebradConfig {
120 mempool: mempool::Config {
121 debug_enable_at_height: Some(0),
122 ..config.mempool
123 },
124 ..Arc::unwrap_or_clone(config)
125 })
126 } else {
127 config
128 };
129
130 info!("initializing node state");
131 let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
132 config.consensus.clone(),
133 &config.network.network,
134 );
135
136 info!("opening database, this may take a few minutes");
137
138 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
139 zebra_state::spawn_init(
140 config.state.clone(),
141 &config.network.network,
142 max_checkpoint_height,
143 config.sync.checkpoint_verify_concurrency_limit
144 * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
145 )
146 .await?;
147
148 info!("logging database metrics on startup");
149 read_only_state_service.log_db_metrics();
150
151 let state = ServiceBuilder::new()
152 .buffer(Self::state_buffer_bound())
153 .service(state_service);
154
155 info!("initializing network");
156 let (setup_tx, setup_rx) = oneshot::channel();
167 let inbound = ServiceBuilder::new()
168 .load_shed()
169 .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
170 .timeout(MAX_INBOUND_RESPONSE_TIME)
171 .service(Inbound::new(
172 config.sync.full_verify_concurrency_limit,
173 setup_rx,
174 ));
175
176 let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
177 config.network.clone(),
178 inbound,
179 latest_chain_tip.clone(),
180 user_agent(),
181 )
182 .await;
183
184 info!("initializing verifiers");
185 let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
186 let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
187 zebra_consensus::router::init(
188 config.consensus.clone(),
189 &config.network.network,
190 state.clone(),
191 tx_verifier_setup_rx,
192 )
193 .await;
194
195 info!("initializing syncer");
196 let (mut syncer, sync_status) = ChainSync::new(
197 &config,
198 max_checkpoint_height,
199 peer_set.clone(),
200 block_verifier_router.clone(),
201 state.clone(),
202 latest_chain_tip.clone(),
203 misbehavior_sender.clone(),
204 );
205
206 info!("initializing mempool");
207 let (mempool, mempool_transaction_subscriber) = Mempool::new(
208 &config.mempool,
209 peer_set.clone(),
210 state.clone(),
211 tx_verifier,
212 sync_status.clone(),
213 latest_chain_tip.clone(),
214 chain_tip_change.clone(),
215 misbehavior_sender.clone(),
216 );
217 let mempool = BoxService::new(mempool);
218 let mempool = ServiceBuilder::new()
219 .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
220 .service(mempool);
221
222 if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
223 warn!("error setting up the transaction verifier with a handle to the mempool service");
224 };
225
226 info!("fully initializing inbound peer request handler");
227 let setup_data = InboundSetupData {
229 address_book: address_book.clone(),
230 block_download_peer_set: peer_set.clone(),
231 block_verifier: block_verifier_router.clone(),
232 mempool: mempool.clone(),
233 state: state.clone(),
234 latest_chain_tip: latest_chain_tip.clone(),
235 misbehavior_sender,
236 };
237 setup_tx
238 .send(setup_data)
239 .map_err(|_| eyre!("could not send setup data to inbound service"))?;
240 tokio::task::yield_now().await;
242
243 let submit_block_channel = SubmitBlockChannel::new();
245
246 let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
248 config.network.network.clone(),
249 config.mining.clone(),
250 config.rpc.debug_force_finished_sync,
251 build_version(),
252 user_agent(),
253 mempool.clone(),
254 state.clone(),
255 read_only_state_service.clone(),
256 block_verifier_router.clone(),
257 sync_status.clone(),
258 latest_chain_tip.clone(),
259 address_book.clone(),
260 LAST_WARN_ERROR_LOG_SENDER.subscribe(),
261 Some(submit_block_channel.sender()),
262 );
263
264 let rpc_task_handle = if config.rpc.listen_addr.is_some() {
265 RpcServer::start(rpc_impl.clone(), config.rpc.clone())
266 .await
267 .expect("server should start")
268 } else {
269 tokio::spawn(std::future::pending().in_current_span())
270 };
271
272 let indexer_rpc_task_handle = {
275 if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
276 info!("spawning indexer RPC server");
277 let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
278 indexer_listen_addr,
279 read_only_state_service.clone(),
280 latest_chain_tip.clone(),
281 mempool_transaction_subscriber.clone(),
282 )
283 .await
284 .map_err(|err| eyre!(err))?;
285
286 indexer_rpc_task_handle
287 } else {
288 warn!("configure an indexer_listen_addr to start the indexer RPC server");
289 tokio::spawn(std::future::pending().in_current_span())
290 }
291 };
292
293 info!("spawning block gossip task");
295 let block_gossip_task_handle = tokio::spawn(
296 sync::gossip_best_tip_block_hashes(
297 sync_status.clone(),
298 chain_tip_change.clone(),
299 peer_set.clone(),
300 Some(submit_block_channel.receiver()),
301 )
302 .in_current_span(),
303 );
304
305 info!("spawning mempool queue checker task");
306 let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
307
308 info!("spawning mempool transaction gossip task");
309 let tx_gossip_task_handle = tokio::spawn(
310 mempool::gossip_mempool_transaction_id(
311 mempool_transaction_subscriber.subscribe(),
312 peer_set.clone(),
313 )
314 .in_current_span(),
315 );
316
317 info!("spawning delete old databases task");
318 let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
319 &config.state,
320 &config.network.network,
321 );
322
323 info!("spawning progress logging task");
324 let progress_task_handle = tokio::spawn(
325 show_block_chain_progress(
326 config.network.network.clone(),
327 latest_chain_tip.clone(),
328 sync_status.clone(),
329 )
330 .in_current_span(),
331 );
332
333 info!("spawning end of support checking task");
335 let end_of_support_task_handle = tokio::spawn(
336 sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
337 .in_current_span(),
338 );
339
340 tokio::task::yield_now().await;
344
345 info!("spawning mempool crawler task");
347 let mempool_crawler_task_handle = mempool::Crawler::spawn(
348 &config.mempool,
349 peer_set,
350 mempool.clone(),
351 sync_status.clone(),
352 chain_tip_change.clone(),
353 );
354
355 info!("spawning syncer task");
356 let syncer_task_handle = if is_regtest {
357 if !syncer
358 .state_contains(config.network.network.genesis_hash())
359 .await?
360 {
361 let genesis_hash = block_verifier_router
362 .clone()
363 .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
364 .await
365 .expect("should validate Regtest genesis block");
366
367 assert_eq!(
368 genesis_hash,
369 config.network.network.genesis_hash(),
370 "validated block hash should match network genesis hash"
371 )
372 }
373
374 tokio::spawn(std::future::pending().in_current_span())
375 } else {
376 tokio::spawn(syncer.sync().in_current_span())
377 };
378
379 #[cfg(feature = "internal-miner")]
383 let miner_task_handle = if config.mining.is_internal_miner_enabled() {
384 info!("spawning Zcash miner");
385 components::miner::spawn_init(&config.metrics, rpc_impl)
386 } else {
387 tokio::spawn(std::future::pending().in_current_span())
388 };
389
390 #[cfg(not(feature = "internal-miner"))]
391 let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
393 tokio::spawn(std::future::pending().in_current_span());
394
395 info!("spawned initial Zebra tasks");
396
397 pin!(rpc_task_handle);
401 pin!(indexer_rpc_task_handle);
402 pin!(syncer_task_handle);
403 pin!(block_gossip_task_handle);
404 pin!(mempool_crawler_task_handle);
405 pin!(mempool_queue_checker_task_handle);
406 pin!(tx_gossip_task_handle);
407 pin!(progress_task_handle);
408 pin!(end_of_support_task_handle);
409 pin!(miner_task_handle);
410
411 let BackgroundTaskHandles {
413 mut state_checkpoint_verify_handle,
414 } = consensus_task_handles;
415
416 let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
417 pin!(state_checkpoint_verify_handle_fused);
418
419 let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
420 pin!(old_databases_task_handle_fused);
421
422 let exit_status = loop {
424 let mut exit_when_task_finishes = true;
425
426 let result = select! {
427 rpc_join_result = &mut rpc_task_handle => {
428 let rpc_server_result = rpc_join_result
429 .expect("unexpected panic in the rpc task");
430 info!(?rpc_server_result, "rpc task exited");
431 Ok(())
432 }
433
434 rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
435 rpc_tx_queue_result
436 .expect("unexpected panic in the rpc transaction queue task");
437 info!("rpc transaction queue task exited");
438 Ok(())
439 }
440
441 indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
442 let indexer_rpc_server_result = indexer_rpc_join_result
443 .expect("unexpected panic in the indexer task");
444 info!(?indexer_rpc_server_result, "indexer rpc task exited");
445 Ok(())
446 }
447
448 sync_result = &mut syncer_task_handle => sync_result
449 .expect("unexpected panic in the syncer task")
450 .map(|_| info!("syncer task exited")),
451
452 block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
453 .expect("unexpected panic in the chain tip block gossip task")
454 .map(|_| info!("chain tip block gossip task exited"))
455 .map_err(|e| eyre!(e)),
456
457 mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
458 .expect("unexpected panic in the mempool crawler")
459 .map(|_| info!("mempool crawler task exited"))
460 .map_err(|e| eyre!(e)),
461
462 mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
463 .expect("unexpected panic in the mempool queue checker")
464 .map(|_| info!("mempool queue checker task exited"))
465 .map_err(|e| eyre!(e)),
466
467 tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
468 .expect("unexpected panic in the transaction gossip task")
469 .map(|_| info!("transaction gossip task exited"))
470 .map_err(|e| eyre!(e)),
471
472 progress_result = &mut progress_task_handle => {
475 info!("chain progress task exited");
476 progress_result
477 .expect("unexpected panic in the chain progress task");
478 }
479
480 end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
481 .expect("unexpected panic in the end of support task")
482 .map(|_| info!("end of support task exited")),
483
484 state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
486 state_checkpoint_verify_result
487 .unwrap_or_else(|_| panic!(
488 "unexpected panic checking previous state followed the best chain"));
489
490 exit_when_task_finishes = false;
491 Ok(())
492 }
493
494 old_databases_result = &mut old_databases_task_handle_fused => {
496 old_databases_result
497 .unwrap_or_else(|_| panic!(
498 "unexpected panic deleting old database directories"));
499
500 exit_when_task_finishes = false;
501 Ok(())
502 }
503
504 miner_result = &mut miner_task_handle => miner_result
505 .expect("unexpected panic in the miner task")
506 .map(|_| info!("miner task exited")),
507 };
508
509 if let Err(err) = result {
512 break Err(err);
513 }
514
515 if exit_when_task_finishes {
516 break Ok(());
517 }
518 };
519
520 info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
521
522 rpc_task_handle.abort();
524 rpc_tx_queue_handle.abort();
525 syncer_task_handle.abort();
526 block_gossip_task_handle.abort();
527 mempool_crawler_task_handle.abort();
528 mempool_queue_checker_task_handle.abort();
529 tx_gossip_task_handle.abort();
530 progress_task_handle.abort();
531 end_of_support_task_handle.abort();
532 miner_task_handle.abort();
533
534 state_checkpoint_verify_handle.abort();
536 old_databases_task_handle.abort();
537
538 info!(
539 "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
540 );
541
542 exit_status
543 }
544
545 fn state_buffer_bound() -> usize {
548 let config = APPLICATION.config();
549
550 [
555 config.sync.download_concurrency_limit,
556 config.sync.full_verify_concurrency_limit,
557 inbound::downloads::MAX_INBOUND_CONCURRENCY,
558 mempool::downloads::MAX_INBOUND_CONCURRENCY,
559 ]
560 .into_iter()
561 .max()
562 .unwrap()
563 }
564}
565
566impl Runnable for StartCmd {
567 fn run(&self) {
569 info!("Starting zebrad");
570 let rt = APPLICATION
571 .state()
572 .components_mut()
573 .get_downcast_mut::<TokioComponent>()
574 .expect("TokioComponent should be available")
575 .rt
576 .take();
577
578 rt.expect("runtime should not already be taken")
579 .run(self.start());
580
581 info!("stopping zebrad");
582 }
583}
584
585impl config::Override<ZebradConfig> for StartCmd {
586 fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
590 if !self.filters.is_empty() {
591 config.tracing.filter = Some(self.filters.join(","));
592 }
593
594 Ok(config)
595 }
596}