zebra_scan/service/scan_task/
executor.rs
1use std::{collections::HashMap, sync::Arc};
4
5use color_eyre::eyre::Report;
6use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
7use tokio::{
8 sync::{
9 mpsc::{Receiver, Sender},
10 watch,
11 },
12 task::JoinHandle,
13};
14use tracing::Instrument;
15use zebra_node_services::scan_service::response::ScanResult;
16
17use super::scan::ScanRangeTaskBuilder;
18
19const EXECUTOR_BUFFER_SIZE: usize = 100;
20
21pub fn spawn_init(
22 subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
23) -> (Sender<ScanRangeTaskBuilder>, JoinHandle<Result<(), Report>>) {
24 let (scan_task_sender, scan_task_receiver) = tokio::sync::mpsc::channel(EXECUTOR_BUFFER_SIZE);
25
26 (
27 scan_task_sender,
28 tokio::spawn(
29 scan_task_executor(scan_task_receiver, subscribed_keys_receiver).in_current_span(),
30 ),
31 )
32}
33
34pub async fn scan_task_executor(
35 mut scan_task_receiver: Receiver<ScanRangeTaskBuilder>,
36 subscribed_keys_receiver: watch::Receiver<Arc<HashMap<String, Sender<ScanResult>>>>,
37) -> Result<(), Report> {
38 let mut scan_range_tasks = FuturesUnordered::new();
39
40 scan_range_tasks.push(tokio::spawn(
42 std::future::pending::<Result<(), Report>>().boxed(),
43 ));
44
45 loop {
46 tokio::select! {
47 Some(scan_range_task) = scan_task_receiver.recv() => {
48 scan_range_tasks.push(scan_range_task.spawn(subscribed_keys_receiver.clone()));
50 }
51
52 Some(finished_task) = scan_range_tasks.next() => {
53 finished_task.expect("futures unordered with pending future should always return Some")?;
55 }
56 }
57 }
58}