zebrad/components/
tokio.rs

1//! A component owning the Tokio runtime.
2//!
3//! The tokio runtime is used for:
4//! - non-blocking async tasks, via [`Future`]s and
5//! - blocking network and file tasks, via [`spawn_blocking`](tokio::task::spawn_blocking).
6//!
7//! The rayon thread pool is used for:
8//! - long-running CPU-bound tasks like cryptography, via [`rayon::spawn_fifo`].
9
10#![allow(non_local_definitions)]
11
12use std::{future::Future, time::Duration};
13
14use abscissa_core::{Component, FrameworkError, Shutdown};
15use color_eyre::Report;
16use tokio::runtime::Runtime;
17
18use crate::prelude::*;
19
20/// When Zebra is shutting down, wait this long for tokio tasks to finish.
21const TOKIO_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);
22
23/// An Abscissa component which owns a Tokio runtime.
24///
25/// The runtime is stored as an `Option` so that when it's time to enter an async
26/// context by calling `block_on` with a "root future", the runtime can be taken
27/// independently of Abscissa's component locking system. Otherwise whatever
28/// calls `block_on` holds an application lock for the entire lifetime of the
29/// async context.
30#[derive(Component, Debug)]
31pub struct TokioComponent {
32    pub rt: Option<Runtime>,
33}
34
35impl TokioComponent {
36    #[allow(clippy::unwrap_in_result)]
37    pub fn new() -> Result<Self, FrameworkError> {
38        Ok(Self {
39            rt: Some(
40                tokio::runtime::Builder::new_multi_thread()
41                    .enable_all()
42                    .build()
43                    .expect("runtime building should not fail"),
44            ),
45        })
46    }
47}
48
49/// Zebrad's graceful shutdown function, blocks until one of the supported
50/// shutdown signals is received.
51async fn shutdown() {
52    imp::shutdown().await;
53}
54
55/// Extension trait to centralize entry point for runnable subcommands that
56/// depend on tokio
57pub(crate) trait RuntimeRun {
58    fn run(self, fut: impl Future<Output = Result<(), Report>>);
59}
60
61impl RuntimeRun for Runtime {
62    fn run(self, fut: impl Future<Output = Result<(), Report>>) {
63        let result = self.block_on(async move {
64            // Always poll the shutdown future first.
65            //
66            // Otherwise, a busy Zebra instance could starve the shutdown future,
67            // and delay shutting down.
68            tokio::select! {
69                biased;
70                _ = shutdown() => Ok(()),
71                result = fut => result,
72            }
73        });
74
75        // Don't wait for long blocking tasks before shutting down
76        info!(
77            ?TOKIO_SHUTDOWN_TIMEOUT,
78            "waiting for async tokio tasks to shut down"
79        );
80        self.shutdown_timeout(TOKIO_SHUTDOWN_TIMEOUT);
81
82        match result {
83            Ok(()) => {
84                info!("shutting down Zebra");
85            }
86            Err(error) => {
87                warn!(?error, "shutting down Zebra due to an error");
88                APPLICATION.shutdown(Shutdown::Forced);
89            }
90        }
91    }
92}
93
94#[cfg(unix)]
95mod imp {
96    use tokio::signal::unix::{signal, SignalKind};
97
98    pub(super) async fn shutdown() {
99        // If both signals are received, select! chooses one of them at random.
100        tokio::select! {
101            // SIGINT  - Terminal interrupt signal. Typically generated by shells in response to Ctrl-C.
102            _ = sig(SignalKind::interrupt(), "SIGINT") => {}
103            // SIGTERM - Standard shutdown signal used by process launchers.
104            _ = sig(SignalKind::terminate(), "SIGTERM") => {}
105        };
106    }
107
108    #[instrument]
109    async fn sig(kind: SignalKind, name: &'static str) {
110        // Create a Future that completes the first
111        // time the process receives 'sig'.
112        signal(kind)
113            .expect("Failed to register signal handler")
114            .recv()
115            .await;
116
117        zebra_chain::shutdown::set_shutting_down();
118
119        #[cfg(feature = "progress-bar")]
120        howudoin::disable();
121
122        info!(
123            // use target to remove 'imp' from output
124            target: "zebrad::signal",
125            "received {}, starting shutdown",
126            name,
127        );
128    }
129}
130
131#[cfg(not(unix))]
132mod imp {
133
134    pub(super) async fn shutdown() {
135        //  Wait for Ctrl-C in Windows terminals.
136        // (Zebra doesn't support NT Service control messages. Use a service wrapper for long-running instances.)
137        tokio::signal::ctrl_c()
138            .await
139            .expect("listening for ctrl-c signal should never fail");
140
141        zebra_chain::shutdown::set_shutting_down();
142
143        #[cfg(feature = "progress-bar")]
144        howudoin::disable();
145
146        info!(
147            // use target to remove 'imp' from output
148            target: "zebrad::signal",
149            "received Ctrl-C, starting shutdown",
150        );
151    }
152}