zebrad/commands/
start.rs

1//! `start` subcommand - entry point for starting a zebra node
2//!
3//! ## Application Structure
4//!
5//! A zebra node consists of the following major services and tasks:
6//!
7//! Peers:
8//!  * Peer Connection Pool Service
9//!    * primary external interface for outbound requests from this node to remote peers
10//!    * accepts requests from services and tasks in this node, and sends them to remote peers
11//!  * Peer Discovery Service
12//!    * maintains a list of peer addresses, and connection priority metadata
13//!    * discovers new peer addresses from existing peer connections
14//!    * initiates new outbound peer connections in response to demand from tasks within this node
//!  * Peer Cache Service
//!    * reads the previous peer cache on startup, and adds it to the configured DNS seed peers
//!    * periodically updates the peer cache on disk from the latest address book state
18//!
19//! Blocks & Mempool Transactions:
20//!  * Consensus Service
21//!    * handles all validation logic for the node
22//!    * verifies blocks using zebra-chain, then stores verified blocks in zebra-state
23//!    * verifies mempool and block transactions using zebra-chain and zebra-script,
24//!      and returns verified mempool transactions for mempool storage
25//!  * Inbound Service
26//!    * primary external interface for inbound peer requests to this node
27//!    * handles requests from peers for network data, chain data, and mempool transactions
28//!    * spawns download and verify tasks for each gossiped block
29//!    * sends gossiped transactions to the mempool service
30//!
31//! Blocks:
32//!  * Sync Task
33//!    * runs in the background and continuously queries the network for
34//!      new blocks to be verified and added to the local state
35//!    * spawns download and verify tasks for each crawled block
36//!  * State Service
37//!    * contextually verifies blocks
38//!    * handles in-memory storage of multiple non-finalized chains
39//!    * handles permanent storage of the best finalized chain
40//!  * Old State Version Cleanup Task
41//!    * deletes outdated state versions
42//!  * Block Gossip Task
43//!    * runs in the background and continuously queries the state for
44//!      newly committed blocks to be gossiped to peers
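//!  * Progress Task
//!    * logs progress towards the chain tip
//!  * End of Support Task
//!    * runs in the background, and checks whether this Zebra release has reached its end of support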
47//!
48//! Block Mining:
49//!  * Internal Miner Task
50//!    * if the user has configured Zebra to mine blocks, spawns tasks to generate new blocks,
51//!      and submits them for verification. This automatically shares these new blocks with peers.
52//!
53//! Mempool Transactions:
54//!  * Mempool Service
55//!    * activates when the syncer is near the chain tip
56//!    * spawns download and verify tasks for each crawled or gossiped transaction
57//!    * handles in-memory storage of unmined transactions
58//!  * Queue Checker Task
59//!    * runs in the background, polling the mempool to store newly verified transactions
60//!  * Transaction Gossip Task
61//!    * runs in the background and gossips newly added mempool transactions
62//!      to peers
63//!
64//! Remote Procedure Calls:
65//!  * JSON-RPC Service
66//!    * answers RPC client requests using the State Service and Mempool Service
67//!    * submits client transactions to the node's mempool
68//!
69//! Zebra also has diagnostic support:
70//! * [metrics](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/metrics.md)
71//! * [tracing](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/tracing.md)
72//! * [progress-bar](https://docs.rs/howudoin/0.1.1/howudoin)
73//!
//! Some diagnostic features are optional, and must be enabled at compile time.
75
76use std::sync::Arc;
77
78use abscissa_core::{config, Command, FrameworkError};
79use color_eyre::eyre::{eyre, Report};
80use futures::FutureExt;
81use tokio::{pin, select, sync::oneshot};
82use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
83use tracing_futures::Instrument;
84
85use zebra_chain::block::genesis::regtest_genesis_block;
86use zebra_consensus::{router::BackgroundTaskHandles, ParameterCheckpoint};
87use zebra_rpc::{methods::RpcImpl, server::RpcServer, SubmitBlockChannel};
88
89use crate::{
90    application::{build_version, user_agent, LAST_WARN_ERROR_LOG_SENDER},
91    components::{
92        inbound::{self, InboundSetupData, MAX_INBOUND_RESPONSE_TIME},
93        mempool::{self, Mempool},
94        sync::{self, show_block_chain_progress, VERIFICATION_PIPELINE_SCALING_MULTIPLIER},
95        tokio::{RuntimeRun, TokioComponent},
96        ChainSync, Inbound,
97    },
98    config::ZebradConfig,
99    prelude::*,
100};
101
102#[cfg(feature = "internal-miner")]
103use crate::components;
104
/// Start the application (default command)
// NOTE(review): the `clap::Parser` derive presumably uses the `///` doc comments on
// this struct and its fields as command-line help text, so editing them may change
// `zebrad help start` / `--help` output. Maintainer notes use plain `//` comments.
#[derive(Command, Debug, Default, clap::Parser)]
pub struct StartCmd {
    /// Filter strings which override the config file and defaults
    // `override_config` joins these with "," and stores the result in
    // `config.tracing.filter`, but only when at least one filter was given.
    #[clap(help = "tracing filters which override the zebrad.toml config")]
    filters: Vec<String>,
}
112
113impl StartCmd {
114    async fn start(&self) -> Result<(), Report> {
115        let config = APPLICATION.config();
116        let is_regtest = config.network.network.is_regtest();
117
118        let config = if is_regtest {
119            Arc::new(ZebradConfig {
120                mempool: mempool::Config {
121                    debug_enable_at_height: Some(0),
122                    ..config.mempool
123                },
124                ..Arc::unwrap_or_clone(config)
125            })
126        } else {
127            config
128        };
129
130        info!("initializing node state");
131        let (_, max_checkpoint_height) = zebra_consensus::router::init_checkpoint_list(
132            config.consensus.clone(),
133            &config.network.network,
134        );
135
136        info!("opening database, this may take a few minutes");
137
138        let (state_service, read_only_state_service, latest_chain_tip, chain_tip_change) =
139            zebra_state::spawn_init(
140                config.state.clone(),
141                &config.network.network,
142                max_checkpoint_height,
143                config.sync.checkpoint_verify_concurrency_limit
144                    * (VERIFICATION_PIPELINE_SCALING_MULTIPLIER + 1),
145            )
146            .await?;
147
148        info!("logging database metrics on startup");
149        read_only_state_service.log_db_metrics();
150
151        let state = ServiceBuilder::new()
152            .buffer(Self::state_buffer_bound())
153            .service(state_service);
154
155        info!("initializing network");
156        // The service that our node uses to respond to requests by peers. The
157        // load_shed middleware ensures that we reduce the size of the peer set
158        // in response to excess load.
159        //
160        // # Security
161        //
162        // This layer stack is security-sensitive, modifying it can cause hangs,
163        // or enable denial of service attacks.
164        //
165        // See `zebra_network::Connection::drive_peer_request()` for details.
166        let (setup_tx, setup_rx) = oneshot::channel();
167        let inbound = ServiceBuilder::new()
168            .load_shed()
169            .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
170            .timeout(MAX_INBOUND_RESPONSE_TIME)
171            .service(Inbound::new(
172                config.sync.full_verify_concurrency_limit,
173                setup_rx,
174            ));
175
176        let (peer_set, address_book, misbehavior_sender) = zebra_network::init(
177            config.network.clone(),
178            inbound,
179            latest_chain_tip.clone(),
180            user_agent(),
181        )
182        .await;
183
184        info!("initializing verifiers");
185        let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
186        let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
187            zebra_consensus::router::init(
188                config.consensus.clone(),
189                &config.network.network,
190                state.clone(),
191                tx_verifier_setup_rx,
192            )
193            .await;
194
195        info!("initializing syncer");
196        let (mut syncer, sync_status) = ChainSync::new(
197            &config,
198            max_checkpoint_height,
199            peer_set.clone(),
200            block_verifier_router.clone(),
201            state.clone(),
202            latest_chain_tip.clone(),
203            misbehavior_sender.clone(),
204        );
205
206        info!("initializing mempool");
207        let (mempool, mempool_transaction_subscriber) = Mempool::new(
208            &config.mempool,
209            peer_set.clone(),
210            state.clone(),
211            tx_verifier,
212            sync_status.clone(),
213            latest_chain_tip.clone(),
214            chain_tip_change.clone(),
215            misbehavior_sender.clone(),
216        );
217        let mempool = BoxService::new(mempool);
218        let mempool = ServiceBuilder::new()
219            .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
220            .service(mempool);
221
222        if tx_verifier_setup_tx.send(mempool.clone()).is_err() {
223            warn!("error setting up the transaction verifier with a handle to the mempool service");
224        };
225
226        info!("fully initializing inbound peer request handler");
227        // Fully start the inbound service as soon as possible
228        let setup_data = InboundSetupData {
229            address_book: address_book.clone(),
230            block_download_peer_set: peer_set.clone(),
231            block_verifier: block_verifier_router.clone(),
232            mempool: mempool.clone(),
233            state: state.clone(),
234            latest_chain_tip: latest_chain_tip.clone(),
235            misbehavior_sender,
236        };
237        setup_tx
238            .send(setup_data)
239            .map_err(|_| eyre!("could not send setup data to inbound service"))?;
240        // And give it time to clear its queue
241        tokio::task::yield_now().await;
242
243        // Create a channel to send mined blocks to the gossip task
244        let submit_block_channel = SubmitBlockChannel::new();
245
246        // Launch RPC server
247        let (rpc_impl, mut rpc_tx_queue_handle) = RpcImpl::new(
248            config.network.network.clone(),
249            config.mining.clone(),
250            config.rpc.debug_force_finished_sync,
251            build_version(),
252            user_agent(),
253            mempool.clone(),
254            state.clone(),
255            read_only_state_service.clone(),
256            block_verifier_router.clone(),
257            sync_status.clone(),
258            latest_chain_tip.clone(),
259            address_book.clone(),
260            LAST_WARN_ERROR_LOG_SENDER.subscribe(),
261            Some(submit_block_channel.sender()),
262        );
263
264        let rpc_task_handle = if config.rpc.listen_addr.is_some() {
265            RpcServer::start(rpc_impl.clone(), config.rpc.clone())
266                .await
267                .expect("server should start")
268        } else {
269            tokio::spawn(std::future::pending().in_current_span())
270        };
271
272        // TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
273        //       any related unit tests sometimes crash with memory errors
274        let indexer_rpc_task_handle = {
275            if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr {
276                info!("spawning indexer RPC server");
277                let (indexer_rpc_task_handle, _listen_addr) = zebra_rpc::indexer::server::init(
278                    indexer_listen_addr,
279                    read_only_state_service.clone(),
280                    latest_chain_tip.clone(),
281                    mempool_transaction_subscriber.clone(),
282                )
283                .await
284                .map_err(|err| eyre!(err))?;
285
286                indexer_rpc_task_handle
287            } else {
288                warn!("configure an indexer_listen_addr to start the indexer RPC server");
289                tokio::spawn(std::future::pending().in_current_span())
290            }
291        };
292
293        // Start concurrent tasks which don't add load to other tasks
294        info!("spawning block gossip task");
295        let block_gossip_task_handle = tokio::spawn(
296            sync::gossip_best_tip_block_hashes(
297                sync_status.clone(),
298                chain_tip_change.clone(),
299                peer_set.clone(),
300                Some(submit_block_channel.receiver()),
301            )
302            .in_current_span(),
303        );
304
305        info!("spawning mempool queue checker task");
306        let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());
307
308        info!("spawning mempool transaction gossip task");
309        let tx_gossip_task_handle = tokio::spawn(
310            mempool::gossip_mempool_transaction_id(
311                mempool_transaction_subscriber.subscribe(),
312                peer_set.clone(),
313            )
314            .in_current_span(),
315        );
316
317        info!("spawning delete old databases task");
318        let mut old_databases_task_handle = zebra_state::check_and_delete_old_state_databases(
319            &config.state,
320            &config.network.network,
321        );
322
323        info!("spawning progress logging task");
324        let progress_task_handle = tokio::spawn(
325            show_block_chain_progress(
326                config.network.network.clone(),
327                latest_chain_tip.clone(),
328                sync_status.clone(),
329            )
330            .in_current_span(),
331        );
332
333        // Spawn never ending end of support task.
334        info!("spawning end of support checking task");
335        let end_of_support_task_handle = tokio::spawn(
336            sync::end_of_support::start(config.network.network.clone(), latest_chain_tip.clone())
337                .in_current_span(),
338        );
339
340        // Give the inbound service more time to clear its queue,
341        // then start concurrent tasks that can add load to the inbound service
342        // (by opening more peer connections, so those peers send us requests)
343        tokio::task::yield_now().await;
344
345        // The crawler only activates immediately in tests that use mempool debug mode
346        info!("spawning mempool crawler task");
347        let mempool_crawler_task_handle = mempool::Crawler::spawn(
348            &config.mempool,
349            peer_set,
350            mempool.clone(),
351            sync_status.clone(),
352            chain_tip_change.clone(),
353        );
354
355        info!("spawning syncer task");
356        let syncer_task_handle = if is_regtest {
357            if !syncer
358                .state_contains(config.network.network.genesis_hash())
359                .await?
360            {
361                let genesis_hash = block_verifier_router
362                    .clone()
363                    .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
364                    .await
365                    .expect("should validate Regtest genesis block");
366
367                assert_eq!(
368                    genesis_hash,
369                    config.network.network.genesis_hash(),
370                    "validated block hash should match network genesis hash"
371                )
372            }
373
374            tokio::spawn(std::future::pending().in_current_span())
375        } else {
376            tokio::spawn(syncer.sync().in_current_span())
377        };
378
379        // And finally, spawn the internal Zcash miner, if it is enabled.
380        //
381        // TODO: add a config to enable the miner rather than a feature.
382        #[cfg(feature = "internal-miner")]
383        let miner_task_handle = if config.mining.is_internal_miner_enabled() {
384            info!("spawning Zcash miner");
385            components::miner::spawn_init(&config.metrics, rpc_impl)
386        } else {
387            tokio::spawn(std::future::pending().in_current_span())
388        };
389
390        #[cfg(not(feature = "internal-miner"))]
391        // Spawn a dummy miner task which doesn't do anything and never finishes.
392        let miner_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
393            tokio::spawn(std::future::pending().in_current_span());
394
395        info!("spawned initial Zebra tasks");
396
397        // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
398
399        // ongoing tasks
400        pin!(rpc_task_handle);
401        pin!(indexer_rpc_task_handle);
402        pin!(syncer_task_handle);
403        pin!(block_gossip_task_handle);
404        pin!(mempool_crawler_task_handle);
405        pin!(mempool_queue_checker_task_handle);
406        pin!(tx_gossip_task_handle);
407        pin!(progress_task_handle);
408        pin!(end_of_support_task_handle);
409        pin!(miner_task_handle);
410
411        // startup tasks
412        let BackgroundTaskHandles {
413            mut state_checkpoint_verify_handle,
414        } = consensus_task_handles;
415
416        let state_checkpoint_verify_handle_fused = (&mut state_checkpoint_verify_handle).fuse();
417        pin!(state_checkpoint_verify_handle_fused);
418
419        let old_databases_task_handle_fused = (&mut old_databases_task_handle).fuse();
420        pin!(old_databases_task_handle_fused);
421
422        // Wait for tasks to finish
423        let exit_status = loop {
424            let mut exit_when_task_finishes = true;
425
426            let result = select! {
427                rpc_join_result = &mut rpc_task_handle => {
428                    let rpc_server_result = rpc_join_result
429                        .expect("unexpected panic in the rpc task");
430                    info!(?rpc_server_result, "rpc task exited");
431                    Ok(())
432                }
433
434                rpc_tx_queue_result = &mut rpc_tx_queue_handle => {
435                    rpc_tx_queue_result
436                        .expect("unexpected panic in the rpc transaction queue task");
437                    info!("rpc transaction queue task exited");
438                    Ok(())
439                }
440
441                indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
442                    let indexer_rpc_server_result = indexer_rpc_join_result
443                        .expect("unexpected panic in the indexer task");
444                    info!(?indexer_rpc_server_result, "indexer rpc task exited");
445                    Ok(())
446                }
447
448                sync_result = &mut syncer_task_handle => sync_result
449                    .expect("unexpected panic in the syncer task")
450                    .map(|_| info!("syncer task exited")),
451
452                block_gossip_result = &mut block_gossip_task_handle => block_gossip_result
453                    .expect("unexpected panic in the chain tip block gossip task")
454                    .map(|_| info!("chain tip block gossip task exited"))
455                    .map_err(|e| eyre!(e)),
456
457                mempool_crawl_result = &mut mempool_crawler_task_handle => mempool_crawl_result
458                    .expect("unexpected panic in the mempool crawler")
459                    .map(|_| info!("mempool crawler task exited"))
460                    .map_err(|e| eyre!(e)),
461
462                mempool_queue_result = &mut mempool_queue_checker_task_handle => mempool_queue_result
463                    .expect("unexpected panic in the mempool queue checker")
464                    .map(|_| info!("mempool queue checker task exited"))
465                    .map_err(|e| eyre!(e)),
466
467                tx_gossip_result = &mut tx_gossip_task_handle => tx_gossip_result
468                    .expect("unexpected panic in the transaction gossip task")
469                    .map(|_| info!("transaction gossip task exited"))
470                    .map_err(|e| eyre!(e)),
471
472                // The progress task runs forever, unless it panics.
473                // So we don't need to provide an exit status for it.
474                progress_result = &mut progress_task_handle => {
475                    info!("chain progress task exited");
476                    progress_result
477                        .expect("unexpected panic in the chain progress task");
478                }
479
480                end_of_support_result = &mut end_of_support_task_handle => end_of_support_result
481                    .expect("unexpected panic in the end of support task")
482                    .map(|_| info!("end of support task exited")),
483
484                // We also expect the state checkpoint verify task to finish.
485                state_checkpoint_verify_result = &mut state_checkpoint_verify_handle_fused => {
486                    state_checkpoint_verify_result
487                        .unwrap_or_else(|_| panic!(
488                            "unexpected panic checking previous state followed the best chain"));
489
490                    exit_when_task_finishes = false;
491                    Ok(())
492                }
493
494                // And the old databases task should finish while Zebra is running.
495                old_databases_result = &mut old_databases_task_handle_fused => {
496                    old_databases_result
497                        .unwrap_or_else(|_| panic!(
498                            "unexpected panic deleting old database directories"));
499
500                    exit_when_task_finishes = false;
501                    Ok(())
502                }
503
504                miner_result = &mut miner_task_handle => miner_result
505                    .expect("unexpected panic in the miner task")
506                    .map(|_| info!("miner task exited")),
507            };
508
509            // Stop Zebra if a task finished and returned an error,
510            // or if an ongoing task exited.
511            if let Err(err) = result {
512                break Err(err);
513            }
514
515            if exit_when_task_finishes {
516                break Ok(());
517            }
518        };
519
520        info!("exiting Zebra because an ongoing task exited: asking other tasks to stop");
521
522        // ongoing tasks
523        rpc_task_handle.abort();
524        rpc_tx_queue_handle.abort();
525        syncer_task_handle.abort();
526        block_gossip_task_handle.abort();
527        mempool_crawler_task_handle.abort();
528        mempool_queue_checker_task_handle.abort();
529        tx_gossip_task_handle.abort();
530        progress_task_handle.abort();
531        end_of_support_task_handle.abort();
532        miner_task_handle.abort();
533
534        // startup tasks
535        state_checkpoint_verify_handle.abort();
536        old_databases_task_handle.abort();
537
538        info!(
539            "exiting Zebra: all tasks have been asked to stop, waiting for remaining tasks to finish"
540        );
541
542        exit_status
543    }
544
545    /// Returns the bound for the state service buffer,
546    /// based on the configurations of the services that use the state concurrently.
547    fn state_buffer_bound() -> usize {
548        let config = APPLICATION.config();
549
550        // Ignore the checkpoint verify limit, because it is very large.
551        //
552        // TODO: do we also need to account for concurrent use across services?
553        //       we could multiply the maximum by 3/2, or add a fixed constant
554        [
555            config.sync.download_concurrency_limit,
556            config.sync.full_verify_concurrency_limit,
557            inbound::downloads::MAX_INBOUND_CONCURRENCY,
558            mempool::downloads::MAX_INBOUND_CONCURRENCY,
559        ]
560        .into_iter()
561        .max()
562        .unwrap()
563    }
564}
565
566impl Runnable for StartCmd {
567    /// Start the application.
568    fn run(&self) {
569        info!("Starting zebrad");
570        let rt = APPLICATION
571            .state()
572            .components_mut()
573            .get_downcast_mut::<TokioComponent>()
574            .expect("TokioComponent should be available")
575            .rt
576            .take();
577
578        rt.expect("runtime should not already be taken")
579            .run(self.start());
580
581        info!("stopping zebrad");
582    }
583}
584
585impl config::Override<ZebradConfig> for StartCmd {
586    // Process the given command line options, overriding settings from
587    // a configuration file using explicit flags taken from command-line
588    // arguments.
589    fn override_config(&self, mut config: ZebradConfig) -> Result<ZebradConfig, FrameworkError> {
590        if !self.filters.is_empty() {
591            config.tracing.filter = Some(self.filters.join(","));
592        }
593
594        Ok(config)
595    }
596}