zebra_consensus/primitives/
ed25519.rs

1//! Async Ed25519 batch verifier service
2
3use std::{
4    future::Future,
5    mem,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18use zebra_chain::primitives::ed25519::*;
19
20use crate::BoxError;
21
22use super::{spawn_fifo, spawn_fifo_and_convert};
23
24#[cfg(test)]
25mod tests;
26
27/// The type of the batch verifier.
28type BatchVerifier = batch::Verifier;
29
30/// The type of verification results.
31type VerifyResult = Result<(), Error>;
32
33/// The type of the batch sender channel.
34type Sender = watch::Sender<Option<VerifyResult>>;
35
36/// The type of the batch item.
37/// This is a newtype around an `Ed25519Item`.
38#[derive(Clone, Debug)]
39pub struct Item(batch::Item);
40
41impl RequestWeight for Item {}
42
43impl<'msg, M: AsRef<[u8]> + ?Sized> From<(VerificationKeyBytes, Signature, &'msg M)> for Item {
44    fn from(tup: (VerificationKeyBytes, Signature, &'msg M)) -> Self {
45        Self(batch::Item::from(tup))
46    }
47}
48
49impl From<Item> for batch::Item {
50    fn from(Item(item): Item) -> Self {
51        item
52    }
53}
54
55impl Item {
56    fn verify_single(self) -> VerifyResult {
57        self.0.verify_single()
58    }
59}
60
61/// Global batch verification context for Ed25519 signatures.
62///
63/// This service transparently batches contemporaneous signature verifications,
64/// handling batch failures by falling back to individual verification.
65///
66/// Note that making a `Service` call requires mutable access to the service, so
67/// you should call `.clone()` on the global handle to create a local, mutable
68/// handle.
69pub static VERIFIER: Lazy<
70    Fallback<
71        Batch<Verifier, Item>,
72        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
73    >,
74> = Lazy::new(|| {
75    Fallback::new(
76        Batch::new(
77            Verifier::default(),
78            super::MAX_BATCH_SIZE,
79            None,
80            super::MAX_BATCH_LATENCY,
81        ),
82        // We want to fallback to individual verification if batch verification fails,
83        // so we need a Service to use.
84        //
85        // Because we have to specify the type of a static, we need to be able to
86        // write the type of the closure and its return value. But both closures and
87        // async blocks have unnameable types. So instead we cast the closure to a function
88        // (which is possible because it doesn't capture any state), and use a BoxFuture
89        // to erase the result type.
90        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
91        tower::service_fn(
92            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
93        ),
94    )
95});
96
97/// Ed25519 signature verifier service
98pub struct Verifier {
99    /// A batch verifier for ed25519 signatures.
100    batch: BatchVerifier,
101
102    /// A channel for broadcasting the result of a batch to the futures for each batch item.
103    ///
104    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
105    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
106    tx: Sender,
107}
108
109impl Default for Verifier {
110    fn default() -> Self {
111        let batch = BatchVerifier::default();
112        let (tx, _) = watch::channel(None);
113        Self { batch, tx }
114    }
115}
116
117impl Verifier {
118    /// Returns the batch verifier and channel sender from `self`,
119    /// replacing them with a new empty batch.
120    fn take(&mut self) -> (BatchVerifier, Sender) {
121        // Use a new verifier and channel for each batch.
122        let batch = mem::take(&mut self.batch);
123
124        let (tx, _) = watch::channel(None);
125        let tx = mem::replace(&mut self.tx, tx);
126
127        (batch, tx)
128    }
129
130    /// Synchronously process the batch, and send the result using the channel sender.
131    /// This function blocks until the batch is completed.
132    fn verify(batch: BatchVerifier, tx: Sender) {
133        let result = batch.verify(thread_rng());
134        let _ = tx.send(Some(result));
135    }
136
137    /// Flush the batch using a thread pool, and return the result via the channel.
138    /// This returns immediately, usually before the batch is completed.
139    fn flush_blocking(&mut self) {
140        let (batch, tx) = self.take();
141
142        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
143        //
144        // We don't care about execution order here, because this method is only called on drop.
145        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
146    }
147
148    /// Flush the batch using a thread pool, and return the result via the channel.
149    /// This function returns a future that becomes ready when the batch is completed.
150    async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
151        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
152        let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
153    }
154
155    /// Verify a single item using a thread pool, and return the result.
156    async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
157        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
158        spawn_fifo_and_convert(move || item.verify_single()).await
159    }
160}
161
162impl Service<BatchControl<Item>> for Verifier {
163    type Response = ();
164    type Error = BoxError;
165    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
166
167    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168        Poll::Ready(Ok(()))
169    }
170
171    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
172        match req {
173            BatchControl::Item(item) => {
174                tracing::trace!("got ed25519 item");
175                self.batch.queue(item);
176                let mut rx = self.tx.subscribe();
177
178                Box::pin(async move {
179                    match rx.changed().await {
180                        Ok(()) => {
181                            // We use a new channel for each batch,
182                            // so we always get the correct batch result here.
183                            let result = rx.borrow()
184                                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
185
186                            if result.is_ok() {
187                                tracing::trace!(?result, "validated ed25519 signature");
188                                metrics::counter!("signatures.ed25519.validated").increment(1);
189                            } else {
190                                tracing::trace!(?result, "invalid ed25519 signature");
191                                metrics::counter!("signatures.ed25519.invalid").increment(1);
192                            }
193                            result.map_err(BoxError::from)
194                        }
195                        Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
196                    }
197                })
198            }
199
200            BatchControl::Flush => {
201                tracing::trace!("got ed25519 flush command");
202
203                let (batch, tx) = self.take();
204
205                Box::pin(Self::flush_spawning(batch, tx).map(Ok))
206            }
207        }
208    }
209}
210
211impl Drop for Verifier {
212    fn drop(&mut self) {
213        // We need to flush the current batch in case there are still any pending futures.
214        // This returns immediately, usually before the batch is completed.
215        self.flush_blocking();
216    }
217}