zebra_consensus/primitives/redpallas.rs
1//! Async RedPallas batch verifier service
2
3use std::{
4 future::Future,
5 mem,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl};
17use tower_fallback::Fallback;
18
19use zebra_chain::primitives::reddsa::{batch, orchard, Error};
20
21use crate::BoxError;
22
23use super::{spawn_fifo, spawn_fifo_and_convert};
24
25#[cfg(test)]
26mod tests;
27
28/// The type of the batch verifier.
29type BatchVerifier = batch::Verifier<orchard::SpendAuth, orchard::Binding>;
30
31/// The type of verification results.
32type VerifyResult = Result<(), Error>;
33
34/// The type of the batch sender channel.
35type Sender = watch::Sender<Option<VerifyResult>>;
36
37/// The type of the batch item.
38/// This is a `RedPallasItem`.
39pub type Item = batch::Item<orchard::SpendAuth, orchard::Binding>;
40
41/// Global batch verification context for RedPallas signatures.
42///
43/// This service transparently batches contemporaneous signature verifications,
44/// handling batch failures by falling back to individual verification.
45///
46/// Note that making a `Service` call requires mutable access to the service, so
47/// you should call `.clone()` on the global handle to create a local, mutable
48/// handle.
49pub static VERIFIER: Lazy<
50 Fallback<
51 Batch<Verifier, Item>,
52 ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
53 >,
54> = Lazy::new(|| {
55 Fallback::new(
56 Batch::new(
57 Verifier::default(),
58 super::MAX_BATCH_SIZE,
59 None,
60 super::MAX_BATCH_LATENCY,
61 ),
62 // We want to fallback to individual verification if batch verification fails,
63 // so we need a Service to use.
64 //
65 // Because we have to specify the type of a static, we need to be able to
66 // write the type of the closure and its return value. But both closures and
67 // async blocks have unnameable types. So instead we cast the closure to a function
68 // (which is possible because it doesn't capture any state), and use a BoxFuture
69 // to erase the result type.
70 // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
71 tower::service_fn(
72 (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
73 ),
74 )
75});
76
77/// RedPallas signature verifier service
78pub struct Verifier {
79 /// A batch verifier for RedPallas signatures.
80 batch: BatchVerifier,
81
82 /// A channel for broadcasting the result of a batch to the futures for each batch item.
83 ///
84 /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
85 /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
86 tx: Sender,
87}
88
89impl Default for Verifier {
90 fn default() -> Self {
91 let batch = BatchVerifier::default();
92 let (tx, _) = watch::channel(None);
93 Self { batch, tx }
94 }
95}
96
97impl Verifier {
98 /// Returns the batch verifier and channel sender from `self`,
99 /// replacing them with a new empty batch.
100 fn take(&mut self) -> (BatchVerifier, Sender) {
101 // Use a new verifier and channel for each batch.
102 let batch = mem::take(&mut self.batch);
103
104 let (tx, _) = watch::channel(None);
105 let tx = mem::replace(&mut self.tx, tx);
106
107 (batch, tx)
108 }
109
110 /// Synchronously process the batch, and send the result using the channel sender.
111 /// This function blocks until the batch is completed.
112 fn verify(batch: BatchVerifier, tx: Sender) {
113 let result = batch.verify(thread_rng());
114 let _ = tx.send(Some(result));
115 }
116
117 /// Flush the batch using a thread pool, and return the result via the channel.
118 /// This returns immediately, usually before the batch is completed.
119 fn flush_blocking(&mut self) {
120 let (batch, tx) = self.take();
121
122 // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
123 //
124 // We don't care about execution order here, because this method is only called on drop.
125 tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
126 }
127
128 /// Flush the batch using a thread pool, and return the result via the channel.
129 /// This function returns a future that becomes ready when the batch is completed.
130 async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
131 // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
132 let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
133 }
134
135 /// Verify a single item using a thread pool, and return the result.
136 async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
137 // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
138 spawn_fifo_and_convert(move || item.verify_single()).await
139 }
140}
141
142impl Service<BatchControl<Item>> for Verifier {
143 type Response = ();
144 type Error = BoxError;
145 type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
146
147 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 Poll::Ready(Ok(()))
149 }
150
151 fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
152 match req {
153 BatchControl::Item(item) => {
154 tracing::trace!("got item");
155 self.batch.queue(item);
156 let mut rx = self.tx.subscribe();
157 Box::pin(async move {
158 match rx.changed().await {
159 Ok(()) => {
160 // We use a new channel for each batch,
161 // so we always get the correct batch result here.
162 let result = rx.borrow()
163 .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
164
165 if result.is_ok() {
166 tracing::trace!(?result, "validated redpallas signature");
167 metrics::counter!("signatures.redpallas.validated").increment(1);
168 } else {
169 tracing::trace!(?result, "invalid redpallas signature");
170 metrics::counter!("signatures.redpallas.invalid").increment(1);
171 }
172
173 result.map_err(BoxError::from)
174 }
175 Err(_recv_error) => panic!("verifier was dropped without flushing"),
176 }
177 })
178 }
179
180 BatchControl::Flush => {
181 tracing::trace!("got redpallas flush command");
182
183 let (batch, tx) = self.take();
184
185 Box::pin(Self::flush_spawning(batch, tx).map(Ok))
186 }
187 }
188 }
189}
190
191impl Drop for Verifier {
192 fn drop(&mut self) {
193 // We need to flush the current batch in case there are still any pending futures.
194 // This returns immediately, usually before the batch is completed.
195 self.flush_blocking();
196 }
197}