zebrad/components/tokio.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
//! A component owning the Tokio runtime.
//!
//! The tokio runtime is used for:
//! - non-blocking async tasks, via [`Future`]s and
//! - blocking network and file tasks, via [`spawn_blocking`](tokio::task::spawn_blocking).
//!
//! The rayon thread pool is used for:
//! - long-running CPU-bound tasks like cryptography, via [`rayon::spawn_fifo`].
#![allow(non_local_definitions)]
use std::{future::Future, time::Duration};
use abscissa_core::{Component, FrameworkError, Shutdown};
use color_eyre::Report;
use tokio::runtime::Runtime;
use crate::prelude::*;
/// When Zebra is shutting down, wait this long for tokio tasks to finish.
///
/// This value is logged and then passed to the runtime's `shutdown_timeout`
/// call in `RuntimeRun::run`, after the root future has stopped running.
const TOKIO_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);
/// An Abscissa component which owns a Tokio runtime.
///
/// The runtime is stored as an `Option` so that when it's time to enter an async
/// context by calling `block_on` with a "root future", the runtime can be taken
/// independently of Abscissa's component locking system. Otherwise whatever
/// calls `block_on` holds an application lock for the entire lifetime of the
/// async context.
#[derive(Component, Debug)]
pub struct TokioComponent {
/// The owned Tokio runtime.
///
/// Set to `Some` by [`TokioComponent::new`]. Code that needs to enter an
/// async context is expected to take the runtime out of this `Option`, for
/// the reasons given in the type-level documentation.
pub rt: Option<Runtime>,
}
impl TokioComponent {
    /// Creates the component together with a freshly built multi-threaded
    /// Tokio runtime, configured with `enable_all`.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be built. That failure is not reported
    /// through the returned `Result`; as written, this function only ever
    /// returns `Ok`.
    // The `expect` below panics inside a `Result`-returning function, which is
    // exactly what this lint flags.
    #[allow(clippy::unwrap_in_result)]
    pub fn new() -> Result<Self, FrameworkError> {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("runtime building should not fail");

        Ok(Self { rt: Some(runtime) })
    }
}
/// Zebrad's graceful shutdown future.
///
/// Completes once the platform-specific `imp::shutdown` future completes,
/// that is, after one of the supported shutdown signals has been received.
async fn shutdown() {
    let signal_received = imp::shutdown();
    signal_received.await;
}
/// Extension trait to centralize the entry point for runnable subcommands that
/// depend on tokio.
pub(crate) trait RuntimeRun {
/// Runs `fut` as the root future, consuming `self`.
///
/// `fut` resolves to `Ok(())` on success or to an error [`Report`] on
/// failure. The implementation for [`Runtime`] in this file additionally
/// stops early when a shutdown signal arrives, and shuts the runtime down
/// before returning.
fn run(self, fut: impl Future<Output = Result<(), Report>>);
}
impl RuntimeRun for Runtime {
    /// Drives `fut` on this runtime until it finishes or a shutdown signal is
    /// received, whichever happens first, then shuts the runtime down.
    ///
    /// After the root future stops, remaining tokio tasks get up to
    /// `TOKIO_SHUTDOWN_TIMEOUT` to finish. If `fut` returned an error, the
    /// error is logged and a forced application shutdown is requested.
    fn run(self, fut: impl Future<Output = Result<(), Report>>) {
        // `biased` makes `select!` poll its branches in the order written, so
        // the shutdown future is always polled first.
        //
        // Otherwise, a busy Zebra instance could starve the shutdown future,
        // and delay shutting down.
        let root = async move {
            tokio::select! {
                biased;
                _ = shutdown() => Ok(()),
                outcome = fut => outcome,
            }
        };
        let outcome = self.block_on(root);

        // Bound the wait, so long blocking tasks can't hold up the shutdown.
        info!(
            ?TOKIO_SHUTDOWN_TIMEOUT,
            "waiting for async tokio tasks to shut down"
        );
        self.shutdown_timeout(TOKIO_SHUTDOWN_TIMEOUT);

        if let Err(error) = outcome {
            warn!(?error, "shutting down Zebra due to an error");
            APPLICATION.shutdown(Shutdown::Forced);
        } else {
            info!("shutting down Zebra");
        }
    }
}
#[cfg(unix)]
mod imp {
    use tokio::signal::unix::{signal, SignalKind};

    /// Completes when the process receives either SIGINT or SIGTERM.
    pub(super) async fn shutdown() {
        // SIGINT - Terminal interrupt signal. Typically generated by shells in response to Ctrl-C.
        let interrupt = sig(SignalKind::interrupt(), "SIGINT");
        // SIGTERM - Standard shutdown signal used by process launchers.
        let terminate = sig(SignalKind::terminate(), "SIGTERM");

        // If both signals are received, select! chooses one of them at random.
        tokio::select! {
            _ = interrupt => {}
            _ = terminate => {}
        };
    }

    /// Waits for the first delivery of the signal `kind`, then marks Zebra as
    /// shutting down and logs the event using the human-readable `name`.
    ///
    /// # Panics
    ///
    /// Panics if the signal handler cannot be registered.
    #[instrument]
    async fn sig(kind: SignalKind, name: &'static str) {
        {
            // The listener only needs to live until the first signal arrives.
            let mut listener = signal(kind).expect("Failed to register signal handler");
            listener.recv().await;
        }

        zebra_chain::shutdown::set_shutting_down();

        #[cfg(feature = "progress-bar")]
        howudoin::disable();

        info!(
            // use target to remove 'imp' from output
            target: "zebrad::signal",
            "received {}, starting shutdown",
            name,
        );
    }
}
#[cfg(not(unix))]
mod imp {
    /// Completes when Ctrl-C is received, then marks Zebra as shutting down
    /// and logs the event.
    ///
    /// # Panics
    ///
    /// Panics if listening for the Ctrl-C signal fails.
    pub(super) async fn shutdown() {
        // Wait for Ctrl-C in Windows terminals.
        // (Zebra doesn't support NT Service control messages. Use a service wrapper for long-running instances.)
        let ctrl_c_outcome = tokio::signal::ctrl_c().await;
        ctrl_c_outcome.expect("listening for ctrl-c signal should never fail");

        zebra_chain::shutdown::set_shutting_down();

        #[cfg(feature = "progress-bar")]
        howudoin::disable();

        info!(
            // use target to remove 'imp' from output
            target: "zebrad::signal",
            "received Ctrl-C, starting shutdown",
        );
    }
}