zebra_consensus/primitives/
halo2.rs

1//! Async Halo2 batch verifier service
2
3use std::{
4    fmt,
5    future::Future,
6    mem,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use futures::{future::BoxFuture, FutureExt};
12use once_cell::sync::Lazy;
13use orchard::{bundle::BatchValidator, circuit::VerifyingKey};
14use rand::thread_rng;
15use zcash_protocol::value::ZatBalance;
16use zebra_chain::transaction::SigHash;
17
18use crate::BoxError;
19use thiserror::Error;
20use tokio::sync::watch;
21use tower::{util::ServiceFn, Service};
22use tower_batch_control::{Batch, BatchControl, RequestWeight};
23use tower_fallback::Fallback;
24
25use super::spawn_fifo;
26
/// Adjusted batch size for halo2 batches.
///
/// Unlike other batch verifiers, halo2 has aggregate proofs.
/// This means that there can be hundreds of actions verified by some proofs,
/// but just one action in others.
///
/// To compensate for larger proofs, we process the batch once there are over
/// [`HALO2_MAX_BATCH_SIZE`] total actions among pending items in the queue.
///
/// Each [`Item`] reports its number of actions as its request weight, and this
/// constant is the size limit given to the `Batch` wrapper inside `VERIFIER`.
/// It currently reuses the shared `super::MAX_BATCH_SIZE` value unchanged.
const HALO2_MAX_BATCH_SIZE: usize = super::MAX_BATCH_SIZE;

/// The type of verification results.
///
/// `true` means the whole batch validated, `false` means validation failed.
type VerifyResult = bool;

/// The type of the batch sender channel.
///
/// The channel starts out holding `None`; a batch result is published as
/// `Some(result)` once the batch has been validated.
type Sender = watch::Sender<Option<VerifyResult>>;

/// Temporary substitute type for fake batch verification.
///
/// TODO: implement batch verification
///
/// NOTE(review): batches in this file are already validated through
/// `BatchValidator`, so this description and TODO look stale -- confirm
/// before relying on them.
pub type BatchVerifyingKey = ItemVerifyingKey;

/// The type of a prepared verifying key.
/// This is the key used to verify individual items.
///
/// This is an alias for orchard's `VerifyingKey`.
pub type ItemVerifyingKey = VerifyingKey;

lazy_static::lazy_static! {
    /// The halo2 proof verifying key.
    ///
    /// Built lazily with `ItemVerifyingKey::build()` on first access, then
    /// shared by the batch verifier and the single-item fallback.
    pub static ref VERIFYING_KEY: ItemVerifyingKey = ItemVerifyingKey::build();
}
56
57/// A Halo2 verification item, used as the request type of the service.
58#[derive(Clone, Debug)]
59pub struct Item {
60    bundle: orchard::bundle::Bundle<orchard::bundle::Authorized, ZatBalance>,
61    sighash: SigHash,
62}
63
64impl RequestWeight for Item {
65    fn request_weight(&self) -> usize {
66        self.bundle.actions().len()
67    }
68}
69
70impl Item {
71    /// Creates a new [`Item`] from a bundle and sighash.
72    pub fn new(
73        bundle: orchard::bundle::Bundle<orchard::bundle::Authorized, ZatBalance>,
74        sighash: SigHash,
75    ) -> Self {
76        Self { bundle, sighash }
77    }
78
79    /// Perform non-batched verification of this [`Item`].
80    ///
81    /// This is useful (in combination with `Item::clone`) for implementing
82    /// fallback logic when batch verification fails.
83    pub fn verify_single(self, vk: &ItemVerifyingKey) -> bool {
84        let mut batch = BatchValidator::default();
85        batch.queue(self);
86        batch.validate(vk, thread_rng())
87    }
88}
89
90trait QueueBatchVerify {
91    fn queue(&mut self, item: Item);
92}
93
94impl QueueBatchVerify for BatchValidator {
95    fn queue(&mut self, Item { bundle, sighash }: Item) {
96        self.add_bundle(&bundle, sighash.0);
97    }
98}
99
100/// An error that may occur when verifying [Halo2 proofs of Zcash Orchard Action
101/// descriptions][actions].
102///
103/// [actions]: https://zips.z.cash/protocol/protocol.pdf#actiondesc
104// TODO: if halo2::plonk::Error gets the std::error::Error trait derived on it,
105// remove this and just wrap `halo2::plonk::Error` as an enum variant of
106// `crate::transaction::Error`, which does the trait derivation via `thiserror`
107#[derive(Clone, Debug, Error, Eq, PartialEq)]
108#[allow(missing_docs)]
109pub enum Halo2Error {
110    #[error("the constraint system is not satisfied")]
111    ConstraintSystemFailure,
112    #[error("unknown Halo2 error")]
113    Other,
114}
115
116impl From<halo2::plonk::Error> for Halo2Error {
117    fn from(err: halo2::plonk::Error) -> Halo2Error {
118        match err {
119            halo2::plonk::Error::ConstraintSystemFailure => Halo2Error::ConstraintSystemFailure,
120            _ => Halo2Error::Other,
121        }
122    }
123}
124
125/// Global batch verification context for Halo2 proofs of Action statements.
126///
127/// This service transparently batches contemporaneous proof verifications,
128/// handling batch failures by falling back to individual verification.
129///
130/// Note that making a `Service` call requires mutable access to the service, so
131/// you should call `.clone()` on the global handle to create a local, mutable
132/// handle.
133pub static VERIFIER: Lazy<
134    Fallback<
135        Batch<Verifier, Item>,
136        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
137    >,
138> = Lazy::new(|| {
139    Fallback::new(
140        Batch::new(
141            Verifier::new(&VERIFYING_KEY),
142            HALO2_MAX_BATCH_SIZE,
143            None,
144            super::MAX_BATCH_LATENCY,
145        ),
146        // We want to fallback to individual verification if batch verification fails,
147        // so we need a Service to use.
148        //
149        // Because we have to specify the type of a static, we need to be able to
150        // write the type of the closure and its return value. But both closures and
151        // async blocks have unnameable types. So instead we cast the closure to a function
152        // (which is possible because it doesn't capture any state), and use a BoxFuture
153        // to erase the result type.
154        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
155        tower::service_fn(
156            (|item: Item| Verifier::verify_single_spawning(item, &VERIFYING_KEY).boxed())
157                as fn(_) -> _,
158        ),
159    )
160});
161
/// Halo2 proof verifier implementation
///
/// This is the core implementation for the batch verification logic of the
/// Halo2 verifier. It handles batching incoming requests, driving batches to
/// completion, and reporting results.
///
/// Items are queued into the current batch by the `Service` implementation.
/// A flush request (or dropping the verifier) takes the current batch and
/// sender, validates the batch, and publishes the result to every item future
/// that subscribed to that batch's channel.
pub struct Verifier {
    /// The synchronous Halo2 batch validator.
    ///
    /// Replaced with a fresh default validator every time a batch is taken.
    batch: BatchValidator,

    /// The halo2 proof verification key.
    ///
    /// Making this 'static makes managing lifetimes much easier.
    vk: &'static ItemVerifyingKey,

    /// A channel for broadcasting the result of a batch to the futures for each batch item.
    ///
    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
    tx: Sender,
}
182
183impl Verifier {
184    fn new(vk: &'static ItemVerifyingKey) -> Self {
185        let batch = BatchValidator::default();
186        let (tx, _) = watch::channel(None);
187        Self { batch, vk, tx }
188    }
189
190    /// Returns the batch verifier and channel sender from `self`,
191    /// replacing them with a new empty batch.
192    fn take(&mut self) -> (BatchValidator, &'static BatchVerifyingKey, Sender) {
193        // Use a new verifier and channel for each batch.
194        let batch = mem::take(&mut self.batch);
195
196        let (tx, _) = watch::channel(None);
197        let tx = mem::replace(&mut self.tx, tx);
198
199        (batch, self.vk, tx)
200    }
201
202    /// Synchronously process the batch, and send the result using the channel sender.
203    /// This function blocks until the batch is completed.
204    fn verify(batch: BatchValidator, vk: &'static BatchVerifyingKey, tx: Sender) {
205        let result = batch.validate(vk, thread_rng());
206        let _ = tx.send(Some(result));
207    }
208
209    /// Flush the batch using a thread pool, and return the result via the channel.
210    /// This returns immediately, usually before the batch is completed.
211    fn flush_blocking(&mut self) {
212        let (batch, vk, tx) = self.take();
213
214        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
215        //
216        // We don't care about execution order here, because this method is only called on drop.
217        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, vk, tx)));
218    }
219
220    /// Flush the batch using a thread pool, and return the result via the channel.
221    /// This function returns a future that becomes ready when the batch is completed.
222    async fn flush_spawning(batch: BatchValidator, vk: &'static BatchVerifyingKey, tx: Sender) {
223        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
224        let _ = tx.send(
225            spawn_fifo(move || batch.validate(vk, thread_rng()))
226                .await
227                .ok(),
228        );
229    }
230
231    /// Verify a single item using a thread pool, and return the result.
232    async fn verify_single_spawning(
233        item: Item,
234        pvk: &'static ItemVerifyingKey,
235    ) -> Result<(), BoxError> {
236        // TODO: Restore code for verifying single proofs or return a result from batch.validate()
237        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
238        if spawn_fifo(move || item.verify_single(pvk)).await? {
239            Ok(())
240        } else {
241            Err("could not validate orchard proof".into())
242        }
243    }
244}
245
246impl fmt::Debug for Verifier {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        let name = "Verifier";
249        f.debug_struct(name)
250            .field("batch", &"..")
251            .field("vk", &"..")
252            .field("tx", &self.tx)
253            .finish()
254    }
255}
256
257impl Service<BatchControl<Item>> for Verifier {
258    type Response = ();
259    type Error = BoxError;
260    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
261
262    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263        Poll::Ready(Ok(()))
264    }
265
266    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
267        match req {
268            BatchControl::Item(item) => {
269                tracing::trace!("got item");
270                self.batch.queue(item);
271                let mut rx = self.tx.subscribe();
272                Box::pin(async move {
273                    match rx.changed().await {
274                        Ok(()) => {
275                            // We use a new channel for each batch,
276                            // so we always get the correct batch result here.
277                            let is_valid = *rx
278                                .borrow()
279                                .as_ref()
280                                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
281
282                            if is_valid {
283                                tracing::trace!(?is_valid, "verified halo2 proof");
284                                metrics::counter!("proofs.halo2.verified").increment(1);
285                                Ok(())
286                            } else {
287                                tracing::trace!(?is_valid, "invalid halo2 proof");
288                                metrics::counter!("proofs.halo2.invalid").increment(1);
289                                Err("could not validate halo2 proofs".into())
290                            }
291                        }
292                        Err(_recv_error) => panic!("verifier was dropped without flushing"),
293                    }
294                })
295            }
296
297            BatchControl::Flush => {
298                tracing::trace!("got halo2 flush command");
299
300                let (batch, vk, tx) = self.take();
301
302                Box::pin(Self::flush_spawning(batch, vk, tx).map(Ok))
303            }
304        }
305    }
306}
307
308impl Drop for Verifier {
309    fn drop(&mut self) {
310        // We need to flush the current batch in case there are still any pending futures.
311        // This returns immediately, usually before the batch is completed.
312        self.flush_blocking()
313    }
314}