zebrad/commands/
start.rs

1//! `start` subcommand - entry point for starting a zebra node
2//!
3//! ## Application Structure
4//!
5//! A zebra node consists of the following major services and tasks:
6//!
7//! Peers:
8//!  * Peer Connection Pool Service
9//!    * primary external interface for outbound requests from this node to remote peers
10//!    * accepts requests from services and tasks in this node, and sends them to remote peers
11//!  * Peer Discovery Service
12//!    * maintains a list of peer addresses, and connection priority metadata
13//!    * discovers new peer addresses from existing peer connections
14//!    * initiates new outbound peer connections in response to demand from tasks within this node
15//!  * Peer Cache Service
16//!    * Reads previous peer cache on startup, and adds it to the configured DNS seed peers
17//!    * Periodically updates the peer cache on disk from the latest address book state
18//!
19//! Blocks & Mempool Transactions:
20//!  * Consensus Service
21//!    * handles all validation logic for the node
22//!    * verifies blocks using zebra-chain, then stores verified blocks in zebra-state
23//!    * verifies mempool and block transactions using zebra-chain and zebra-script,
24//!      and returns verified mempool transactions for mempool storage
25//!  * Inbound Service
26//!    * primary external interface for inbound peer requests to this node
27//!    * handles requests from peers for network data, chain data, and mempool transactions
28//!    * spawns download and verify tasks for each gossiped block
29//!    * sends gossiped transactions to the mempool service
30//!
31//! Blocks:
32//!  * Sync Task
33//!    * runs in the background and continuously queries the network for
34//!      new blocks to be verified and added to the local state
35//!    * spawns download and verify tasks for each crawled block
36//!  * State Service
37//!    * contextually verifies blocks
38//!    * handles in-memory storage of multiple non-finalized chains
39//!    * handles permanent storage of the best finalized chain
40//!  * Old State Version Cleanup Task
41//!    * deletes outdated state versions
42//!  * Block Gossip Task
43//!    * runs in the background and continuously queries the state for
44//!      newly committed blocks to be gossiped to peers
45//!  * Progress Task
46//!    * logs progress towards the chain tip
47//!
48//! Block Mining:
49//!  * Internal Miner Task
50//!    * if the user has configured Zebra to mine blocks, spawns tasks to generate new blocks,
51//!      and submits them for verification. This automatically shares these new blocks with peers.
52//!
53//! Mempool Transactions:
54//!  * Mempool Service
55//!    * activates when the syncer is near the chain tip
56//!    * spawns download and verify tasks for each crawled or gossiped transaction
57//!    * handles in-memory storage of unmined transactions
58//!  * Queue Checker Task
59//!    * runs in the background, polling the mempool to store newly verified transactions
60//!  * Transaction Gossip Task
61//!    * runs in the background and gossips newly added mempool transactions
62//!      to peers
63//!
64//! Remote Procedure Calls:
65//!  * JSON-RPC Service
66//!    * answers RPC client requests using the State Service and Mempool Service
67//!    * submits client transactions to the node's mempool
68//!
69//! Zebra also has diagnostic support:
70//! * [metrics](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/metrics.md)
71//! * [tracing](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/tracing.md)
72//! * [progress-bar](https://docs.rs/howudoin/0.1.1/howudoin)
73//!
74//! Some of the diagnostic features are optional, and need to be enabled at compile-time.
75
76use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::{router::BackgroundTaskHandles, ParameterCheckpoint};
87use zebra_rpc::{
88    methods::{types::submit_block::SubmitBlockChannel, RpcImpl},
89    server::RpcServer,
90};
91
92use crate::{
93    application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
94    components::{
95        inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
96        mempool::{self, Mempool},
97        sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
98        tokio::{RuntimeRun, TokioComponent},
99        ChainSync, Inbound,
100    },
101    config::ZebradConfig,
102    prelude::*,
103};
104
105#[cfg(feature = "internal-miner")]
106use crate::components;
107
/// Start the application (default command)
// NOTE(review): the `///` comments on this type are probably picked up by the
// `Command` / `clap::Parser` derives as CLI help text -- confirm before rewording them.
#[derive(Command, Debug, Default, clap::Parser)]
pub struct StartCmd {
    /// Filter strings which override the config file and defaults
    // When this list is non-empty, the `config::Override` impl for this command
    // joins the entries with "," and stores the result in `config.tracing.filter`.
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
115
116impl StartCmd {
117    async fn start(&self) -> Result<(), Report> {
118        let config = APPLICATION.config();
119        let is_regtest = config.network.network.is_regtest();
120
121        let config = if is_regtest {
122            Arc::new(ZebradConfig {
123                mempool: mempool::Config {
124                    debug_enable_at_height: Some(0),
125                    ..config.mempool
126                },
127                ..Arc::unwrap_or_clone(config)
128            })
129        } else {
130            config
131        };
132
133        info!("initializing node state");
134        let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
135            config.consensus.clone(),
136            &config.network.network,
137        );
138
139        info!("opening database, this may take a few minutes");
140
141        let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
142            zebra_state::spawn_init(
143                config.state.clone(),
144                &config.network.network,
145                max_checkpoint_height,
146                config.sync.checkpoint_verify_concurrency_limit
147                    * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
148            )
149            .await?;
150
151        info!("logging database metrics on startup");
152        read_only_state_service.log_db_metrics();
153
154        let state = ServiceBuilder::new()
155            .buffer(Self::state_buffer_bound())
156            .service(state_service);
157
158        info!("initializing network");
159        // The service that our node uses to respond to requests by peers. The
160        // load_shed middleware ensures that we reduce the size of the peer set
161        // in response to excess load.
162        //
163        // # Security
164        //
165        // This layer stack is security-sensitive, modifying it can cause hangs,
166        // or enable denial of service attacks.
167        //
168        // See `zebra_network::Connection::drive_peer_request()` for details.
169        let (setup_tx, setup_rx) = oneshot::channel();
170        let inbound = ServiceBuilder::new()
171            .load_shed()
172            .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
173            .timeout(MAX_INBOUND_RESPONSE_TIME)
174            .service(Inbound::new(
175                config.sync.full_verify_concurrency_limit,
176                setup_rx,
177            ));
178
179        let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
180            config.network.clone(),
181            inbound,
182            latest_chain_tip.clone(),
183            user_agent(),
184        )
185        .await;
186
187        info!("initializing verifiers");
188        let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
189        let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
190            zebra_consensus::router::init(
191                config.consensus.clone(),
192                &config.network.network,
193                state.clone(),
194                tx_verifier_setup_rx,
195            )
196            .await;
197
198        info!("initializing syncer");
199        let (mut syncer, sync_status) = ChainSync::new(
200            &config,
201            max_checkpoint_height,
202            peer_set.clone(),
203            block_verifier_router.clone(),
204            state.clone(),
205            latest_chain_tip.clone(),
206            misbehavior_sender.clone(),
207        );
208
209        info!("initializing mempool");
210        let (mempool, mempool_transaction_receiver) = Mempool::new(
211            &config.mempool,
212            peer_set.clone(),
213            state.clone(),
214            tx_verifier,
215            sync_status.clone(),
216            latest_chain_tip.clone(),
217            chain_tip_change.clone(),
218            misbehavior_sender.clone(),
219        );
220        let mempool = BoxService::new(mempool);
221        let mempool = ServiceBuilder::new()
222            .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
223            .service(mempool);
224
225        if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
226            warn!("error setting up the transaction verifier with a handle to the mempool service");
227        };
228
229        info!("fully initializing inbound peer request handler");
230        // Fully start the inbound service as soon as possible
231        let setup_data = InboundSetupData {
232            address_book: address_book.clone(),
233            block_download_peer_set: peer_set.clone(),
234            block_verifier: block_verifier_router.clone(),
235            mempool: mempool.clone(),
236            state: state.clone(),
237            latest_chain_tip: latest_chain_tip.clone(),
238            misbehavior_sender,
239        };
240        setup_tx
241            .send(setup_data)
242            .map_err(|_| eyre!("could not send setup data to inbound service"))?;
243        // And give it time to clear its queue
244        tokio::task::yield_now().await;
245
246        // Create a channel to send mined blocks to the gossip task
247        let submit_block_channel = SubmitBlockChannel::new();
248
249        // Launch RPC server
250        let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
251            config.network.network.clone(),
252            config.mining.clone(),
253            config.rpc.debug_force_finished_sync,
254            build_version(),
255            user_agent(),
256            mempool.clone(),
257            read_only_state_service.clone(),
258            block_verifier_router.clone(),
259            sync_status.clone(),
260            latest_chain_tip.clone(),
261            address_book.clone(),
262            LAST_WARN_ERROR_LOG_SENDER.subscribe(),
263            Some(submit_block_channel.sender()),
264        );
265
266        let rpc_task_handle = if config.rpc.listen_addr.is_some() {
267            RpcServer::start(rpc_impl.clone(), config.rpc.clone())
268                .await
269                .expect("server should start")
270        } else {
271            tokio::spawn(std::future::pending().in_current_span())
272        };
273
274        // TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
275        //       any related unit tests sometimes crash with memory errors
276        #[cfg(feature = "indexer")]
277        let indexer_rpc_task_handle =
278            if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
279                info!("spawning indexer RPC server");
280                let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
281                    indexer_listen_addr,
282                    read_only_state_service.clone(),
283                    latest_chain_tip.clone(),
284                )
285                .await
286                .map_err(|err| eyre!(err))?;
287
288                indexer_rpc_task_handle
289            } else {
290                warn!("configure an indexer_listen_addr to start the indexer RPC server");
291                tokio::spawn(std::future::pending().in_current_span())
292            };
293
294        #[cfg(not(feature = "indexer"))]
295        // Spawn a dummy indexer rpc task which doesn't do anything and never finishes.
296        let indexer_rpc_task_handle: tokio::task::JoinHandle<Result<(), tower::BoxError>> =
297            tokio::spawn(std::future::pending().in_current_span());
298
299        // Start concurrent tasks which don't add load to other tasks
300        info!("spawning block gossip task");
301        let block_gossip_task_handle = tokio::spawn(
302            sync::gossip_best_tip_block_hashes(
303                sync_status.clone(),
304                chain_tip_change.clone(),
305                peer_set.clone(),
306                Some(submit_block_channel.receiver()),
307            )
308            .in_current_span(),
309        );
310
311        info!("spawning mempool queue checker task");
312        let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
313
314        info!("spawning mempool transaction gossip task");
315        let tx_gossip_task_handle = tokio::spawn(
316            mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set.clone())
317                .in_current_span(),
318        );
319
320        info!("spawning delete old databases task");
321        let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
322            &config.state,
323            &config.network.network,
324        );
325
326        info!("spawning progress logging task");
327        let progress_task_handle = tokio::spawn(
328            show_block_chain_progress(
329                config.network.network.clone(),
330                latest_chain_tip.clone(),
331                sync_status.clone(),
332            )
333            .in_current_span(),
334        );
335
336        // Spawn never ending end of support task.
337        info!("spawning end of support checking task");
338        let end_of_support_task_handle = tokio::spawn(
339            sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
340                .in_current_span(),
341        );
342
343        // Give the inbound service more time to clear its queue,
344        // then start concurrent tasks that can add load to the inbound service
345        // (by opening more peer connections, so those peers send us requests)
346        tokio::task::yield_now().await;
347
348        // The crawler only activates immediately in tests that use mempool debug mode
349        info!("spawning mempool crawler task");
350        let mempool_crawler_task_handle = mempool::Crawler::spawn(
351            &config.mempool,
352            peer_set,
353            mempool.clone(),
354            sync_status.clone(),
355            chain_tip_change.clone(),
356        );
357
358        info!("spawning syncer task");
359        let syncer_task_handle = if is_regtest {
360            if !syncer
361                .state_contains(config.network.network.genesis_hash())
362                .await?
363            {
364                let genesis_hash = block_verifier_router
365                    .clone()
366                    .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
367                    .await
368                    .expect("should validate Regtest genesis block");
369
370                assert_eq!(
371                    genesis_hash,
372                    config.network.network.genesis_hash(),
373                    "validated block hash should match network genesis hash"
374                )
375            }
376
377            tokio::spawn(std::future::pending().in_current_span())
378        } else {
379            tokio::spawn(syncer.sync().in_current_span())
380        };
381
382        // And finally, spawn the internal Zcash miner, if it is enabled.
383        //
384        // TODO: add a config to enable the miner rather than a feature.
385        #[cfg(feature = "internal-miner")]
386        let miner_task_handle = if config.mining.is_internal_miner_enabled() {
387            info!("spawning Zcash miner");
388            components::miner::spawn_init(&config.network.network, &config.mining, rpc_impl)
389        } else {
390            tokio::spawn(std::future::pending().in_current_span())
391        };
392
393        #[cfg(not(feature = "internal-miner"))]
394        // Spawn a dummy miner task which doesn't do anything and never finishes.
395        let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
396            tokio::spawn(std::future::pending().in_current_span());
397
398        info!("spawned initial Zebra tasks");
399
400        // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
401
402        // ongoing tasks
403        pin!(rpc_task_handle);
404        pin!(indexer_rpc_task_handle);
405        pin!(syncer_task_handle);
406        pin!(block_gossip_task_handle);
407        pin!(mempool_crawler_task_handle);
408        pin!(mempool_queue_checker_task_handle);
409        pin!(tx_gossip_task_handle);
410        pin!(progress_task_handle);
411        pin!(end_of_support_task_handle);
412        pin!(miner_task_handle);
413
414        // startup tasks
415        let BackgroundTaskHandles {
416            mut state_checkpoint_verify_handle,
417        } = consensus_task_handles;
418
419        let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
420        pin!(state_checkpoint_verify_handle_fused);
421
422        let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
423        pin!(old_databases_task_handle_fused);
424
425        // Wait for tasks to finish
426        let exit_status = loop {
427            let mut exit_when_task_finishes = true;
428
429            let result = select! {
430                rpc_join_result = &mut rpc_task_handle => {
431                    let rpc_server_result = rpc_join_result
432                        .expect("unexpected panic in the rpc task");
433                    info!(?rpc_server_result, "rpc task exited");
434                    Ok(())
435                }
436
437                rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
438                    rpc_tx_queue_result
439                        .expect("unexpected panic in the rpc transaction queue task");
440                    info!("rpc transaction queue task exited");
441                    Ok(())
442                }
443
444                indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
445                    let indexer_rpc_server_result = indexer_rpc_join_result
446                        .expect("unexpected panic in the indexer task");
447                    info!(?indexer_rpc_server_result, "indexer rpc task exited");
448                    Ok(())
449                }
450
451                sync_result = &mut syncer_task_handle => sync_result
452                    .expect("unexpected panic in the syncer task")
453                    .map(|_| info!("syncer task exited")),
454
455                block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
456                    .expect("unexpected panic in the chain tip block gossip task")
457                    .map(|_| info!("chain tip block gossip task exited"))
458                    .map_err(|e| eyre!(e)),
459
460                mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
461                    .expect("unexpected panic in the mempool crawler")
462                    .map(|_| info!("mempool crawler task exited"))
463                    .map_err(|e| eyre!(e)),
464
465                mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
466                    .expect("unexpected panic in the mempool queue checker")
467                    .map(|_| info!("mempool queue checker task exited"))
468                    .map_err(|e| eyre!(e)),
469
470                tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
471                    .expect("unexpected panic in the transaction gossip task")
472                    .map(|_| info!("transaction gossip task exited"))
473                    .map_err(|e| eyre!(e)),
474
475                // The progress task runs forever, unless it panics.
476                // So we don't need to provide an exit status for it.
477                progress_result = &mut progress_task_handle => {
478                    info!("chain progress task exited");
479                    progress_result
480                        .expect("unexpected panic in the chain progress task");
481                }
482
483                end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
484                    .expect("unexpected panic in the end of support task")
485                    .map(|_| info!("end of support task exited")),
486
487                // We also expect the state checkpoint verify task to finish.
488                state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
489                    state_checkpoint_verify_result
490                        .unwrap_or_else(|_| panic!(
491                            "unexpected panic checking previous state followed the best chain"));
492
493                    exit_when_task_finishes = false;
494                    Ok(())
495                }
496
497                // And the old databases task should finish while Zebra is running.
498                old_databases_result = &mut old_databases_task_handle_fused => {
499                    old_databases_result
500                        .unwrap_or_else(|_| panic!(
501                            "unexpected panic deleting old database directories"));
502
503                    exit_when_task_finishes = false;
504                    Ok(())
505                }
506
507                miner_result = &mut miner_task_handle => miner_result
508                    .expect("unexpected panic in the miner task")
509                    .map(|_| info!("miner task exited")),
510            };
511
512            // Stop Zebra if a task finished and returned an error,
513            // or if an ongoing task exited.
514            if let Err(err) = result {
515                break Err(err);
516            }
517
518            if exit_when_task_finishes {
519                break Ok(());
520            }
521        };
522
523        info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
524
525        // ongoing tasks
526        rpc_task_handle.abort();
527        rpc_tx_queue_handle.abort();
528        syncer_task_handle.abort();
529        block_gossip_task_handle.abort();
530        mempool_crawler_task_handle.abort();
531        mempool_queue_checker_task_handle.abort();
532        tx_gossip_task_handle.abort();
533        progress_task_handle.abort();
534        end_of_support_task_handle.abort();
535        miner_task_handle.abort();
536
537        // startup tasks
538        state_checkpoint_verify_handle.abort();
539        old_databases_task_handle.abort();
540
541        info!(
542            "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
543        );
544
545        exit_status
546    }
547
548    /// Returns the bound for the state service buffer,
549    /// based on the configurations of the services that use the state concurrently.
550    fn state_buffer_bound() -> usize {
551        let config = APPLICATION.config();
552
553        // Ignore the checkpoint verify limit, because it is very large.
554        //
555        // TODO: do we also need to account for concurrent use across services?
556        //       we could multiply the maximum by 3/2, or add a fixed constant
557        [
558            config.sync.download_concurrency_limit,
559            config.sync.full_verify_concurrency_limit,
560            inbound::downloads::MAX_INBOUND_CONCURRENCY,
561            mempool::downloads::MAX_INBOUND_CONCURRENCY,
562        ]
563        .into_iter()
564        .max()
565        .unwrap()
566    }
567}
568
569impl Runnable for StartCmd {
570    /// Start the application.
571    fn run(&self) {
572        info!("Starting zebrad");
573        let rt = APPLICATION
574            .state()
575            .components_mut()
576            .get_downcast_mut::<TokioComponent>()
577            .expect("TokioComponent should be available")
578            .rt
579            .take();
580
581        rt.expect("runtime should not already be taken")
582            .run(self.start());
583
584        info!("stopping zebrad");
585    }
586}
587
588impl config::Override<ZebradConfig> for StartCmd {
589    // Process the given command line options, overriding settings from
590    // a configuration file using explicit flags taken from command-line
591    // arguments.
592    fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
593        if !self.filters.is_empty() {
594            config.tracing.filter = Some(self.filters.join(","));
595        }
596
597        Ok(config)
598    }
599}