1use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
4
5use color_eyre::eyre::eyre;
6use futures_util::future::TryFutureExt;
7use tokio::task::JoinHandle;
8use tokio_stream::{wrappers::ReceiverStream, Stream};
9use tonic::{
10 transport::{server::TcpIncoming, Server},
11 Request, Response, Status,
12};
13use tower::{timeout::error::Elapsed, ServiceExt};
14
15use zebra_chain::{block::Height, transaction};
16use zebra_node_services::scan_service::{
17 request::Request as ScanServiceRequest,
18 response::{Response as ScanServiceResponse, ScanResult},
19};
20
21use crate::scanner::{
22 scanner_server::{Scanner, ScannerServer},
23 ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse,
24 InfoReply, KeyWithHeight, RegisterKeysRequest, RegisterKeysResponse, Results, ScanRequest,
25 ScanResponse, Transaction, Transactions,
26};
27
/// Type-erased, thread-safe error type used as the scan service's `Error`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Upper bound on the number of keys accepted in a single request by the RPCs that
/// enforce it; larger requests are rejected with `invalid_argument`.
pub const MAX_KEYS_PER_REQUEST: usize = 10;

/// Capacity of the channel that carries `scan` responses to the client stream.
const SCAN_RESPONDER_BUFFER_SIZE: usize = 10_000;
36
/// State of the scanner gRPC server.
///
/// Holds the scan service that each RPC clones and sends its requests to.
///
/// The type parameter is deliberately unbounded here: the `tower::Service` bounds are
/// declared on the `impl` blocks and functions that actually call the service, so the
/// struct itself does not need to repeat them.
#[derive(Debug)]
pub struct ScannerRPC<ScanService> {
    /// The scan service that requests are forwarded to.
    scan_service: ScanService,
}
50
51#[tonic::async_trait]
52impl<ScanService> Scanner for ScannerRPC<ScanService>
53where
54 ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
55 + Clone
56 + Send
57 + Sync
58 + 'static,
59 <ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
60{
61 type ScanStream = Pin<Box<dyn Stream<Item = Result<ScanResponse, Status>> + Send>>;
62
63 async fn scan(
64 &self,
65 request: tonic::Request<ScanRequest>,
66 ) -> Result<Response<Self::ScanStream>, Status> {
67 let keys = request.into_inner().keys;
68
69 if keys.is_empty() {
70 let msg = "must provide at least 1 key in scan request";
71 return Err(Status::invalid_argument(msg));
72 }
73
74 let keys: Vec<_> = keys
75 .into_iter()
76 .map(|KeyWithHeight { key, height }| (key, height))
77 .collect();
78
79 let register_keys_response_fut = self
80 .scan_service
81 .clone()
82 .ready()
83 .await
84 .map_err(|_| Status::unknown("service poll_ready() method returned an error"))?
85 .call(ScanServiceRequest::RegisterKeys(keys.clone()));
86
87 let keys: Vec<_> = keys.into_iter().map(|(key, _start_at)| key).collect();
88
89 let subscribe_results_response_fut = self
90 .scan_service
91 .clone()
92 .ready()
93 .await
94 .map_err(|_| Status::unknown("service poll_ready() method returned an error"))?
95 .call(ScanServiceRequest::SubscribeResults(
96 keys.iter().cloned().collect(),
97 ));
98
99 let (register_keys_response, subscribe_results_response) =
100 tokio::join!(register_keys_response_fut, subscribe_results_response_fut);
101
102 let register_keys_err = match register_keys_response {
109 Ok(ScanServiceResponse::RegisteredKeys(_)) => None,
110 Ok(response) => {
111 return Err(Status::internal(format!(
112 "unexpected response from scan service: {response:?}"
113 )))
114 }
115 Err(err) if err.downcast_ref::<Elapsed>().is_some() => {
116 return Err(Status::deadline_exceeded(
117 "scan service requests timed out, is Zebra synced past Sapling activation height?")
118 )
119 }
120 Err(err) => Some(err),
121 };
122
123 let ScanServiceResponse::SubscribeResults(mut results_receiver) =
124 subscribe_results_response.map_err(|err| {
125 register_keys_err
126 .map(|err| Status::invalid_argument(err.to_string()))
127 .unwrap_or(Status::internal(err.to_string()))
128 })?
129 else {
130 return Err(Status::unknown(
131 "scan service returned an unexpected response",
132 ));
133 };
134
135 let ScanServiceResponse::Results(results) = self
136 .scan_service
137 .clone()
138 .ready()
139 .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone())))
140 .await
141 .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
142 else {
143 return Err(Status::unknown(
144 "scan service returned an unexpected response",
145 ));
146 };
147
148 let (response_sender, response_receiver) =
149 tokio::sync::mpsc::channel(SCAN_RESPONDER_BUFFER_SIZE);
150 let response_stream = ReceiverStream::new(response_receiver);
151
152 tokio::spawn(async move {
153 let mut initial_results = process_results(keys, results);
154
155 while let Ok(ScanResult { key, height, tx_id }) = results_receiver.try_recv() {
157 let entry = initial_results
158 .entry(key)
159 .or_default()
160 .by_height
161 .entry(height.0)
162 .or_default();
163
164 let tx_id = Transaction {
165 hash: tx_id.to_string(),
166 };
167
168 if !entry.transactions.contains(&tx_id) {
170 entry.transactions.push(tx_id);
171 }
172 }
173
174 let send_result = response_sender
175 .send(Ok(ScanResponse {
176 results: initial_results,
177 }))
178 .await;
179
180 if send_result.is_err() {
181 return;
183 }
184
185 while let Some(scan_result) = results_receiver.recv().await {
186 let send_result = response_sender.send(Ok(scan_result.into())).await;
187
188 if send_result.is_err() {
190 break;
191 }
192 }
193 });
194
195 Ok(Response::new(Box::pin(response_stream)))
196 }
197
198 async fn get_info(
199 &self,
200 _request: tonic::Request<Empty>,
201 ) -> Result<Response<InfoReply>, Status> {
202 let ScanServiceResponse::Info {
203 min_sapling_birthday_height,
204 } = self
205 .scan_service
206 .clone()
207 .ready()
208 .and_then(|service| service.call(ScanServiceRequest::Info))
209 .await
210 .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
211 else {
212 return Err(Status::unknown(
213 "scan service returned an unexpected response",
214 ));
215 };
216
217 let reply = InfoReply {
218 min_sapling_birthday_height: min_sapling_birthday_height.0,
219 };
220
221 Ok(Response::new(reply))
222 }
223
224 async fn register_keys(
225 &self,
226 request: Request<RegisterKeysRequest>,
227 ) -> Result<Response<RegisterKeysResponse>, Status> {
228 let keys: Vec<_> = request
229 .into_inner()
230 .keys
231 .into_iter()
232 .map(|key_with_height| (key_with_height.key, key_with_height.height))
233 .collect();
234
235 if keys.is_empty() {
236 let msg = "must provide at least 1 key for which to register keys";
237 return Err(Status::invalid_argument(msg));
238 }
239
240 if keys.len() > MAX_KEYS_PER_REQUEST {
241 let msg = format!(
242 "must provide at most {} keys to register keys",
243 MAX_KEYS_PER_REQUEST
244 );
245 return Err(Status::invalid_argument(msg));
246 }
247
248 match self
249 .scan_service
250 .clone()
251 .ready()
252 .and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys)))
253 .await
254 {
255 Ok(ScanServiceResponse::RegisteredKeys(keys)) => {
256 Ok(Response::new(RegisterKeysResponse { keys }))
257 }
258
259 Ok(response) => {
260 return Err(Status::internal(format!(
261 "unexpected response from scan service: {response:?}"
262 )))
263 }
264
265 Err(err) if err.downcast_ref::<Elapsed>().is_some() => Err(Status::deadline_exceeded(
266 "RegisterKeys scan service request timed out, \
267 is Zebra synced past Sapling activation height?",
268 )),
269
270 Err(err) => Err(Status::unknown(err.to_string())),
271 }
272 }
273
274 async fn clear_results(
275 &self,
276 request: Request<ClearResultsRequest>,
277 ) -> Result<Response<Empty>, Status> {
278 let keys = request.into_inner().keys;
279
280 if keys.is_empty() {
281 let msg = "must provide at least 1 key for which to clear results";
282 return Err(Status::invalid_argument(msg));
283 }
284
285 if keys.len() > MAX_KEYS_PER_REQUEST {
286 let msg = format!(
287 "must provide at most {} keys to clear results",
288 MAX_KEYS_PER_REQUEST
289 );
290 return Err(Status::invalid_argument(msg));
291 }
292
293 let ScanServiceResponse::ClearedResults = self
294 .scan_service
295 .clone()
296 .ready()
297 .and_then(|service| service.call(ScanServiceRequest::ClearResults(keys)))
298 .await
299 .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
300 else {
301 return Err(Status::unknown(
302 "scan service returned an unexpected response",
303 ));
304 };
305
306 Ok(Response::new(Empty {}))
307 }
308
309 async fn delete_keys(
310 &self,
311 request: Request<DeleteKeysRequest>,
312 ) -> Result<Response<Empty>, Status> {
313 let keys = request.into_inner().keys;
314
315 if keys.is_empty() {
316 let msg = "must provide at least 1 key to delete";
317 return Err(Status::invalid_argument(msg));
318 }
319
320 if keys.len() > MAX_KEYS_PER_REQUEST {
321 let msg = format!(
322 "must provide at most {} keys to delete",
323 MAX_KEYS_PER_REQUEST
324 );
325 return Err(Status::invalid_argument(msg));
326 }
327
328 let ScanServiceResponse::DeletedKeys = self
329 .scan_service
330 .clone()
331 .ready()
332 .and_then(|service| service.call(ScanServiceRequest::DeleteKeys(keys)))
333 .await
334 .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
335 else {
336 return Err(Status::unknown(
337 "scan service returned an unexpected response",
338 ));
339 };
340
341 Ok(Response::new(Empty {}))
342 }
343
344 async fn get_results(
345 &self,
346 request: Request<GetResultsRequest>,
347 ) -> Result<Response<GetResultsResponse>, Status> {
348 let keys = request.into_inner().keys;
349
350 if keys.is_empty() {
351 let msg = "must provide at least 1 key to get results";
352 return Err(Status::invalid_argument(msg));
353 }
354
355 if keys.len() > MAX_KEYS_PER_REQUEST {
356 let msg = format!(
357 "must provide at most {} keys to get results",
358 MAX_KEYS_PER_REQUEST
359 );
360 return Err(Status::invalid_argument(msg));
361 }
362
363 let ScanServiceResponse::Results(response) = self
364 .scan_service
365 .clone()
366 .ready()
367 .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone())))
368 .await
369 .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
370 else {
371 return Err(Status::unknown(
372 "scan service returned an unexpected response",
373 ));
374 };
375
376 let results = process_results(keys, response);
377
378 Ok(Response::new(GetResultsResponse { results }))
379 }
380}
381
382fn process_results(
383 keys: Vec<String>,
384 results: BTreeMap<String, BTreeMap<Height, Vec<transaction::Hash>>>,
385) -> BTreeMap<String, Results> {
386 let empty_map = BTreeMap::new();
388
389 keys.into_iter()
390 .map(|key| {
391 let values = results.get(&key).unwrap_or(&empty_map);
392
393 let transactions = Results {
395 by_height: values
396 .iter()
397 .filter(|(_, transactions)| !transactions.is_empty())
398 .map(|(height, transactions)| {
399 let transactions = transactions
400 .iter()
401 .map(ToString::to_string)
402 .map(|hash| Transaction { hash })
403 .collect();
404 (height.0, Transactions { transactions })
405 })
406 .collect(),
407 };
408
409 (key, transactions)
410 })
411 .collect::<BTreeMap<_, _>>()
412}
413
414impl From<ScanResult> for ScanResponse {
415 fn from(
416 ScanResult {
417 key,
418 height: Height(height),
419 tx_id,
420 }: ScanResult,
421 ) -> Self {
422 ScanResponse {
423 results: [(
424 key,
425 Results {
426 by_height: [(
427 height,
428 Transactions {
429 transactions: [tx_id.to_string()]
430 .map(|hash| Transaction { hash })
431 .to_vec(),
432 },
433 )]
434 .into_iter()
435 .collect(),
436 },
437 )]
438 .into_iter()
439 .collect(),
440 }
441 }
442}
443
444type ServerTask = JoinHandle<Result<(), tonic::transport::Error>>;
445
446pub async fn init<ScanService>(
448 listen_addr: SocketAddr,
449 scan_service: ScanService,
450) -> Result<(ServerTask, SocketAddr), color_eyre::Report>
451where
452 ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
453 + Clone
454 + Send
455 + Sync
456 + 'static,
457 <ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
458{
459 let service = ScannerRPC { scan_service };
460 let reflection_service = tonic_reflection::server::Builder::configure()
461 .register_encoded_file_descriptor_set(crate::scanner::FILE_DESCRIPTOR_SET)
462 .build_v1()
463 .unwrap();
464
465 let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
466 let listen_addr = tcp_listener.local_addr()?;
467 let incoming =
468 TcpIncoming::from_listener(tcp_listener, true, None).map_err(|err| eyre!(err))?;
469
470 let server_task: JoinHandle<Result<(), tonic::transport::Error>> = tokio::spawn(async move {
471 Server::builder()
472 .add_service(reflection_service)
473 .add_service(ScannerServer::new(service))
474 .serve_with_incoming(incoming)
475 .await?;
476
477 Ok(())
478 });
479
480 Ok((server_task, listen_addr))
481}