zebra_consensus/primitives/
redpallas.rs

1//! Async RedPallas batch verifier service
2
3use std::{
4    future::Future,
5    mem,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18
19use zebra_chain::primitives::reddsa::{batch, orchard, Error, Signature, VerificationKeyBytes};
20
21use crate::BoxError;
22
23use super::{spawn_fifo, spawn_fifo_and_convert};
24
25#[cfg(test)]
26mod tests;
27
28/// The type of the batch verifier.
29type BatchVerifier = batch::Verifier<orchard::SpendAuth, orchard::Binding>;
30
31/// The type of verification results.
32type VerifyResult = Result<(), Error>;
33
34/// The type of the batch sender channel.
35type Sender = watch::Sender<Option<VerifyResult>>;
36
37/// The type of the batch item.
38/// This is a newtype around a `RedPallasItem`.
39#[derive(Clone, Debug)]
40pub struct Item(batch::Item<orchard::SpendAuth, orchard::Binding>);
41
42impl RequestWeight for Item {}
43
44impl From<Item> for batch::Item<orchard::SpendAuth, orchard::Binding> {
45    fn from(Item(item): Item) -> Self {
46        item
47    }
48}
49
50impl Item {
51    fn verify_single(self) -> VerifyResult {
52        self.0.verify_single()
53    }
54
55    /// Create a batch item from a `SpendAuth` signature.
56    pub fn from_spendauth(
57        vk_bytes: VerificationKeyBytes<orchard::SpendAuth>,
58        sig: Signature<orchard::SpendAuth>,
59        msg: &impl AsRef<[u8]>,
60    ) -> Self {
61        Self(batch::Item::from_spendauth(vk_bytes, sig, msg))
62    }
63
64    /// Create a batch item from a `Binding` signature.
65    pub fn from_binding(
66        vk_bytes: VerificationKeyBytes<orchard::Binding>,
67        sig: Signature<orchard::Binding>,
68        msg: &impl AsRef<[u8]>,
69    ) -> Self {
70        Self(batch::Item::from_binding(vk_bytes, sig, msg))
71    }
72}
73
74/// Global batch verification context for RedPallas signatures.
75///
76/// This service transparently batches contemporaneous signature verifications,
77/// handling batch failures by falling back to individual verification.
78///
79/// Note that making a `Service` call requires mutable access to the service, so
80/// you should call `.clone()` on the global handle to create a local, mutable
81/// handle.
82pub static VERIFIER: Lazy<
83    Fallback<
84        Batch<Verifier, Item>,
85        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
86    >,
87> = Lazy::new(|| {
88    Fallback::new(
89        Batch::new(
90            Verifier::default(),
91            super::MAX_BATCH_SIZE,
92            None,
93            super::MAX_BATCH_LATENCY,
94        ),
95        // We want to fallback to individual verification if batch verification fails,
96        // so we need a Service to use.
97        //
98        // Because we have to specify the type of a static, we need to be able to
99        // write the type of the closure and its return value. But both closures and
100        // async blocks have unnameable types. So instead we cast the closure to a function
101        // (which is possible because it doesn't capture any state), and use a BoxFuture
102        // to erase the result type.
103        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
104        tower::service_fn(
105            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
106        ),
107    )
108});
109
110/// RedPallas signature verifier service
111pub struct Verifier {
112    /// A batch verifier for RedPallas signatures.
113    batch: BatchVerifier,
114
115    /// A channel for broadcasting the result of a batch to the futures for each batch item.
116    ///
117    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
118    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
119    tx: Sender,
120}
121
122impl Default for Verifier {
123    fn default() -> Self {
124        let batch = BatchVerifier::default();
125        let (tx, _) = watch::channel(None);
126        Self { batch, tx }
127    }
128}
129
130impl Verifier {
131    /// Returns the batch verifier and channel sender from `self`,
132    /// replacing them with a new empty batch.
133    fn take(&mut self) -> (BatchVerifier, Sender) {
134        // Use a new verifier and channel for each batch.
135        let batch = mem::take(&mut self.batch);
136
137        let (tx, _) = watch::channel(None);
138        let tx = mem::replace(&mut self.tx, tx);
139
140        (batch, tx)
141    }
142
143    /// Synchronously process the batch, and send the result using the channel sender.
144    /// This function blocks until the batch is completed.
145    fn verify(batch: BatchVerifier, tx: Sender) {
146        let result = batch.verify(thread_rng());
147        let _ = tx.send(Some(result));
148    }
149
150    /// Flush the batch using a thread pool, and return the result via the channel.
151    /// This returns immediately, usually before the batch is completed.
152    fn flush_blocking(&mut self) {
153        let (batch, tx) = self.take();
154
155        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
156        //
157        // We don't care about execution order here, because this method is only called on drop.
158        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
159    }
160
161    /// Flush the batch using a thread pool, and return the result via the channel.
162    /// This function returns a future that becomes ready when the batch is completed.
163    async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
164        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
165        let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
166    }
167
168    /// Verify a single item using a thread pool, and return the result.
169    async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
170        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
171        spawn_fifo_and_convert(move || item.verify_single()).await
172    }
173}
174
175impl Service<BatchControl<Item>> for Verifier {
176    type Response = ();
177    type Error = BoxError;
178    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
179
180    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181        Poll::Ready(Ok(()))
182    }
183
184    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
185        match req {
186            BatchControl::Item(item) => {
187                tracing::trace!("got item");
188                self.batch.queue(item);
189                let mut rx = self.tx.subscribe();
190                Box::pin(async move {
191                    match rx.changed().await {
192                        Ok(()) => {
193                            // We use a new channel for each batch,
194                            // so we always get the correct batch result here.
195                            let result = rx.borrow()
196                                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
197
198                            if result.is_ok() {
199                                tracing::trace!(?result, "validated redpallas signature");
200                                metrics::counter!("signatures.redpallas.validated").increment(1);
201                            } else {
202                                tracing::trace!(?result, "invalid redpallas signature");
203                                metrics::counter!("signatures.redpallas.invalid").increment(1);
204                            }
205
206                            result.map_err(BoxError::from)
207                        }
208                        Err(_recv_error) => panic!("verifier was dropped without flushing"),
209                    }
210                })
211            }
212
213            BatchControl::Flush => {
214                tracing::trace!("got redpallas flush command");
215
216                let (batch, tx) = self.take();
217
218                Box::pin(Self::flush_spawning(batch, tx).map(Ok))
219            }
220        }
221    }
222}
223
224impl Drop for Verifier {
225    fn drop(&mut self) {
226        // We need to flush the current batch in case there are still any pending futures.
227        // This returns immediately, usually before the batch is completed.
228        self.flush_blocking();
229    }
230}