zebra_rpc/indexer/
server.rs

1//! A tonic RPC server for Zebra's indexer API.
2
3use std::net::SocketAddr;
4
5use tokio::task::JoinHandle;
6use tonic::transport::{server::TcpIncoming, Server};
7use tower::BoxError;
8use zebra_chain::chain_tip::ChainTip;
9use zebra_node_services::mempool::MempoolTxSubscriber;
10
11use crate::{indexer::indexer_server::IndexerServer, server::OPENED_RPC_ENDPOINT_MSG};
12
/// Join handle of the spawned indexer RPC server task.
///
/// The task's output is `Ok(())` or the boxed error the server future finished with.
type ServerTask = JoinHandle<Result<(), BoxError>>;
14
15/// Indexer RPC service.
16pub struct IndexerRPC<ReadStateService, Tip>
17where
18    ReadStateService: tower::Service<
19            zebra_state::ReadRequest,
20            Response = zebra_state::ReadResponse,
21            Error = BoxError,
22        > + Clone
23        + Send
24        + Sync
25        + 'static,
26    <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
27    Tip: ChainTip + Clone + Send + Sync + 'static,
28{
29    pub(super) read_state: ReadStateService,
30    pub(super) chain_tip_change: Tip,
31    pub(super) mempool_change: MempoolTxSubscriber,
32}
33
34/// Initializes the indexer RPC server
35#[tracing::instrument(skip_all)]
36pub async fn init<ReadStateService, Tip>(
37    listen_addr: SocketAddr,
38    read_state: ReadStateService,
39    chain_tip_change: Tip,
40    mempool_change: MempoolTxSubscriber,
41) -> Result<(ServerTask, SocketAddr), BoxError>
42where
43    ReadStateService: tower::Service<
44            zebra_state::ReadRequest,
45            Response = zebra_state::ReadResponse,
46            Error = BoxError,
47        > + Clone
48        + Send
49        + Sync
50        + 'static,
51    <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
52    Tip: ChainTip + Clone + Send + Sync + 'static,
53{
54    let indexer_service = IndexerRPC {
55        read_state,
56        chain_tip_change,
57        mempool_change,
58    };
59
60    let reflection_service = tonic_reflection::server::Builder::configure()
61        .register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
62        .build_v1()
63        .unwrap();
64
65    tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);
66
67    let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
68
69    let listen_addr = tcp_listener.local_addr()?;
70    tracing::info!("{OPENED_RPC_ENDPOINT_MSG}{}", listen_addr);
71    let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?;
72
73    let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
74        Server::builder()
75            .add_service(reflection_service)
76            .add_service(IndexerServer::new(indexer_service))
77            .serve_with_incoming(incoming)
78            .await?;
79
80        Ok(())
81    });
82
83    Ok((server_task, listen_addr))
84}