//! Async Ed25519 batch verifier service
use std::{
    future::Future,
    mem,
    pin::Pin,
    task::{Context, Poll},
};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rand::thread_rng;
use tokio::sync::watch;
use tower::{util::ServiceFn, Service};
use tower_batch_control::{Batch, BatchControl};
use tower_fallback::Fallback;
use zebra_chain::primitives::ed25519::*;
use crate::BoxError;
use super::{spawn_fifo, spawn_fifo_and_convert};
#[cfg(test)]
mod tests;
/// The type of the batch verifier.
type BatchVerifier = batch::Verifier;
/// The type of verification results.
type VerifyResult = Result<(), Error>;
/// The type of the batch sender channel.
type Sender = watch::Sender<Option<VerifyResult>>;
/// The type of the batch item.
/// This is an `Ed25519Item`.
pub type Item = batch::Item;
/// Global batch verification context for Ed25519 signatures.
///
/// This service transparently batches contemporaneous signature verifications,
/// handling batch failures by falling back to individual verification.
///
/// Note that making a `Service` call requires mutable access to the service, so
/// you should call `.clone()` on the global handle to create a local, mutable
/// handle.
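///
/// # Example
///
/// A minimal usage sketch, not compiled as a doctest. It assumes an async
/// context whose error type can hold a `BoxError`, and hypothetical `vk_bytes`,
/// `sig`, and `msg` values for the signature being checked:
///
/// ```ignore
/// use tower::{Service, ServiceExt};
///
/// // Clone the global handle to get a local, mutable service handle.
/// let mut verifier = VERIFIER.clone();
///
/// // Assumption: `Item` can be built from a `(key bytes, signature, message)` tuple.
/// let item: Item = (vk_bytes, sig, &msg).into();
///
/// // Wait until the service is ready, then queue the item and await its result.
/// verifier.ready().await?.call(item).await?;
/// ```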
pub static VERIFIER: Lazy<
    Fallback<
        Batch<Verifier, Item>,
        ServiceFn<fn(Item) -> BoxFuture<'static, Result<(), BoxError>>>,
    >,
> = Lazy::new(|| {
    Fallback::new(
        Batch::new(
            Verifier::default(),
            super::MAX_BATCH_SIZE,
            None,
            super::MAX_BATCH_LATENCY,
        ),
        // We want to fall back to individual verification if batch verification fails,
        // so we need a Service to use.
        //
        // Because we have to specify the type of a static, we need to be able to
        // write the type of the closure and its return value. But both closures and
        // async blocks have unnameable types. So instead we cast the closure to a
        // function pointer (which is possible because it doesn't capture any state),
        // and use a BoxFuture to erase the result type.
        // (We can't use BoxCloneService to erase the service type, because it is !Sync.)
        tower::service_fn(
            (|item: Item| Verifier::verify_single_spawning(item).boxed()) as fn(_) -> _,
        ),
    )
});
/// Ed25519 signature verifier service
pub struct Verifier {
    /// A batch verifier for ed25519 signatures.
    batch: BatchVerifier,

    /// A channel for broadcasting the result of a batch to the futures for each batch item.
    ///
    /// Each batch gets a newly created channel, so there is only ever one result sent per channel.
    /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel.
    tx: Sender,
}

impl Default for Verifier {
    fn default() -> Self {
        let batch = BatchVerifier::default();
        let (tx, _) = watch::channel(None);
        Self { batch, tx }
    }
}
impl Verifier {
    /// Returns the batch verifier and channel sender from `self`,
    /// replacing them with a new empty batch.
    fn take(&mut self) -> (BatchVerifier, Sender) {
        // Use a new verifier and channel for each batch.
        let batch = mem::take(&mut self.batch);

        let (tx, _) = watch::channel(None);
        let tx = mem::replace(&mut self.tx, tx);

        (batch, tx)
    }

    /// Synchronously process the batch, and send the result using the channel sender.
    /// This function blocks until the batch is completed.
    fn verify(batch: BatchVerifier, tx: Sender) {
        let result = batch.verify(thread_rng());
        let _ = tx.send(Some(result));
    }

    /// Flush the batch using a thread pool, and return the result via the channel.
    /// This returns immediately, usually before the batch is completed.
    fn flush_blocking(&mut self) {
        let (batch, tx) = self.take();

        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
        //
        // We don't care about execution order here, because this method is only called on drop.
        tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx)));
    }

    /// Flush the batch using a thread pool, and return the result via the channel.
    /// This function returns a future that becomes ready when the batch is completed.
    async fn flush_spawning(batch: BatchVerifier, tx: Sender) {
        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
        //
        // If `spawn_fifo` returns an error, `.ok()` maps it to `None`,
        // which the waiting item futures report as an error.
        let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok());
    }

    /// Verify a single item using a thread pool, and return the result.
    async fn verify_single_spawning(item: Item) -> Result<(), BoxError> {
        // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
        spawn_fifo_and_convert(move || item.verify_single()).await
    }
}
impl Service<BatchControl<Item>> for Verifier {
    type Response = ();
    type Error = BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: BatchControl<Item>) -> Self::Future {
        match req {
            BatchControl::Item(item) => {
                tracing::trace!("got ed25519 item");
                self.batch.queue(item);
                let mut rx = self.tx.subscribe();

                Box::pin(async move {
                    match rx.changed().await {
                        Ok(()) => {
                            // We use a new channel for each batch,
                            // so we always get the correct batch result here.
                            let result = rx.borrow()
                                .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?;

                            if result.is_ok() {
                                tracing::trace!(?result, "validated ed25519 signature");
                                metrics::counter!("signatures.ed25519.validated").increment(1);
                            } else {
                                tracing::trace!(?result, "invalid ed25519 signature");
                                metrics::counter!("signatures.ed25519.invalid").increment(1);
                            }

                            result.map_err(BoxError::from)
                        }
                        Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"),
                    }
                })
            }

            BatchControl::Flush => {
                tracing::trace!("got ed25519 flush command");

                let (batch, tx) = self.take();

                Box::pin(Self::flush_spawning(batch, tx).map(Ok))
            }
        }
    }
}
impl Drop for Verifier {
    fn drop(&mut self) {
        // We need to flush the current batch in case there are still any pending futures.
        // This returns immediately, usually before the batch is completed.
        self.flush_blocking();
    }
}