zebra_scan/
init.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//! Initializing the scanner and gRPC server.

use std::{net::SocketAddr, time::Duration};

use color_eyre::Report;
use tokio::task::JoinHandle;
use tower::ServiceBuilder;

use tracing::Instrument;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};
use zebra_state::ChainTipChange;

use crate::{scan, service::ScanService, storage::Storage, Config};

/// The timeout applied to scan service calls
pub const SCAN_SERVICE_TIMEOUT: Duration = Duration::from_secs(30);

/// Initialize [`ScanService`] based on its config.
///
/// TODO: add a test for this function.
pub async fn init_with_server(
    listen_addr: SocketAddr,
    config: Config,
    network: Network,
    state: scan::State,
    chain_tip_change: ChainTipChange,
) -> Result<(), Report> {
    info!(?config, "starting scan service");
    let scan_service = ServiceBuilder::new()
        .buffer(10)
        .timeout(SCAN_SERVICE_TIMEOUT)
        .service(ScanService::new(&config, &network, state, chain_tip_change).await);

    // TODO: move this to zebra-grpc init() function and include addr
    info!(?listen_addr, "starting scan gRPC server");

    // Start the gRPC server.
    let (server_task, _listen_addr) = zebra_grpc::server::init(listen_addr, scan_service).await?;
    server_task.await??;

    Ok(())
}

/// Initialize the scanner and its gRPC server based on its config, and spawn a task for it.
pub fn spawn_init(
    config: Config,
    network: Network,
    state: scan::State,
    chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), Report>> {
    if let Some(listen_addr) = config.listen_addr {
        // TODO: spawn an entirely new executor here, to avoid timing attacks.
        tokio::spawn(
            init_with_server(listen_addr, config, network, state, chain_tip_change)
                .in_current_span(),
        )
    } else {
        // TODO: spawn an entirely new executor here, to avoid timing attacks.
        tokio::spawn(
            async move {
                let storage =
                    tokio::task::spawn_blocking(move || Storage::new(&config, &network, false))
                        .wait_for_panics()
                        .await;
                let (_cmd_sender, cmd_receiver) = tokio::sync::mpsc::channel(1);
                scan::start(state, chain_tip_change, storage, cmd_receiver).await
            }
            .in_current_span(),
        )
    }
}