zebra_scan/
init.rs

1//! Initializing the scanner and gRPC server.
2
3use std::{net::SocketAddr, time::Duration};
4
5use color_eyre::Report;
6use tokio::task::JoinHandle;
7use tower::ServiceBuilder;
8
9use tracing::Instrument;
10use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
11use zebra_state::ChainTipChange;
12
13use crate::{scan, service::ScanService, storage::Storage, Config};
14
15/// The timeout applied to scan service calls
16pub const SCAN_SERVICE_TIMEOUT: Duration = Duration::from_secs(30);
17
18/// Initialize [`ScanService`] based on its config.
19///
20/// TODO: add a test for this function.
21pub async fn init_with_server(
22    listen_addr: SocketAddr,
23    config: Config,
24    network: Network,
25    state: scan::State,
26    chain_tip_change: ChainTipChange,
27) -> Result<(), Report> {
28    info!(?config, "starting scan service");
29    let scan_service = ServiceBuilder::new()
30        .buffer(10)
31        .timeout(SCAN_SERVICE_TIMEOUT)
32        .service(ScanService::new(&config, &network, state, chain_tip_change).await);
33
34    // TODO: move this to zebra-grpc init() function and include addr
35    info!(?listen_addr, "starting scan gRPC server");
36
37    // Start the gRPC server.
38    let (server_task, _listen_addr) = zebra_grpc::server::init(listen_addr, scan_service).await?;
39    server_task.await??;
40
41    Ok(())
42}
43
44/// Initialize the scanner and its gRPC server based on its config, and spawn a task for it.
45pub fn spawn_init(
46    config: Config,
47    network: Network,
48    state: scan::State,
49    chain_tip_change: ChainTipChange,
50) -> JoinHandle<Result<(), Report>> {
51    if let Some(listen_addr) = config.listen_addr {
52        // TODO: spawn an entirely new executor here, to avoid timing attacks.
53        tokio::spawn(
54            init_with_server(listen_addr, config, network, state, chain_tip_change)
55                .in_current_span(),
56        )
57    } else {
58        // TODO: spawn an entirely new executor here, to avoid timing attacks.
59        tokio::spawn(
60            async move {
61                let storage =
62                    tokio::task::spawn_blocking(move || Storage::new(&config, &network, false))
63                        .wait_for_panics()
64                        .await;
65                let (_cmd_sender, cmd_receiver) = tokio::sync::mpsc::channel(1);
66                scan::start(state, chain_tip_change, storage, cmd_receiver).await
67            }
68            .in_current_span(),
69        )
70    }
71}