zebra_consensus/primitives/
redpallas.rs1use std::{
4 future::Future,
5 mem,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18
19use zebra_chain::primitives::reddsa::{batch, orchard, Error, Signature, VerificationKeyBytes};
20
21use crate::BoxError;
22
23use super::{spawn_fifo, spawn_fifo_and_convert};
24
25#[cfg(test)]
26mod tests;
27
/// The reddsa batch verifier, specialised to Orchard's `SpendAuth` and `Binding` signature types.
type BatchVerifier = batch::Verifier<orchard::SpendAuth, orchard::Binding>;

/// The outcome of verifying a signature or a batch: `Ok(())`, or a reddsa [`Error`].
type VerifyResult = Result<(), Error>;

/// The sending half of a tokio `watch` channel that carries an optional [`VerifyResult`].
///
/// Channels of this type are created in this file holding `None`.
type Sender = watch::Sender<Option<VerifyResult>>;
36
37#[derive(Clone, Debug)]
40pub struct Item(batch::Item<orchard::SpendAuth, orchard::Binding>);
41
42impl RequestWeight for Item {}
43
44impl From<Item> for batch::Item<orchard::SpendAuth, orchard::Binding> {
45 fn from(Item(item): Item) -> Self {
46 item
47 }
48}
49
50impl Item {
51 fn verify_single(self) -> VerifyResult {
52 self.0.verify_single()
53 }
54
55 pub fn from_spendauth(
57 vk_bytes: VerificationKeyBytes<orchard::SpendAuth>,
58 sig: Signature<orchard::SpendAuth>,
59 msg: &impl AsRef<[u8]>,
60 ) -> Self {
61 Self(batch::Item::from_spendauth(vk_bytes, sig, msg))
62 }
63
64 pub fn from_binding(
66 vk_bytes: VerificationKeyBytes<orchard::Binding>,
67 sig: Signature<orchard::Binding>,
68 msg: &impl AsRef<[u8]>,
69 ) -> Self {
70 Self(batch::Item::from_binding(vk_bytes, sig, msg))
71 }
72}
73
74pub static VERIFIER: Lazy<
83 Fallback<
84 Batch<Verifier, Item>,
85 ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
86 >,
87> = Lazy::new(|| {
88 Fallback::new(
89 Batch::new(
90 Verifier::default(),
91 super::MAX_BATCH_SIZE,
92 None,
93 super::MAX_BATCH_LATENCY,
94 ),
95 tower::service_fn(
105 (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
106 ),
107 )
108});
109
110pub struct Verifier {
112 batch: BatchVerifier,
114
115 tx: Sender,
120}
121
122impl Default for Verifier {
123 fn default() -> Self {
124 let batch = BatchVerifier::default();
125 let (tx, _) = watch::channel(None);
126 Self { batch, tx }
127 }
128}
129
130impl Verifier {
131 fn take(&mut self) -> (BatchVerifier, Sender) {
134 let batch = mem::take(&mut self.batch);
136
137 let (tx, _) = watch::channel(None);
138 let tx = mem::replace(&mut self.tx, tx);
139
140 (batch, tx)
141 }
142
143 fn verify(batch: BatchVerifier, tx: Sender) {
146 let result = batch.verify(thread_rng());
147 let _ = tx.send(Some(result));
148 }
149
150 fn flush_blocking(&mut self) {
153 let (batch, tx) = self.take();
154
155 tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
159 }
160
161 async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
164 let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
166 }
167
168 async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
170 spawn_fifo_and_convert(move || item.verify_single()).await
172 }
173}
174
175impl Service<BatchControl<Item>> for Verifier {
176 type Response = ();
177 type Error = BoxError;
178 type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
179
180 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181 Poll::Ready(Ok(()))
182 }
183
184 fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
185 match req {
186 BatchControl::Item(item) => {
187 tracing::trace!("got item");
188 self.batch.queue(item);
189 let mut rx = self.tx.subscribe();
190 Box::pin(async move {
191 match rx.changed().await {
192 Ok(()) => {
193 let result = rx.borrow()
196 .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
197
198 if result.is_ok() {
199 tracing::trace!(?result, "validated redpallas signature");
200 metrics::counter!("signatures.redpallas.validated").increment(1);
201 } else {
202 tracing::trace!(?result, "invalid redpallas signature");
203 metrics::counter!("signatures.redpallas.invalid").increment(1);
204 }
205
206 result.map_err(BoxError::from)
207 }
208 Err(_recv_error) => panic!("verifier was dropped without flushing"),
209 }
210 })
211 }
212
213 BatchControl::Flush => {
214 tracing::trace!("got redpallas flush command");
215
216 let (batch, tx) = self.take();
217
218 Box::pin(Self::flush_spawning(batch, tx).map(Ok))
219 }
220 }
221 }
222}
223
impl Drop for Verifier {
    /// Flushes the verifier when it is dropped.
    ///
    /// Whatever is still queued is handed to `flush_blocking`, which verifies the batch and
    /// sends the result on the batch's channel.
    fn drop(&mut self) {
        // An async flush cannot be awaited inside `drop`, so the non-async flush is used.
        self.flush_blocking();
    }
}