zebra_consensus/primitives/
ed25519.rs1use std::{
4 future::Future,
5 mem,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{future::BoxFuture, FutureExt};
11use once_cell::sync::Lazy;
12use rand::thread_rng;
13
14use tokio::sync::watch;
15use tower::{util::ServiceFn, Service};
16use tower_batch_control::{Batch, BatchControl, RequestWeight};
17use tower_fallback::Fallback;
18use zebra_chain::primitives::ed25519::*;
19
20use crate::BoxError;
21
22use super::{spawn_fifo, spawn_fifo_and_convert};
23
24#[cfg(test)]
25mod tests;
26
27type BatchVerifier = batch::Verifier;
29
30type VerifyResult = Result<(), Error>;
32
33type Sender = watch::Sender<Option<VerifyResult>>;
35
36#[derive(Clone, Debug)]
39pub struct Item(batch::Item);
40
41impl RequestWeight for Item {}
42
43impl<'msg, M: AsRef<[u8]> + ?Sized> From<(VerificationKeyBytes, Signature, &'msg M)> for Item {
44 fn from(tup: (VerificationKeyBytes, Signature, &'msg M)) -> Self {
45 Self(batch::Item::from(tup))
46 }
47}
48
49impl From<Item> for batch::Item {
50 fn from(Item(item): Item) -> Self {
51 item
52 }
53}
54
55impl Item {
56 fn verify_single(self) -> VerifyResult {
57 self.0.verify_single()
58 }
59}
60
61pub static VERIFIER: Lazy<
70 Fallback<
71 Batch<Verifier, Item>,
72 ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
73 >,
74> = Lazy::new(|| {
75 Fallback::new(
76 Batch::new(
77 Verifier::default(),
78 super::MAX_BATCH_SIZE,
79 None,
80 super::MAX_BATCH_LATENCY,
81 ),
82 tower::service_fn(
92 (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
93 ),
94 )
95});
96
97pub struct Verifier {
99 batch: BatchVerifier,
101
102 tx: Sender,
107}
108
109impl Default for Verifier {
110 fn default() -> Self {
111 let batch = BatchVerifier::default();
112 let (tx, _) = watch::channel(None);
113 Self { batch, tx }
114 }
115}
116
117impl Verifier {
118 fn take(&mut self) -> (BatchVerifier, Sender) {
121 let batch = mem::take(&mut self.batch);
123
124 let (tx, _) = watch::channel(None);
125 let tx = mem::replace(&mut self.tx, tx);
126
127 (batch, tx)
128 }
129
130 fn verify(batch: BatchVerifier, tx: Sender) {
133 let result = batch.verify(thread_rng());
134 let _ = tx.send(Some(result));
135 }
136
137 fn flush_blocking(&mut self) {
140 let (batch, tx) = self.take();
141
142 tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
146 }
147
148 async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
151 let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
153 }
154
155 async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
157 spawn_fifo_and_convert(move || item.verify_single()).await
159 }
160}
161
162impl Service<BatchControl<Item>> for Verifier {
163 type Response = ();
164 type Error = BoxError;
165 type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;
166
167 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168 Poll::Ready(Ok(()))
169 }
170
171 fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
172 match req {
173 BatchControl::Item(item) => {
174 tracing::trace!("got ed25519 item");
175 self.batch.queue(item);
176 let mut rx = self.tx.subscribe();
177
178 Box::pin(async move {
179 match rx.changed().await {
180 Ok(()) => {
181 let result = rx.borrow()
184 .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;
185
186 if result.is_ok() {
187 tracing::trace!(?result, "validated ed25519 signature");
188 metrics::counter!("signatures.ed25519.validated").increment(1);
189 } else {
190 tracing::trace!(?result, "invalid ed25519 signature");
191 metrics::counter!("signatures.ed25519.invalid").increment(1);
192 }
193 result.map_err(BoxError::from)
194 }
195 Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
196 }
197 })
198 }
199
200 BatchControl::Flush => {
201 tracing::trace!("got ed25519 flush command");
202
203 let (batch, tx) = self.take();
204
205 Box::pin(Self::flush_spawning(batch, tx).map(Ok))
206 }
207 }
208 }
209}
210
211impl Drop for Verifier {
212 fn drop(&mut self) {
213 self.flush_blocking();
216 }
217}