zebra_grpc/
server.rs

1//! The gRPC server implementation
2
3use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};
4
5use color_eyre::eyre::eyre;
6use futures_util::future::TryFutureExt;
7use tokio::task::JoinHandle;
8use tokio_stream::{wrappers::ReceiverStream, Stream};
9use tonic::{
10    transport::{server::TcpIncoming, Server},
11    Request, Response, Status,
12};
13use tower::{timeout::error::Elapsed, ServiceExt};
14
15use zebra_chain::{block::Height, transaction};
16use zebra_node_services::scan_service::{
17    request::Request as ScanServiceRequest,
18    response::{Response as ScanServiceResponse, ScanResult},
19};
20
21use crate::scanner::{
22    scanner_server::{Scanner, ScannerServer},
23    ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse,
24    InfoReply, KeyWithHeight, RegisterKeysRequest, RegisterKeysResponse, Results, ScanRequest,
25    ScanResponse, Transaction, Transactions,
26};
27
/// A boxed, thread-safe error, used as the `Error` type of the scan service.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of keys that can be requested in a single request.
///
/// Enforced by the `register_keys`, `clear_results`, `delete_keys`, and `get_results`
/// methods. The `scan` method only rejects empty key lists.
pub const MAX_KEYS_PER_REQUEST: usize = 10;

