1use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::{router::BackgroundTaskHandles, ParameterCheckpoint};
87use zebra_rpc::{
88 methods::{types::submit_block::SubmitBlockChannel, RpcImpl},
89 server::RpcServer,
90};
91
92use crate::{
93 application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
94 components::{
95 inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
96 mempool::{self, Mempool},
97 sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
98 tokio::{RuntimeRun, TokioComponent},
99 ChainSync, Inbound,
100 },
101 config::ZebradConfig,
102 prelude::*,
103};
104
105#[cfg(feature = "internal-miner")]
106use crate::components;
107
108#[derive(Command, Debug, Default, clap::Parser)]
110pub struct StartCmd {
111 #[clap(help = "tracing filters which override the zebrad.toml config")]
113 filters: Vec<String>,
114}
115
116impl StartCmd {
117 async fn start(&self) -> Result<(), Report> {
118 let config = APPLICATION.config();
119 let is_regtest = config.network.network.is_regtest();
120
121 let config = if is_regtest {
122 Arc::new(ZebradConfig {
123 mempool: mempool::Config {
124 debug_enable_at_height: Some(0),
125 ..config.mempool
126 },
127 ..Arc::unwrap_or_clone(config)
128 })
129 } else {
130 config
131 };
132
133 info!("initializing node state");
134 let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
135 config.consensus.clone(),
136 &config.network.network,
137 );
138
139 info!("opening database, this may take a few minutes");
140
141 let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
142 zebra_state::spawn_init(
143 config.state.clone(),
144 &config.network.network,
145 max_checkpoint_height,
146 config.sync.checkpoint_verify_concurrency_limit
147 * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
148 )
149 .await?;
150
151 info!("logging database metrics on startup");
152 read_only_state_service.log_db_metrics();
153
154 let state = ServiceBuilder::new()
155 .buffer(Self::state_buffer_bound())
156 .service(state_service);
157
158 info!("initializing network");
159 let (setup_tx, setup_rx) = oneshot::channel();
170 let inbound = ServiceBuilder::new()
171 .load_shed()
172 .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
173 .timeout(MAX_INBOUND_RESPONSE_TIME)
174 .service(Inbound::new(
175 config.sync.full_verify_concurrency_limit,
176 setup_rx,
177 ));
178
179 let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
180 config.network.clone(),
181 inbound,
182 latest_chain_tip.clone(),
183 user_agent(),
184 )
185 .await;
186
187 info!("initializing verifiers");
188 let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
189 let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
190 zebra_consensus::router::init(
191 config.consensus.clone(),
192 &config.network.network,
193 state.clone(),
194 tx_verifier_setup_rx,
195 )
196 .await;
197
198 info!("initializing syncer");
199 let (mut syncer, sync_status) = ChainSync::new(
200 &config,
201 max_checkpoint_height,
202 peer_set.clone(),
203 block_verifier_router.clone(),
204 state.clone(),
205 latest_chain_tip.clone(),
206 misbehavior_sender.clone(),
207 );
208
209 info!("initializing mempool");
210 let (mempool, mempool_transaction_receiver) = Mempool::new(
211 &config.mempool,
212 peer_set.clone(),
213 state.clone(),
214 tx_verifier,
215 sync_status.clone(),
216 latest_chain_tip.clone(),
217 chain_tip_change.clone(),
218 misbehavior_sender.clone(),
219 );
220 let mempool = BoxService::new(mempool);
221 let mempool = ServiceBuilder::new()
222 .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
223 .service(mempool);
224
225 if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
226 warn!("error setting up the transaction verifier with a handle to the mempool service");
227 };
228
229 info!("fully initializing inbound peer request handler");
230 let setup_data = InboundSetupData {
232 address_book: address_book.clone(),
233 block_download_peer_set: peer_set.clone(),
234 block_verifier: block_verifier_router.clone(),
235 mempool: mempool.clone(),
236 state: state.clone(),
237 latest_chain_tip: latest_chain_tip.clone(),
238 misbehavior_sender,
239 };
240 setup_tx
241 .send(setup_data)
242 .map_err(|_| eyre!("could not send setup data to inbound service"))?;
243 tokio::task::yield_now().await;
245
246 let submit_block_channel = SubmitBlockChannel::new();
248
249 let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
251 config.network.network.clone(),
252 config.mining.clone(),
253 config.rpc.debug_force_finished_sync,
254 build_version(),
255 user_agent(),
256 mempool.clone(),
257 read_only_state_service.clone(),
258 block_verifier_router.clone(),
259 sync_status.clone(),
260 latest_chain_tip.clone(),
261 address_book.clone(),
262 LAST_WARN_ERROR_LOG_SENDER.subscribe(),
263 Some(submit_block_channel.sender()),
264 );
265
266 let rpc_task_handle = if config.rpc.listen_addr.is_some() {
267 RpcServer::start(rpc_impl.clone(), config.rpc.clone())
268 .await
269 .expect("server should start")
270 } else {
271 tokio::spawn(std::future::pending().in_current_span())
272 };
273
274 #[cfg(feature = "indexer")]
277 let indexer_rpc_task_handle =
278 if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
279 info!("spawning indexer RPC server");
280 let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
281 indexer_listen_addr,
282 read_only_state_service.clone(),
283 latest_chain_tip.clone(),
284 )
285 .await
286 .map_err(|err| eyre!(err))?;
287
288 indexer_rpc_task_handle
289 } else {
290 warn!("configure an indexer_listen_addr to start the indexer RPC server");
291 tokio::spawn(std::future::pending().in_current_span())
292 };
293
294 #[cfg(not(feature = "indexer"))]
295 let indexer_rpc_task_handle: tokio::task::JoinHandle<Result<(), tower::BoxError>> =
297 tokio::spawn(std::future::pending().in_current_span());
298
299 info!("spawning block gossip task");
301 let block_gossip_task_handle = tokio::spawn(
302 sync::gossip_best_tip_block_hashes(
303 sync_status.clone(),
304 chain_tip_change.clone(),
305 peer_set.clone(),
306 Some(submit_block_channel.receiver()),
307 )
308 .in_current_span(),
309 );
310
311 info!("spawning mempool queue checker task");
312 let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
313
314 info!("spawning mempool transaction gossip task");
315 let tx_gossip_task_handle = tokio::spawn(
316 mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set.clone())
317 .in_current_span(),
318 );
319
320 info!("spawning delete old databases task");
321 let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
322 &config.state,
323 &config.network.network,
324 );
325
326 info!("spawning progress logging task");
327 let progress_task_handle = tokio::spawn(
328 show_block_chain_progress(
329 config.network.network.clone(),
330 latest_chain_tip.clone(),
331 sync_status.clone(),
332 )
333 .in_current_span(),
334 );
335
336 info!("spawning end of support checking task");
338 let end_of_support_task_handle = tokio::spawn(
339 sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
340 .in_current_span(),
341 );
342
343 tokio::task::yield_now().await;
347
348 info!("spawning mempool crawler task");
350 let mempool_crawler_task_handle = mempool::Crawler::spawn(
351 &config.mempool,
352 peer_set,
353 mempool.clone(),
354 sync_status.clone(),
355 chain_tip_change.clone(),
356 );
357
358 info!("spawning syncer task");
359 let syncer_task_handle = if is_regtest {
360 if !syncer
361 .state_contains(config.network.network.genesis_hash())
362 .await?
363 {
364 let genesis_hash = block_verifier_router
365 .clone()
366 .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
367 .await
368 .expect("should validate Regtest genesis block");
369
370 assert_eq!(
371 genesis_hash,
372 config.network.network.genesis_hash(),
373 "validated block hash should match network genesis hash"
374 )
375 }
376
377 tokio::spawn(std::future::pending().in_current_span())
378 } else {
379 tokio::spawn(syncer.sync().in_current_span())
380 };
381
382 #[cfg(feature = "internal-miner")]
386 let miner_task_handle = if config.mining.is_internal_miner_enabled() {
387 info!("spawning Zcash miner");
388 components::miner::spawn_init(&config.network.network, &config.mining, rpc_impl)
389 } else {
390 tokio::spawn(std::future::pending().in_current_span())
391 };
392
393 #[cfg(not(feature = "internal-miner"))]
394 let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
396 tokio::spawn(std::future::pending().in_current_span());
397
398 info!("spawned initial Zebra tasks");
399
400 pin!(rpc_task_handle);
404 pin!(indexer_rpc_task_handle);
405 pin!(syncer_task_handle);
406 pin!(block_gossip_task_handle);
407 pin!(mempool_crawler_task_handle);
408 pin!(mempool_queue_checker_task_handle);
409 pin!(tx_gossip_task_handle);
410 pin!(progress_task_handle);
411 pin!(end_of_support_task_handle);
412 pin!(miner_task_handle);
413
414 let BackgroundTaskHandles {
416 mut state_checkpoint_verify_handle,
417 } = consensus_task_handles;
418
419 let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
420 pin!(state_checkpoint_verify_handle_fused);
421
422 let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
423 pin!(old_databases_task_handle_fused);
424
425 let exit_status = loop {
427 let mut exit_when_task_finishes = true;
428
429 let result = select! {
430 rpc_join_result = &mut rpc_task_handle => {
431 let rpc_server_result = rpc_join_result
432 .expect("unexpected panic in the rpc task");
433 info!(?rpc_server_result, "rpc task exited");
434 Ok(())
435 }
436
437 rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
438 rpc_tx_queue_result
439 .expect("unexpected panic in the rpc transaction queue task");
440 info!("rpc transaction queue task exited");
441 Ok(())
442 }
443
444 indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
445 let indexer_rpc_server_result = indexer_rpc_join_result
446 .expect("unexpected panic in the indexer task");
447 info!(?indexer_rpc_server_result, "indexer rpc task exited");
448 Ok(())
449 }
450
451 sync_result = &mut syncer_task_handle => sync_result
452 .expect("unexpected panic in the syncer task")
453 .map(|_| info!("syncer task exited")),
454
455 block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
456 .expect("unexpected panic in the chain tip block gossip task")
457 .map(|_| info!("chain tip block gossip task exited"))
458 .map_err(|e| eyre!(e)),
459
460 mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
461 .expect("unexpected panic in the mempool crawler")
462 .map(|_| info!("mempool crawler task exited"))
463 .map_err(|e| eyre!(e)),
464
465 mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
466 .expect("unexpected panic in the mempool queue checker")
467 .map(|_| info!("mempool queue checker task exited"))
468 .map_err(|e| eyre!(e)),
469
470 tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
471 .expect("unexpected panic in the transaction gossip task")
472 .map(|_| info!("transaction gossip task exited"))
473 .map_err(|e| eyre!(e)),
474
475 progress_result = &mut progress_task_handle => {
478 info!("chain progress task exited");
479 progress_result
480 .expect("unexpected panic in the chain progress task");
481 }
482
483 end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
484 .expect("unexpected panic in the end of support task")
485 .map(|_| info!("end of support task exited")),
486
487 state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
489 state_checkpoint_verify_result
490 .unwrap_or_else(|_| panic!(
491 "unexpected panic checking previous state followed the best chain"));
492
493 exit_when_task_finishes = false;
494 Ok(())
495 }
496
497 old_databases_result = &mut old_databases_task_handle_fused => {
499 old_databases_result
500 .unwrap_or_else(|_| panic!(
501 "unexpected panic deleting old database directories"));
502
503 exit_when_task_finishes = false;
504 Ok(())
505 }
506
507 miner_result = &mut miner_task_handle => miner_result
508 .expect("unexpected panic in the miner task")
509 .map(|_| info!("miner task exited")),
510 };
511
512 if let Err(err) = result {
515 break Err(err);
516 }
517
518 if exit_when_task_finishes {
519 break Ok(());
520 }
521 };
522
523 info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
524
525 rpc_task_handle.abort();
527 rpc_tx_queue_handle.abort();
528 syncer_task_handle.abort();
529 block_gossip_task_handle.abort();
530 mempool_crawler_task_handle.abort();
531 mempool_queue_checker_task_handle.abort();
532 tx_gossip_task_handle.abort();
533 progress_task_handle.abort();
534 end_of_support_task_handle.abort();
535 miner_task_handle.abort();
536
537 state_checkpoint_verify_handle.abort();
539 old_databases_task_handle.abort();
540
541 info!(
542 "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
543 );
544
545 exit_status
546 }
547
548 fn state_buffer_bound() -> usize {
551 let config = APPLICATION.config();
552
553 [
558 config.sync.download_concurrency_limit,
559 config.sync.full_verify_concurrency_limit,
560 inbound::downloads::MAX_INBOUND_CONCURRENCY,
561 mempool::downloads::MAX_INBOUND_CONCURRENCY,
562 ]
563 .into_iter()
564 .max()
565 .unwrap()
566 }
567}
568
569impl Runnable for StartCmd {
570 fn run(&self) {
572 info!("Starting zebrad");
573 let rt = APPLICATION
574 .state()
575 .components_mut()
576 .get_downcast_mut::<TokioComponent>()
577 .expect("TokioComponent should be available")
578 .rt
579 .take();
580
581 rt.expect("runtime should not already be taken")
582 .run(self.start());
583
584 info!("stopping zebrad");
585 }
586}
587
588impl config::Override<ZebradConfig> for StartCmd {
589 fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
593 if !self.filters.is_empty() {
594 config.tracing.filter = Some(self.filters.join(","));
595 }
596
597 Ok(config)
598 }
599}