zebra_rpc/indexer/
server.rs
1use std::net::SocketAddr;
4
5use tokio::task::JoinHandle;
6use tonic::transport::{server::TcpIncoming, Server};
7use tower::BoxError;
8use zebra_chain::chain_tip::ChainTip;
9use zebra_node_services::mempool::MempoolTxSubscriber;
10
11use crate::{indexer::indexer_server::IndexerServer, server::OPENED_RPC_ENDPOINT_MSG};
12
13type ServerTask = JoinHandle<Result<(), BoxError>>;
14
15pub struct IndexerRPC<ReadStateService, Tip>
17where
18 ReadStateService: tower::Service<
19 zebra_state::ReadRequest,
20 Response = zebra_state::ReadResponse,
21 Error = BoxError,
22 > + Clone
23 + Send
24 + Sync
25 + 'static,
26 <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
27 Tip: ChainTip + Clone + Send + Sync + 'static,
28{
29 pub(super) read_state: ReadStateService,
30 pub(super) chain_tip_change: Tip,
31 pub(super) mempool_change: MempoolTxSubscriber,
32}
33
34#[tracing::instrument(skip_all)]
36pub async fn init<ReadStateService, Tip>(
37 listen_addr: SocketAddr,
38 read_state: ReadStateService,
39 chain_tip_change: Tip,
40 mempool_change: MempoolTxSubscriber,
41) -> Result<(ServerTask, SocketAddr), BoxError>
42where
43 ReadStateService: tower::Service<
44 zebra_state::ReadRequest,
45 Response = zebra_state::ReadResponse,
46 Error = BoxError,
47 > + Clone
48 + Send
49 + Sync
50 + 'static,
51 <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
52 Tip: ChainTip + Clone + Send + Sync + 'static,
53{
54 let indexer_service = IndexerRPC {
55 read_state,
56 chain_tip_change,
57 mempool_change,
58 };
59
60 let reflection_service = tonic_reflection::server::Builder::configure()
61 .register_encoded_file_descriptor_set(crate::indexer::FILE_DESCRIPTOR_SET)
62 .build_v1()
63 .unwrap();
64
65 tracing::info!("Trying to open indexer RPC endpoint at {}...", listen_addr,);
66
67 let tcp_listener = tokio::net::TcpListener::bind(listen_addr).await?;
68
69 let listen_addr = tcp_listener.local_addr()?;
70 tracing::info!("{OPENED_RPC_ENDPOINT_MSG}{}", listen_addr);
71 let incoming = TcpIncoming::from_listener(tcp_listener, true, None)?;
72
73 let server_task: JoinHandle<Result<(), BoxError>> = tokio::spawn(async move {
74 Server::builder()
75 .add_service(reflection_service)
76 .add_service(IndexerServer::new(indexer_service))
77 .serve_with_incoming(incoming)
78 .await?;
79
80 Ok(())
81 });
82
83 Ok((server_task, listen_addr))
84}