zebra_consensus/primitives/
ed25519.rs

1//! Async Ed25519 batch verifier service
2
3use std::{
4    future::Future,
5    mem,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl};
17use tower_fallback::Fallback;
18use zebra_chain::primitives::ed25519::*;
19
20use crate::BoxError;
21
22use super::{spawn_fifo, spawn_fifo_and_convert};
23
24#[cfg(test)]
25mod tests;
26
27/// The type of the batch verifier.
28type BatchVerifier = batch::Verifier;
29
30/// The type of verification results.
31type VerifyResult = Result<(), Error>;
32
33/// The type of the batch sender channel.
34type Sender = watch::Sender<Option<VerifyResult>>;
35
36/// The type of the batch item.
37/// This is an `Ed25519Item`.
38pub type Item = batch::Item;
39
40/// Global batch verification context for Ed25519 signatures.
41///
42/// This service transparently batches contemporaneous signature verifications,
43/// handling batch failures by falling back to individual verification.
44///
45/// Note that making a `Service` call requires mutable access to the service, so
46/// you should call `.clone()` on the global handle to create a local, mutable
47/// handle.
48pub static VERIFIER: Lazy<
49    Fallback<
50        Batch<Verifier, Item>,
51        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
52    >,
53> = Lazy::new(|| {
54    Fallback::new(
55        Batch::new(
56            Verifier::default(),
57            super::MAX_BATCH_SIZE,
58            None,
59            super::MAX_BATCH_LATENCY,
60        ),
61        // We want to fallback to individual verification if batch verification fails,
62        // so we need a Service to use.
63        //
64        // Because we have to specify the type of a static, we need to be able to
65        // write the type of the closure and its return value. But both closures and
66        // async blocks have unnameable types. So instead we cast the closure to a function
67        // (which is possible because it doesn't capture any state), and use a BoxFuture
68        // to erase the result type.
69        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
70        tower::service_fn(
71            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
72        ),
73    )
74});
75
76/// Ed25519 signature verifier service
77pub struct Verifier {
78    /// A batch verifier for ed25519 signatures.
79    batch: BatchVerifier,
80
81    /// A channel for broadcasting the result of a batch to the futures for each batch item.
82    ///
83    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
84    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
85    tx: Sender,
86}
87
88impl Default for Verifier {
89    fn default() -> Self {
90        let batch = BatchVerifier::default();
91        let (tx, _) = watch::channel(None);
92        Self { batch, tx }
93    }
94}
95
96impl Verifier {
97    /// Returns the batch verifier and channel sender from `self`,
98    /// replacing them with a new empty batch.
99    fn take(&mut self) -> (BatchVerifier, Sender) {
100        // Use a new verifier and channel for each batch.
101        let batch = mem::take(&mut self.batch);
102
103        let (tx, _) = watch::channel(None);
104        let tx = mem::replace(&mut self.tx, tx);
105
106        (batch, tx)
107    }
108
109    /// Synchronously process the batch, and send the result using the channel sender.
110    /// This function blocks until the batch is completed.
111    fn verify(batch: BatchVerifier, tx: Sender) {
112        let result = batch.verify(thread_rng());
113        let _ = tx.send(Some(result));
114    }
115
116    /// Flush the batch using a thread pool, and return the result via the channel.
117    /// This returns immediately, usually before the batch is completed.
118    fn flush_blocking(&mut self) {
119        let (batch, tx) = self.take();
120
121        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
122        //
123        // We don't care about execution order here, because this method is only called on drop.
124        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
125    }
126
127    /// Flush the batch using a thread pool, and return the result via the channel.
128    /// This function returns a future that becomes ready when the batch is completed.
129    async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
130        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
131        let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
132    }
133
134    /// Verify a single item using a thread pool, and return the result.
135    async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
136        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
137        spawn_fifo_and_convert(move || item.verify_single()).await
138    }
139}
140
141impl Service<BatchControl<Item>> for Verifier {
142    type Response = ();
143    type Error = BoxError;
144    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
145
146    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        Poll::Ready(Ok(()))
148    }
149
150    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
151        match req {
152            BatchControl::Item(item) => {
153                tracing::trace!("got ed25519 item");
154                self.batch.queue(item);
155                let mut rx = self.tx.subscribe();
156
157                Box::pin(async move {
158                    match rx.changed().await {
159                        Ok(()) => {
160                            // We use a new channel for each batch,
161                            // so we always get the correct batch result here.
162                            let result = rx.borrow()
163                                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
164
165                            if result.is_ok() {
166                                tracing::trace!(?result, "validated ed25519 signature");
167                                metrics::counter!("signatures.ed25519.validated").increment(1);
168                            } else {
169                                tracing::trace!(?result, "invalid ed25519 signature");
170                                metrics::counter!("signatures.ed25519.invalid").increment(1);
171                            }
172                            result.map_err(BoxError::from)
173                        }
174                        Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
175                    }
176                })
177            }
178
179            BatchControl::Flush => {
180                tracing::trace!("got ed25519 flush command");
181
182                let (batch, tx) = self.take();
183
184                Box::pin(Self::flush_spawning(batch, tx).map(Ok))
185            }
186        }
187    }
188}
189
190impl Drop for Verifier {
191    fn drop(&mut self) {
192        // We need to flush the current batch in case there are still any pending futures.
193        // This returns immediately, usually before the batch is completed.
194        self.flush_blocking();
195    }
196}