zebra_rpc/indexer/
server.rs

1//! A tonic RPC server for Zebra's indexer API.
2
3use std::net::SocketAddr;
4
5use tokio::task::JoinHandle;
6use tonic::transport::{server::TcpIncoming, Server};
7use tower::BoxError;
8use zebra_chain::chain_tip::ChainTip;
9
10use super::indexer_server::IndexerServer;
11
12type ServerTask = JoinHandle<Result<(), BoxError>>;
13
14/// Indexer RPC service.
15pub struct IndexerRPC<ReadStateService, Tip>
16where
17    ReadStateService: tower::Service<
18            zebra_state::ReadRequest,
19            Response = zebra_state::ReadResponse,
20            Error = BoxError,
21        > + Clone
22        + Send
23        + Sync
24        + 'static,
25    <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
26    Tip: ChainTip + Clone + Send + Sync + 'static,
27{
28    _read_state: ReadStateService,
29    pub(super) chain_tip_change: Tip,
30}
31
32/// Initializes the indexer RPC server
33pub async fn init<ReadStateService, Tip>(
34    listen_addr: SocketAddr,
35    _read_state: ReadStateService,
36    chain_tip_change: Tip,
37) -> Result<(ServerTask, SocketAddr), BoxError>
38where
39    ReadStateService: tower::Service<
40            zebra_state::ReadRequest,
41            Response = zebra_state::ReadResponse,
42            Error = BoxError,
43        > + Clone
44        + Send
45        + Sync
46        + 'static,
47    <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
48    Tip: ChainTip + Clone + Send + Sync + 'static,
49{
50    let indexer_service = IndexerRPC {
51        _read_state,
52        chain_tip_change,
53    };
54
55    let reflection_service = tonic_reflection::server::Builder::configure()
56        .register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
57        .build_v1()
58        .unwrap();
59
60    tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);
61
62    let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
63    let listen_addr = tcp_listener.local_addr()?;
64    let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?;
65
66    let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
67        Server::builder()
68            .add_service(reflection_service)
69            .add_service(IndexerServer::new(indexer_service))
70            .serve_with_incoming(incoming)
71            .await?;
72
73        Ok(())
74    });
75
76    Ok((server_task, listen_addr))
77}