/// The maximum number of messages that can be queued to be streamed to a client
/// from the `scan` method.
const SCAN_RESPONDER_BUFFER_SIZE: usize = 10_000;
36
/// The server implementation.
///
/// Holds the scan service that the gRPC methods send their requests to.
#[derive(Debug)]
pub struct ScannerRPC<ScanService>
where
    ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
        + Clone
        + Send
        + Sync
        + 'static,
    <ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{
    /// The scan service, cloned for every request sent by a gRPC method.
    scan_service: ScanService,
}
50
51#[tonic::async_trait]
52impl<ScanService> Scanner for ScannerRPC<ScanService>
53where
54    ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
55        + Clone
56        + Send
57        + Sync
58        + 'static,
59    <ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
60{
61    type ScanStream = Pin<Box<dyn Stream<Item = Result<ScanResponse, Status>> + Send>>;
62
63    async fn scan(
64        &self,
65        request: tonic::Request<ScanRequest>,
66    ) -> Result<Response<Self::ScanStream>, Status> {
67        let keys = request.into_inner().keys;
68
69        if keys.is_empty() {
70            let msg = "must provide at least 1 key in scan request";
71            return Err(Status::invalid_argument(msg));
72        }
73
74        let keys: Vec<_> = keys
75            .into_iter()
76            .map(|KeyWithHeight { key, height }| (key, height))
77            .collect();
78
79        let register_keys_response_fut = self
80            .scan_service
81            .clone()
82            .ready()
83            .await
84            .map_err(|_| Status::unknown("service poll_ready() method returned an error"))?
85            .call(ScanServiceRequest::RegisterKeys(keys.clone()));
86
87        let keys: Vec<_> = keys.into_iter().map(|(key, _start_at)| key).collect();
88
89        let subscribe_results_response_fut = self
90            .scan_service
91            .clone()
92            .ready()
93            .await
94            .map_err(|_| Status::unknown("service poll_ready() method returned an error"))?
95            .call(ScanServiceRequest::SubscribeResults(
96                keys.iter().cloned().collect(),
97            ));
98
99        let (register_keys_response, subscribe_results_response) =
100            tokio::join!(register_keys_response_fut, subscribe_results_response_fut);
101
102        // Ignores errors from the register keys request, we expect there to be a timeout if the keys
103        // are already registered, or an empty response if no new keys could be parsed as Sapling efvks.
104        //
105        // This method will still return an error if every key in the `scan` request is invalid, since
106        // the SubscribeResults request will return an error once the `rsp_tx` is dropped in `ScanTask::process_messages`
107        // when it finds that none of the keys in the request are registered.
108        let register_keys_err = match register_keys_response {
109            Ok(ScanServiceResponse::RegisteredKeys(_)) => None,
110            Ok(response) => {
111                return Err(Status::internal(format!(
112                    "unexpected response from scan service: {response:?}"
113                )))
114            }
115            Err(err) if err.downcast_ref::<Elapsed>().is_some() => {
116                return Err(Status::deadline_exceeded(
117                    "scan service requests timed out, is Zebra synced past Sapling activation height?")
118                )
119            }
120            Err(err) => Some(err),
121        };
122
123        let ScanServiceResponse::SubscribeResults(mut results_receiver) =
124            subscribe_results_response.map_err(|err| {
125                register_keys_err
126                    .map(|err| Status::invalid_argument(err.to_string()))
127                    .unwrap_or(Status::internal(err.to_string()))
128            })?
129        else {
130            return Err(Status::unknown(
131                "scan service returned an unexpected response",
132            ));
133        };
134
135        let ScanServiceResponse::Results(results) = self
136            .scan_service
137            .clone()
138            .ready()
139            .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone())))
140            .await
141            .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
142        else {
143            return Err(Status::unknown(
144                "scan service returned an unexpected response",
145            ));
146        };
147
148        let (response_sender, response_receiver) =
149            tokio::sync::mpsc::channel(SCAN_RESPONDER_BUFFER_SIZE);
150        let response_stream = ReceiverStream::new(response_receiver);
151
152        tokio::spawn(async move {
153            let mut initial_results = process_results(keys, results);
154
155            // Empty results receiver channel to filter out duplicate results between the channel and cache
156            while let Ok(ScanResult { key, height, tx_id }) = results_receiver.try_recv() {
157                let entry = initial_results
158                    .entry(key)
159                    .or_default()
160                    .by_height
161                    .entry(height.0)
162                    .or_default();
163
164                let tx_id = Transaction {
165                    hash: tx_id.to_string(),
166                };
167
168                // Add the scan result to the initial results if it's not already present.
169                if !entry.transactions.contains(&tx_id) {
170                    entry.transactions.push(tx_id);
171                }
172            }
173
174            let send_result = response_sender
175                .send(Ok(ScanResponse {
176                    results: initial_results,
177                }))
178                .await;
179
180            if send_result.is_err() {
181                // return early if the client has disconnected
182                return;
183            }
184
185            while let Some(scan_result) = results_receiver.recv().await {
186                let send_result = response_sender.send(Ok(scan_result.into())).await;
187
188                // Finish task if the client has disconnected
189                if send_result.is_err() {
190                    break;
191                }
192            }
193        });
194
195        Ok(Response::new(Box::pin(response_stream)))
196    }
197
198    async fn get_info(
199        &self,
200        _request: tonic::Request<Empty>,
201    ) -> Result<Response<InfoReply>, Status> {
202        let ScanServiceResponse::Info {
203            min_sapling_birthday_height,
204        } = self
205            .scan_service
206            .clone()
207            .ready()
208            .and_then(|service| service.call(ScanServiceRequest::Info))
209            .await
210            .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
211        else {
212            return Err(Status::unknown(
213                "scan service returned an unexpected response",
214            ));
215        };
216
217        let reply = InfoReply {
218            min_sapling_birthday_height: min_sapling_birthday_height.0,
219        };
220
221        Ok(Response::new(reply))
222    }
223
224    async fn register_keys(
225        &self,
226        request: Request<RegisterKeysRequest>,
227    ) -> Result<Response<RegisterKeysResponse>, Status> {
228        let keys: Vec<_> = request
229            .into_inner()
230            .keys
231            .into_iter()
232            .map(|key_with_height| (key_with_height.key, key_with_height.height))
233            .collect();
234
235        if keys.is_empty() {
236            let msg = "must provide at least 1 key for which to register keys";
237            return Err(Status::invalid_argument(msg));
238        }
239
240        if keys.len() > MAX_KEYS_PER_REQUEST {
241            let msg = format!(
242                "must provide at most {} keys to register keys",
243                MAX_KEYS_PER_REQUEST
244            );
245            return Err(Status::invalid_argument(msg));
246        }
247
248        match self
249            .scan_service
250            .clone()
251            .ready()
252            .and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys)))
253            .await
254        {
255            Ok(ScanServiceResponse::RegisteredKeys(keys)) => {
256                Ok(Response::new(RegisterKeysResponse { keys }))
257            }
258
259            Ok(response) => {
260                return Err(Status::internal(format!(
261                    "unexpected response from scan service: {response:?}"
262                )))
263            }
264
265            Err(err) if err.downcast_ref::<Elapsed>().is_some() => Err(Status::deadline_exceeded(
266                "RegisterKeys scan service request timed out, \
267                    is Zebra synced past Sapling activation height?",
268            )),
269
270            Err(err) => Err(Status::unknown(err.to_string())),
271        }
272    }
273
274    async fn clear_results(
275        &self,
276        request: Request<ClearResultsRequest>,
277    ) -> Result<Response<Empty>, Status> {
278        let keys = request.into_inner().keys;
279
280        if keys.is_empty() {
281            let msg = "must provide at least 1 key for which to clear results";
282            return Err(Status::invalid_argument(msg));
283        }
284
285        if keys.len() > MAX_KEYS_PER_REQUEST {
286            let msg = format!(
287                "must provide at most {} keys to clear results",
288                MAX_KEYS_PER_REQUEST
289            );
290            return Err(Status::invalid_argument(msg));
291        }
292
293        let ScanServiceResponse::ClearedResults = self
294            .scan_service
295            .clone()
296            .ready()
297            .and_then(|service| service.call(ScanServiceRequest::ClearResults(keys)))
298            .await
299            .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
300        else {
301            return Err(Status::unknown(
302                "scan service returned an unexpected response",
303            ));
304        };
305
306        Ok(Response::new(Empty {}))
307    }
308
309    async fn delete_keys(
310        &self,
311        request: Request<DeleteKeysRequest>,
312    ) -> Result<Response<Empty>, Status> {
313        let keys = request.into_inner().keys;
314
315        if keys.is_empty() {
316            let msg = "must provide at least 1 key to delete";
317            return Err(Status::invalid_argument(msg));
318        }
319
320        if keys.len() > MAX_KEYS_PER_REQUEST {
321            let msg = format!(
322                "must provide at most {} keys to delete",
323                MAX_KEYS_PER_REQUEST
324            );
325            return Err(Status::invalid_argument(msg));
326        }
327
328        let ScanServiceResponse::DeletedKeys = self
329            .scan_service
330            .clone()
331            .ready()
332            .and_then(|service| service.call(ScanServiceRequest::DeleteKeys(keys)))
333            .await
334            .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
335        else {
336            return Err(Status::unknown(
337                "scan service returned an unexpected response",
338            ));
339        };
340
341        Ok(Response::new(Empty {}))
342    }
343
344    async fn get_results(
345        &self,
346        request: Request<GetResultsRequest>,
347    ) -> Result<Response<GetResultsResponse>, Status> {
348        let keys = request.into_inner().keys;
349
350        if keys.is_empty() {
351            let msg = "must provide at least 1 key to get results";
352            return Err(Status::invalid_argument(msg));
353        }
354
355        if keys.len() > MAX_KEYS_PER_REQUEST {
356            let msg = format!(
357                "must provide at most {} keys to get results",
358                MAX_KEYS_PER_REQUEST
359            );
360            return Err(Status::invalid_argument(msg));
361        }
362
363        let ScanServiceResponse::Results(response) = self
364            .scan_service
365            .clone()
366            .ready()
367            .and_then(|service| service.call(ScanServiceRequest::Results(keys.clone())))
368            .await
369            .map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
370        else {
371            return Err(Status::unknown(
372                "scan service returned an unexpected response",
373            ));
374        };
375
376        let results = process_results(keys, response);
377
378        Ok(Response::new(GetResultsResponse { results }))
379    }
380}
381
382fn process_results(
383    keys: Vec<String>,
384    results: BTreeMap<String, BTreeMap<Height, Vec<transaction::Hash>>>,
385) -> BTreeMap<String, Results> {
386    // If there are no results for a key, we still want to return it with empty results.
387    let empty_map = BTreeMap::new();
388
389    keys.into_iter()
390        .map(|key| {
391            let values = results.get(&key).unwrap_or(&empty_map);
392
393            // Skip heights with no transactions, they are scanner markers and should not be returned.
394            let transactions = Results {
395                by_height: values
396                    .iter()
397                    .filter(|(_, transactions)| !transactions.is_empty())
398                    .map(|(height, transactions)| {
399                        let transactions = transactions
400                            .iter()
401                            .map(ToString::to_string)
402                            .map(|hash| Transaction { hash })
403                            .collect();
404                        (height.0, Transactions { transactions })
405                    })
406                    .collect(),
407            };
408
409            (key, transactions)
410        })
411        .collect::<BTreeMap<_, _>>()
412}
413
414impl From<ScanResult> for ScanResponse {
415    fn from(
416        ScanResult {
417            key,
418            height: Height(height),
419            tx_id,
420        }: ScanResult,
421    ) -> Self {
422        ScanResponse {
423            results: [(
424                key,
425                Results {
426                    by_height: [(
427                        height,
428                        Transactions {
429                            transactions: [tx_id.to_string()]
430                                .map(|hash| Transaction { hash })
431                                .to_vec(),
432                        },
433                    )]
434                    .into_iter()
435                    .collect(),
436                },
437            )]
438            .into_iter()
439            .collect(),
440        }
441    }
442}
443
444type ServerTask = JoinHandle<Result<(), tonic::transport::Error>>;
445
446/// Initializes the zebra-scan gRPC server
447pub async fn init<ScanService>(
448    listen_addr: SocketAddr,
449    scan_service: ScanService,
450) -> Result<(ServerTask, SocketAddr), color_eyre::Report>
451where
452    ScanService: tower::Service<ScanServiceRequest, Response = ScanServiceResponse, Error = BoxError>
453        + Clone
454        + Send
455        + Sync
456        + 'static,
457    <ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
458{
459    let service = ScannerRPC { scan_service };
460    let reflection_service = tonic_reflection::server::Builder::configure()
461        .register_encoded_file_descriptor_set(crate::scanner::FILE_DESCRIPTOR_SET)
462        .build_v1()
463        .unwrap();
464
465    let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
466    let listen_addr = tcp_listener.local_addr()?;
467    let incoming =
468        TcpIncoming::from_listener(tcp_listener, true, None).map_err(|err| eyre!(err))?;
469
470    let server_task: JoinHandle<Result<(), tonic::transport::Error>> = tokio::spawn(async move {
471        Server::builder()
472            .add_service(reflection_service)
473            .add_service(ScannerServer::new(service))
474            .serve_with_incoming(incoming)
475            .await?;
476
477        Ok(())
478    });
479
480    Ok((server_task, listen_addr))
481}