use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use chrono::Utc;
use futures::stream::FuturesUnordered;
use futures_util::FutureExt;
use thiserror::Error;
use tower::{Service, ServiceExt};
use tracing::Instrument;
use zebra_chain::{
amount::Amount,
block,
parameters::{subsidy::FundingStreamReceiver, Network},
transaction, transparent,
work::equihash,
};
use zebra_state as zs;
use crate::{error::*, transaction as tx, BoxError};
pub mod check;
pub mod request;
pub mod subsidy;
pub use request::Request;
#[cfg(test)]
mod tests;
/// Asynchronous semantic verifier for blocks.
///
/// Bundles the network whose rules are applied with handles to two inner
/// services: the state service (`S`) and the transaction verifier (`V`).
/// The `Service<Request>` implementation clones both handles for every
/// verification request.
#[derive(Debug)]
pub struct SemanticBlockVerifier<S, V> {
/// The network passed to the difficulty, merkle root, subsidy, coinbase,
/// and miner fee checks.
network: Network,
/// The state service: asked whether a block is already known, and sent the
/// prepared block once verification succeeds.
state_service: S,
/// The transaction verifier: sent one `tx::Request::Block` for each
/// transaction in the block being verified.
transaction_verifier: V,
}
/// Errors returned by [`SemanticBlockVerifier`] when a block cannot be
/// verified, validated as a proposal, or committed to the state.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum VerifyBlockError {
/// The state service failed while it was being asked whether the block
/// with `hash` is already known.
#[error("unable to verify depth for block {hash} from chain state during block verification")]
Depth { source: BoxError, hash: block::Hash },
/// A block-level check failed; display and source are forwarded from the
/// wrapped [`BlockError`].
#[error(transparent)]
Block {
#[from]
source: BlockError,
},
/// The Equihash solution check failed; display and source are forwarded
/// from the wrapped [`equihash::Error`].
#[error(transparent)]
Equihash {
#[from]
source: equihash::Error,
},
/// The block header time check failed.
///
/// This variant has no `#[from]` conversion, so it is built explicitly
/// (for example with `map_err(VerifyBlockError::Time)`).
#[error(transparent)]
Time(zebra_chain::block::BlockTimeError),
/// The state service failed while committing a block that had already
/// passed semantic verification.
#[error("unable to commit block after semantic verification: {0}")]
Commit(#[source] BoxError),
/// The state service failed while validating a block proposal.
///
/// Only available when the `getblocktemplate-rpcs` feature is enabled.
#[cfg(feature = "getblocktemplate-rpcs")]
#[error("unable to validate block proposal: failed semantic verification (proof of work is not checked for proposals): {0}")]
ValidateProposal(#[source] BoxError),
/// A transaction in the block failed verification.
#[error("invalid transaction: {0}")]
Transaction(#[from] TransactionError),
/// The block subsidy could not be calculated or was invalid.
#[error("invalid block subsidy: {0}")]
Subsidy(#[from] SubsidyError),
}
impl VerifyBlockError {
pub fn is_duplicate_request(&self) -> bool {
match self {
VerifyBlockError::Block { source, .. } => source.is_duplicate_request(),
_ => false,
}
}
}
/// The maximum number of legacy signature operations allowed in a block.
///
/// The block verifier sums the `legacy_sigop_count` reported for every
/// transaction in a block, and rejects the block with
/// `BlockError::TooManyTransparentSignatureOperations` when that total is
/// strictly greater than this limit.
pub const MAX_BLOCK_SIGOPS: u64 = 20_000;
impl<S, V> SemanticBlockVerifier<S, V>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
    V: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
    V::Future: Send + 'static,
{
    /// Creates a block verifier for `network`.
    ///
    /// The verifier stores its own clone of `network`, and takes ownership of
    /// the `state_service` and `transaction_verifier` handles it is given.
    pub fn new(network: &Network, state_service: S, transaction_verifier: V) -> Self {
        // The caller keeps its `Network`; the verifier owns an independent copy.
        let network = network.clone();

        Self {
            network,
            state_service,
            transaction_verifier,
        }
    }
}
/// Block verification as a `tower::Service`: each [`Request`] carries one
/// block, and a successful response is the hash of that block.
impl<S, V> Service<Request> for SemanticBlockVerifier<S, V>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
V: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
V::Future: Send + 'static,
{
/// The hash of the block that was verified.
type Response = block::Hash;
/// The reason verification, proposal validation, or the commit failed.
type Error = VerifyBlockError;
/// A boxed, `Send` future, so the `async` block built in `call` has a
/// nameable type.
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
/// Always reports that the verifier is ready.
///
/// The inner services are not polled here. Instead, `call` clones them and
/// awaits `ready()` on each clone immediately before using it.
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
/// Returns a future that verifies the block in `request`, then submits it
/// to the state service.
///
/// The future runs these steps in order and stops at the first failure:
///
/// 1. asks the state service whether the block hash is already known;
/// 2. checks the coinbase height, the difficulty (plus the Equihash
///    solution, unless it is skipped), the merkle root, the header time,
///    the coinbase position, the block subsidy, and the coinbase output
///    decryptability;
/// 3. sends every transaction to the transaction verifier, accumulating the
///    legacy sigop count and the miner fees from the responses;
/// 4. checks the sigop limit and the miner fees;
/// 5. commits the prepared block to the state service. When the
///    `getblocktemplate-rpcs` feature is enabled and the request is a
///    proposal, the state service is only asked to validate the block.
///
/// # Errors
///
/// - [`VerifyBlockError::Depth`] if the known-block query fails;
/// - [`VerifyBlockError::Block`], [`VerifyBlockError::Equihash`],
///   [`VerifyBlockError::Time`], [`VerifyBlockError::Subsidy`], or
///   [`VerifyBlockError::Transaction`] if a check fails;
/// - [`VerifyBlockError::Commit`] (or `ValidateProposal` for proposals) if
///   the final state service request fails.
///
/// # Panics
///
/// - if the transaction verifier returns an error from `ready()`;
/// - if the transaction verifier returns a response that is not
///   `tx::Response::Block`;
/// - if the state service returns a response variant that does not match
///   the request;
/// - if `funding_stream_values` returns an error;
/// - if another clone of the shared block outputs is still alive after all
///   transaction responses have arrived;
/// - if the state service commits a different hash from the verified block.
fn call(&mut self, request: Request) -> Self::Future {
// The returned future must be `'static`, so it owns clones of the services
// and the network instead of borrowing from `self`.
let mut state_service = self.state_service.clone();
let mut transaction_verifier = self.transaction_verifier.clone();
let network = self.network.clone();
// `request` is kept as well: `is_proposal()` is consulted again below.
let block = request.block();
// Events emitted while the future runs are recorded in this span. The
// height is logged with `Debug` formatting because it is still an `Option`
// at this point.
let span = tracing::debug_span!("block", height = ?block.coinbase_height());
async move {
let hash = block.hash();
// Reject blocks that the state already knows about. Any failure of the
// state service during this query is reported as a `Depth` error.
tracing::trace!("checking that block is not already in state");
match state_service
.ready()
.await
.map_err(|source| VerifyBlockError::Depth { source, hash })?
.call(zs::Request::KnownBlock(hash))
.await
.map_err(|source| VerifyBlockError::Depth { source, hash })?
{
zs::Response::KnownBlock(Some(location)) => {
return Err(BlockError::AlreadyInChain(hash, location).into())
}
zs::Response::KnownBlock(None) => {}
_ => unreachable!("wrong response to Request::KnownBlock"),
}
tracing::trace!("performing block checks");
// A block without a coinbase height cannot be verified.
let height = block
.coinbase_height()
.ok_or(BlockError::MissingHeight(hash))?;
// Heights above `block::Height::MAX` are rejected.
if height > block::Height::MAX {
Err(BlockError::MaxHeight(height, hash, block::Height::MAX))?;
}
// Proposals, and networks that disable proof of work, only get the
// difficulty threshold check. All other blocks get the full difficulty
// check followed by the Equihash solution check.
if request.is_proposal() || network.disable_pow() {
check::difficulty_threshold_is_valid(&block.header, &network, &height, &hash)?;
} else {
check::difficulty_is_valid(&block.header, &network, &height, &hash)?;
check::equihash_solution_is_valid(&block.header)?;
}
// Hash every transaction once. The hashes are reused by the merkle root
// check, the block outputs, the transaction verifier requests, and the
// prepared block.
let transaction_hashes: Arc<[_]> =
block.transactions.iter().map(|t| t.hash()).collect();
check::merkle_root_validity(&network, &block, &transaction_hashes)?;
// The current time is sampled once, then used by the header time check.
let now = Utc::now();
check::time_is_valid_at(&block.header, now, &height, &hash)
.map_err(VerifyBlockError::Time)?;
// The coinbase transaction returned here is reused by the decryption and
// miner fee checks below.
let coinbase_tx = check::coinbase_is_first(&block)?;
let expected_block_subsidy = subsidy::general::block_subsidy(height, &network)?;
check::subsidy_is_valid(&block, &network, expected_block_subsidy)?;
tx::check::coinbase_outputs_are_decryptable(&coinbase_tx, &network, height)?;
// Response futures from the transaction verifier are collected here, so
// they can be awaited together after all requests have been sent.
let mut async_checks = FuturesUnordered::new();
// The outputs built from this block's transactions, shared with every
// transaction verification request.
// NOTE(review): presumably this lets a transaction spend outputs created
// earlier in the same block - confirm against the transaction verifier.
let known_utxos = Arc::new(transparent::new_ordered_outputs(
&block,
&transaction_hashes,
));
// The transaction hashes taken from the outpoints that key `known_utxos`.
let known_outpoint_hashes: Arc<HashSet<transaction::Hash>> =
Arc::new(known_utxos.keys().map(|outpoint| outpoint.hash).collect());
// Readiness is awaited one transaction at a time, but the response
// futures are only queued in this loop, not awaited.
for (&transaction_hash, transaction) in
transaction_hashes.iter().zip(block.transactions.iter())
{
let rsp = transaction_verifier
.ready()
.await
.expect("transaction verifier is always ready")
.call(tx::Request::Block {
transaction_hash,
transaction: transaction.clone(),
known_outpoint_hashes: known_outpoint_hashes.clone(),
known_utxos: known_utxos.clone(),
height,
time: block.header.time,
});
async_checks.push(rsp);
}
tracing::trace!(len = async_checks.len(), "built async tx checks");
// Block totals accumulated from the transaction responses.
let mut legacy_sigop_count = 0;
// The fee total is a `Result`, so an error from adding fees is carried
// through the loop and only converted into a `BlockError` afterwards.
let mut block_miner_fees = Ok(Amount::zero());
// Brings `next()` into scope for `FuturesUnordered`.
use futures::StreamExt;
// The first failed transaction aborts verification of the whole block.
while let Some(result) = async_checks.next().await {
tracing::trace!(?result, remaining = async_checks.len());
let response = result
.map_err(Into::into)
.map_err(VerifyBlockError::Transaction)?;
assert!(
matches!(response, tx::Response::Block { .. }),
"unexpected response from transaction verifier: {response:?}"
);
legacy_sigop_count += response.legacy_sigop_count();
// Responses without a miner fee add nothing to the block total.
// NOTE(review): presumably only the coinbase transaction has no miner
// fee - confirm against `tx::Response::miner_fee`.
if let Some(miner_fee) = response.miner_fee() {
block_miner_fees += miner_fee;
}
}
// The limit is exclusive: a total equal to `MAX_BLOCK_SIGOPS` is accepted.
if legacy_sigop_count > MAX_BLOCK_SIGOPS {
Err(BlockError::TooManyTransparentSignatureOperations {
height,
hash,
legacy_sigop_count,
})?;
}
// Take the `Deferred` entry out of the funding stream values, falling back
// to the amount type's default value when there is no such entry.
let expected_deferred_amount = subsidy::funding_streams::funding_stream_values(
height,
&network,
expected_block_subsidy,
)
.expect("we always expect a funding stream hashmap response even if empty")
.remove(&FundingStreamReceiver::Deferred)
.unwrap_or_default();
// Surface any error that occurred while the miner fees were being summed.
let block_miner_fees =
block_miner_fees.map_err(|amount_error| BlockError::SummingMinerFees {
height,
hash,
source: amount_error,
})?;
check::miner_fees_are_valid(
&coinbase_tx,
height,
block_miner_fees,
expected_block_subsidy,
expected_deferred_amount,
&network,
)?;
// Recover sole ownership of the block outputs. This panics if any clone
// handed to the transaction verifier is still alive.
let new_outputs = Arc::into_inner(known_utxos)
.expect("all verification tasks using known_utxos are complete");
let prepared_block = zs::SemanticallyVerifiedBlock {
block,
hash,
height,
new_outputs,
transaction_hashes,
deferred_balance: Some(expected_deferred_amount),
};
// Proposals are validated by the state service and returned here, so they
// never reach the commit request below. This branch is only compiled when
// the `getblocktemplate-rpcs` feature is enabled.
#[cfg(feature = "getblocktemplate-rpcs")]
if request.is_proposal() {
return match state_service
.ready()
.await
.map_err(VerifyBlockError::ValidateProposal)?
.call(zs::Request::CheckBlockProposalValidity(prepared_block))
.await
.map_err(VerifyBlockError::ValidateProposal)?
{
zs::Response::ValidBlockProposal => Ok(hash),
_ => unreachable!("wrong response for CheckBlockProposalValidity"),
};
}
// Commit the semantically verified block to the state service.
match state_service
.ready()
.await
.map_err(VerifyBlockError::Commit)?
.call(zs::Request::CommitSemanticallyVerifiedBlock(prepared_block))
.await
.map_err(VerifyBlockError::Commit)?
{
zs::Response::Committed(committed_hash) => {
// The state must report the same hash that was verified here.
assert_eq!(committed_hash, hash, "state must commit correct hash");
Ok(hash)
}
_ => unreachable!("wrong response for CommitSemanticallyVerifiedBlock"),
}
}
.instrument(span)
.boxed()
}
}