zebra_scan/
service.rs
1use std::{collections::BTreeMap, future::Future, pin::Pin, task::Poll, time::Duration};
4
5use futures::future::FutureExt;
6use tower::{BoxError, Service};
7
8use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network, transaction::Hash};
9
10use zebra_state::ChainTipChange;
11
12use crate::{scan, storage::Storage, Config, Request, Response};
13
14#[cfg(test)]
15mod tests;
16
17pub mod scan_task;
18
19pub use scan_task::{ScanTask, ScanTaskCommand};
20
21#[cfg(any(test, feature = "proptest-impl"))]
22use tokio::sync::mpsc::Receiver;
23
24#[derive(Debug)]
26pub struct ScanService {
27 pub db: Storage,
29
30 scan_task: ScanTask,
32}
33
34const DELETE_KEY_TIMEOUT: Duration = Duration::from_secs(15);
39
40impl ScanService {
41 pub async fn new(
43 config: &Config,
44 network: &Network,
45 state: scan::State,
46 chain_tip_change: ChainTipChange,
47 ) -> Self {
48 let config = config.clone();
49 let network = network.clone();
50 let storage = tokio::task::spawn_blocking(move || Storage::new(&config, &network, false))
51 .wait_for_panics()
52 .await;
53
54 Self {
55 scan_task: ScanTask::spawn(storage.clone(), state, chain_tip_change),
56 db: storage,
57 }
58 }
59
60 #[cfg(any(test, feature = "proptest-impl"))]
63 pub fn new_with_mock_scanner(db: Storage) -> (Self, Receiver<ScanTaskCommand>) {
64 let (scan_task, cmd_receiver) = ScanTask::mock();
65 (Self { db, scan_task }, cmd_receiver)
66 }
67}
68
69impl Service<Request> for ScanService {
70 type Response = Response;
71 type Error = BoxError;
72 type Future =
73 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
74
75 fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
76 assert!(
78 !self.scan_task.handle.is_finished(),
79 "scan task finished unexpectedly"
80 );
81
82 self.db.check_for_panics();
83
84 Poll::Ready(Ok(()))
85 }
86
87 fn call(&mut self, req: Request) -> Self::Future {
88 if let Err(error) = req.check() {
89 return async move { Err(error) }.boxed();
90 }
91
92 match req {
93 Request::Info => {
94 let db = self.db.clone();
95
96 async move {
97 Ok(Response::Info {
98 min_sapling_birthday_height: db.network().sapling_activation_height(),
99 })
100 }
101 .boxed()
102 }
103
104 Request::RegisterKeys(keys) => {
105 let mut scan_task = self.scan_task.clone();
106
107 async move {
108 let newly_registered_keys = scan_task.register_keys(keys)?.await?;
109 if !newly_registered_keys.is_empty() {
110 Ok(Response::RegisteredKeys(newly_registered_keys))
111 } else {
112 Err("no keys were registered, check that keys are not already registered and \
113 are valid Sapling extended full viewing keys".into())
114 }
115 }
116 .boxed()
117 }
118
119 Request::DeleteKeys(keys) => {
120 let mut db = self.db.clone();
121 let mut scan_task = self.scan_task.clone();
122
123 async move {
124 let remove_keys_result = tokio::time::timeout(
126 DELETE_KEY_TIMEOUT,
127 scan_task.remove_keys(keys.clone())?,
128 )
129 .await
130 .map_err(|_| "request timed out removing keys from scan task".to_string());
131
132 let delete_key_task = tokio::task::spawn_blocking(move || {
135 db.delete_sapling_keys(keys);
136 });
137
138 remove_keys_result??;
140 delete_key_task.await?;
141
142 Ok(Response::DeletedKeys)
143 }
144 .boxed()
145 }
146
147 Request::Results(keys) => {
148 let db = self.db.clone();
149
150 async move {
151 let mut final_result = BTreeMap::new();
152 for key in keys {
153 let db = db.clone();
154 let mut heights_and_transactions = BTreeMap::new();
155 let txs = {
156 let key = key.clone();
157 tokio::task::spawn_blocking(move || db.sapling_results_for_key(&key))
158 }
159 .await?;
160 txs.iter().for_each(|(k, v)| {
161 heights_and_transactions
162 .entry(*k)
163 .or_insert_with(Vec::new)
164 .extend(v.iter().map(|x| Hash::from(*x)));
165 });
166 final_result.entry(key).or_insert(heights_and_transactions);
167 }
168
169 Ok(Response::Results(final_result))
170 }
171 .boxed()
172 }
173
174 Request::SubscribeResults(keys) => {
175 let mut scan_task = self.scan_task.clone();
176
177 async move {
178 let results_receiver = scan_task.subscribe(keys)?.await.map_err(|_| {
179 "scan task dropped responder, check that keys are registered"
180 })?;
181
182 Ok(Response::SubscribeResults(results_receiver))
183 }
184 .boxed()
185 }
186
187 Request::ClearResults(keys) => {
188 let mut db = self.db.clone();
189
190 async move {
191 tokio::task::spawn_blocking(move || {
193 db.delete_sapling_results(keys);
194 })
195 .await?;
196
197 Ok(Response::ClearedResults)
198 }
199 .boxed()
200 }
201 }
202 }
203}