zebra_consensus/primitives/halo2.rs
1//! Async Halo2 batch verifier service
2
3use std::{
4 fmt,
5 future::Future,
6 mem,
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use futures::{future::BoxFuture, FutureExt};
12use once_cell::sync::Lazy;
13use orchard::{bundle::BatchValidator, circuit::VerifyingKey};
14use rand::thread_rng;
15use zcash_protocol::value::ZatBalance;
16use zebra_chain::transaction::SigHash;
17
18use crate::BoxError;
19use thiserror::Error;
20use tokio::sync::watch;
21use tower::{util::ServiceFn, Service};
22use tower_batch_control::{Batch, BatchControl, RequestWeight};
23use tower_fallback::Fallback;
24
25use super::spawn_fifo;
26
27/// Adjusted batch size for halo2 batches.
28///
29/// Unlike other batch verifiers, halo2 has aggregate proofs.
30/// This means that there can be hundreds of actions verified by some proofs,
31/// but just one action in others.
32///
33/// To compensate for larger proofs, we process the batch once there are over
34/// [`HALO2_MAX_BATCH_SIZE`] total actions among pending items in the queue.
35const HALO2_MAX_BATCH_SIZE: usize = super::MAX_BATCH_SIZE;
36
37/// The type of verification results.
38type VerifyResult = bool;
39
40/// The type of the batch sender channel.
41type Sender = watch::Sender<Option<VerifyResult>>;
42
43/// Temporary substitute type for fake batch verification.
44///
45/// TODO: implement batch verification
46pub type BatchVerifyingKey = ItemVerifyingKey;
47
48/// The type of a prepared verifying key.
49/// This is the key used to verify individual items.
50pub type ItemVerifyingKey = VerifyingKey;
51
52lazy_static::lazy_static! {
53 /// The halo2 proof verifying key.
54 pub static ref VERIFYING_KEY: ItemVerifyingKey = ItemVerifyingKey::build();
55}
56
57/// A Halo2 verification item, used as the request type of the service.
58#[derive(Clone, Debug)]
59pub struct Item {
60 bundle: orchard::bundle::Bundle<orchard::bundle::Authorized, ZatBalance>,
61 sighash: SigHash,
62}
63
64impl RequestWeight for Item {
65 fn request_weight(&self) -> usize {
66 self.bundle.actions().len()
67 }
68}
69
70impl Item {
71 /// Creates a new [`Item`] from a bundle and sighash.
72 pub fn new(
73 bundle: orchard::bundle::Bundle<orchard::bundle::Authorized, ZatBalance>,
74 sighash: SigHash,
75 ) -> Self {
76 Self { bundle, sighash }
77 }
78
79 /// Perform non-batched verification of this [`Item`].
80 ///
81 /// This is useful (in combination with `Item::clone`) for implementing
82 /// fallback logic when batch verification fails.
83 pub fn verify_single(self, vk: &ItemVerifyingKey) -> bool {
84 let mut batch = BatchValidator::default();
85 batch.queue(self);
86 batch.validate(vk, thread_rng())
87 }
88}
89
90trait QueueBatchVerify {
91 fn queue(&mut self, item: Item);
92}
93
94impl QueueBatchVerify for BatchValidator {
95 fn queue(&mut self, Item { bundle, sighash }: Item) {
96 self.add_bundle(&bundle, sighash.0);
97 }
98}
99
100/// An error that may occur when verifying [Halo2 proofs of Zcash Orchard Action
101/// descriptions][actions].
102///
103/// [actions]: https://zips.z.cash/protocol/protocol.pdf#actiondesc
104// TODO: if halo2::plonk::Error gets the std::error::Error trait derived on it,
105// remove this and just wrap `halo2::plonk::Error` as an enum variant of
106// `crate::transaction::Error`, which does the trait derivation via `thiserror`
107#[derive(Clone, Debug, Error, Eq, PartialEq)]
108#[allow(missing_docs)]
109pub enum Halo2Error {
110 #[error("the constraint system is not satisfied")]
111 ConstraintSystemFailure,
112 #[error("unknown Halo2 error")]
113 Other,
114}
115
116impl From<halo2::plonk::Error> for Halo2Error {
117 fn from(err: halo2::plonk::Error) -> Halo2Error {
118 match err {
119 halo2::plonk::Error::ConstraintSystemFailure => Halo2Error::ConstraintSystemFailure,
120 _ => Halo2Error::Other,
121 }
122 }
123}
124
125/// Global batch verification context for Halo2 proofs of Action statements.
126///
127/// This service transparently batches contemporaneous proof verifications,
128/// handling batch failures by falling back to individual verification.
129///
130/// Note that making a `Service` call requires mutable access to the service, so
131/// you should call `.clone()` on the global handle to create a local, mutable
132/// handle.
133pub static VERIFIER: Lazy<
134 Fallback<
135 Batch<Verifier, Item>,
136 ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
137 >,
138> = Lazy::new(|| {
139 Fallback::new(
140 Batch::new(
141 Verifier::new(&VERIFYING_KEY),
142 HALO2_MAX_BATCH_SIZE,
143 None,
144 super::MAX_BATCH_LATENCY,
145 ),
146 // We want to fallback to individual verification if batch verification fails,
147 // so we need a Service to use.
148 //
149 // Because we have to specify the type of a static, we need to be able to
150 // write the type of the closure and its return value. But both closures and
151 // async blocks have unnameable types. So instead we cast the closure to a function
152 // (which is possible because it doesn't capture any state), and use a BoxFuture
153 // to erase the result type.
154 // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
155 tower::service_fn(
156 (|item: Item| Verifier::verify_single_spawning(item, &VERIFYING_KEY).boxed())
157 as fn(_) -> _,
158 ),
159 )
160});
161
162/// Halo2 proof verifier implementation
163///
164/// This is the core implementation for the batch verification logic of the
165/// Halo2 verifier. It handles batching incoming requests, driving batches to
166/// completion, and reporting results.
167pub struct Verifier {
168 /// The synchronous Halo2 batch validator.
169 batch: BatchValidator,
170
171 /// The halo2 proof verification key.
172 ///
173 /// Making this 'static makes managing lifetimes much easier.
174 vk: &'static ItemVerifyingKey,
175
176 /// A channel for broadcasting the result of a batch to the futures for each batch item.
177 ///
178 /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
179 /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
180 tx: Sender,
181}
182
183impl Verifier {
184 fn new(vk: &'static ItemVerifyingKey) -> Self {
185 let batch = BatchValidator::default();
186 let (tx, _) = watch::channel(None);
187 Self { batch, vk, tx }
188 }
189
190 /// Returns the batch verifier and channel sender from `self`,
191 /// replacing them with a new empty batch.
192 fn take(&mut self) -> (BatchValidator, &'static BatchVerifyingKey, Sender) {
193 // Use a new verifier and channel for each batch.
194 let batch = mem::take(&mut self.batch);
195
196 let (tx, _) = watch::channel(None);
197 let tx = mem::replace(&mut self.tx, tx);
198
199 (batch, self.vk, tx)
200 }
201
202 /// Synchronously process the batch, and send the result using the channel sender.
203 /// This function blocks until the batch is completed.
204 fn verify(batch: BatchValidator, vk: &'static BatchVerifyingKey, tx: Sender) {
205 let result = batch.validate(vk, thread_rng());
206 let _ = tx.send(Some(result));
207 }
208
209 /// Flush the batch using a thread pool, and return the result via the channel.
210 /// This returns immediately, usually before the batch is completed.
211 fn flush_blocking(&mut self) {
212 let (batch, vk, tx) = self.take();
213
214 // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
215 //
216 // We don't care about execution order here, because this method is only called on drop.
217 tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, vk, tx)));
218 }
219
220 /// Flush the batch using a thread pool, and return the result via the channel.
221 /// This function returns a future that becomes ready when the batch is completed.
222 async fn flush_spawning(batch: BatchValidator, vk: &'static BatchVerifyingKey, tx: Sender) {
223 // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
224 let _ = tx.send(
225 spawn_fifo(move || batch.validate(vk, thread_rng()))
226 .await
227 .ok(),
228 );
229 }
230
231 /// Verify a single item using a thread pool, and return the result.
232 async fn verify_single_spawning(
233 item: Item,
234 pvk: &'static ItemVerifyingKey,
235 ) -> Result<(), BoxError> {
236 // TODO: Restore code for verifying single proofs or return a result from batch.validate()
237 // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
238 if spawn_fifo(move || item.verify_single(pvk)).await? {
239 Ok(())
240 } else {
241 Err("could not validate orchard proof".into())
242 }
243 }
244}
245
246impl fmt::Debug for Verifier {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 let name = "Verifier";
249 f.debug_struct(name)
250 .field("batch", &"..")
251 .field("vk", &"..")
252 .field("tx", &self.tx)
253 .finish()
254 }
255}
256
257impl Service<BatchControl<Item>> for Verifier {
258 type Response = ();
259 type Error = BoxError;
260 type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
261
262 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263 Poll::Ready(Ok(()))
264 }
265
266 fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
267 match req {
268 BatchControl::Item(item) => {
269 tracing::trace!("got item");
270 self.batch.queue(item);
271 let mut rx = self.tx.subscribe();
272 Box::pin(async move {
273 match rx.changed().await {
274 Ok(()) => {
275 // We use a new channel for each batch,
276 // so we always get the correct batch result here.
277 let is_valid = *rx
278 .borrow()
279 .as_ref()
280 .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
281
282 if is_valid {
283 tracing::trace!(?is_valid, "verified halo2 proof");
284 metrics::counter!("proofs.halo2.verified").increment(1);
285 Ok(())
286 } else {
287 tracing::trace!(?is_valid, "invalid halo2 proof");
288 metrics::counter!("proofs.halo2.invalid").increment(1);
289 Err("could not validate halo2 proofs".into())
290 }
291 }
292 Err(_recv_error) => panic!("verifier was dropped without flushing"),
293 }
294 })
295 }
296
297 BatchControl::Flush => {
298 tracing::trace!("got halo2 flush command");
299
300 let (batch, vk, tx) = self.take();
301
302 Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok))
303 }
304 }
305 }
306}
307
308impl Drop for Verifier {
309 fn drop(&mut self) {
310 // We need to flush the current batch in case there are still any pending futures.
311 // This returns immediately, usually before the batch is completed.
312 self.flush_blocking()
313 }
314}