// zebra_scan/service.rs

1//! [`tower::Service`] for zebra-scan.
2
3use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration};
4
5use futures::future::FutureExt;
6use tower::{BoxError, Service};
7
8use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash};
9
10use zebra_state::ChainTipChange;
11
12use crate::{scan, storage::Storage, Config, Request, Response};
13
14#[cfg(test)]
15mod tests;
16
17pub mod scan_task;
18
19pub use scan_task::{ScanTask, ScanTaskCommand};
20
21#[cfg(any(test, feature = "proptest-impl"))]
22use tokio::sync::mpsc::Receiver;
23
24/// Zebra-scan [`tower::Service`]
25#[derive(Debug)]
26pub struct ScanService {
27    /// On-disk storage
28    pub db: Storage,
29
30    /// Handle to scan task that's responsible for writing results
31    scan_task: ScanTask,
32}
33
/// How long a `DeleteKeys` request waits for the scan task to confirm that the keys were removed.
///
/// Keep this shorter than [`SCAN_SERVICE_TIMEOUT`](crate::init::SCAN_SERVICE_TIMEOUT): once this
/// timeout elapses, the request still needs time to try deleting the entries from storage before
/// its future is dropped.
const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);
39
40impl ScanService {
41    /// Create a new [`ScanService`].
42    pub async fn new(
43        config: &Config,
44        network: &Network,
45        state: scan::State,
46        chain_tip_change: ChainTipChange,
47    ) -> Self {
48        let config = config.clone();
49        let network = network.clone();
50        let storage = tokio::task::spawn_blocking(move || Storage::new(&config, &network, false))
51            .wait_for_panics()
52            .await;
53
54        Self {
55            scan_task: ScanTask::spawn(storage.clone(), state, chain_tip_change),
56            db: storage,
57        }
58    }
59
60    /// Create a new [`ScanService`] with a mock `ScanTask`
61    // TODO: Move this to tests behind `cfg(any(test, feature = "proptest-impl"))`
62    #[cfg(any(test, feature = "proptest-impl"))]
63    pub fn new_with_mock_scanner(db: Storage) -> (Self, Receiver<ScanTaskCommand>) {
64        let (scan_task, cmd_receiver) = ScanTask::mock();
65        (Self { db, scan_task }, cmd_receiver)
66    }
67}
68
69impl Service<Request> for ScanService {
70    type Response = Response;
71    type Error = BoxError;
72    type Future =
73        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
74
75    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
76        // TODO: If scan task returns an error, add error to the panic message
77        assert!(
78            !self.scan_task.handle.is_finished(),
79            "scan task finished unexpectedly"
80        );
81
82        self.db.check_for_panics();
83
84        Poll::Ready(Ok(()))
85    }
86
87    fn call(&mut self, req: Request) -> Self::Future {
88        if let Err(error) = req.check() {
89            return async move { Err(error) }.boxed();
90        }
91
92        match req {
93            Request::Info => {
94                let db = self.db.clone();
95
96                async move {
97                    Ok(Response::Info {
98                        min_sapling_birthday_height: db.network().sapling_activation_height(),
99                    })
100                }
101                .boxed()
102            }
103
104            Request::RegisterKeys(keys) => {
105                let mut scan_task = self.scan_task.clone();
106
107                async move {
108                    let newly_registered_keys = scan_task.register_keys(keys)?.await?;
109                    if !newly_registered_keys.is_empty() {
110                        Ok(Response::RegisteredKeys(newly_registered_keys))
111                    } else {
112                        Err("no keys were registered, check that keys are not already registered and \
113                        are valid Sapling extended full viewing keys".into())
114                    }
115                }
116                .boxed()
117            }
118
119            Request::DeleteKeys(keys) => {
120                let mut db = self.db.clone();
121                let mut scan_task = self.scan_task.clone();
122
123                async move {
124                    // Wait for a message to confirm that the scan task has removed the key up to `DELETE_KEY_TIMEOUT`
125                    let remove_keys_result = tokio::time::timeout(
126                        DELETE_KEY_TIMEOUT,
127                        scan_task.remove_keys(keys.clone())?,
128                    )
129                    .await
130                    .map_err(|_| "request timed out removing keys from scan task".to_string());
131
132                    // Delete the key from the database after either confirmation that it's been removed from the scan task, or
133                    // waiting `DELETE_KEY_TIMEOUT`.
134                    let delete_key_task = tokio::task::spawn_blocking(move || {
135                        db.delete_sapling_keys(keys);
136                    });
137
138                    // Return timeout errors or `RecvError`s, or wait for the key to be deleted from the database.
139                    remove_keys_result??;
140                    delete_key_task.await?;
141
142                    Ok(Response::DeletedKeys)
143                }
144                .boxed()
145            }
146
147            Request::Results(keys) => {
148                let db = self.db.clone();
149
150                async move {
151                    let mut final_result = BTreeMap::new();
152                    for key in keys {
153                        let db = db.clone();
154                        let mut heights_and_transactions = BTreeMap::new();
155                        let txs = {
156                            let key = key.clone();
157                            tokio::task::spawn_blocking(move || db.sapling_results_for_key(&key))
158                        }
159                        .await?;
160                        txs.iter().for_each(|(k, v)| {
161                            heights_and_transactions
162                                .entry(*k)
163                                .or_insert_with(Vec::new)
164                                .extend(v.iter().map(|x| Hash::from(*x)));
165                        });
166                        final_result.entry(key).or_insert(heights_and_transactions);
167                    }
168
169                    Ok(Response::Results(final_result))
170                }
171                .boxed()
172            }
173
174            Request::SubscribeResults(keys) => {
175                let mut scan_task = self.scan_task.clone();
176
177                async move {
178                    let results_receiver = scan_task.subscribe(keys)?.await.map_err(|_| {
179                        "scan task dropped responder, check that keys are registered"
180                    })?;
181
182                    Ok(Response::SubscribeResults(results_receiver))
183                }
184                .boxed()
185            }
186
187            Request::ClearResults(keys) => {
188                let mut db = self.db.clone();
189
190                async move {
191                    // Clear results from db for the provided `keys`
192                    tokio::task::spawn_blocking(move || {
193                        db.delete_sapling_results(keys);
194                    })
195                    .await?;
196
197                    Ok(Response::ClearedResults)
198                }
199                .boxed()
200            }
201        }
202    }
203}