zebra_scan/service/scan_task/
executor.rs

1//! The scan task executor
2
3use std::{collections::HashMap, sync::Arc};
4
5use color_eyre::eyre::Report;
6use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
7use tokio::{
8    sync::{
9        mpsc::{Receiver, Sender},
10        watch,
11    },
12    task::JoinHandle,
13};
14use tracing::Instrument;
15use zebra_node_services::scan_service::response::ScanResult;
16
17use super::scan::ScanRangeTaskBuilder;
18
/// The capacity of the channel created by `spawn_init` for sending scan range task
/// builders to the scan task executor.
19const EXECUTOR_BUFFER_SIZE: usize = 100;
20
21pub fn spawn_init(
22    subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
23) -> (Sender<ScanRangeTaskBuilder>, JoinHandle<Result<(), Report>>) {
24    let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::channel(EXECUTOR_BUFFER_SIZE);
25
26    (
27        scan_task_sender,
28        tokio::spawn(
29            scan_task_executor(scan_task_receiver, subscribed_keys_receiver).in_current_span(),
30        ),
31    )
32}
33
34pub async fn scan_task_executor(
35    mut scan_task_receiver: Receiver<ScanRangeTaskBuilder>,
36    subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
37) -> Result<(), Report> {
38    let mut scan_range_tasks = FuturesUnordered::new();
39
40    // Push a pending future so that `.next()` will always return `Some`
41    scan_range_tasks.push(tokio::spawn(
42        std::future::pending::<Result<(), Report>>().boxed(),
43    ));
44
45    loop {
46        tokio::select! {
47            Some(scan_range_task) = scan_task_receiver.recv() => {
48                // TODO: Add a long timeout?
49                scan_range_tasks.push(scan_range_task.spawn(subscribed_keys_receiver.clone()));
50            }
51
52            Some(finished_task) = scan_range_tasks.next() => {
53                // Return early if there's an error
54                finished_task.expect("futures unordered with pending future should always return Some")?;
55            }
56        }
57    }
58}