zebra_rpc/
server.rs
1use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tower::Service;
16use tracing::*;
17
18use zebra_chain::{
19 block, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network,
20};
21use zebra_network::AddressBookPeers;
22use zebra_node_services::mempool;
23
24use crate::{
25 config,
26 methods::{RpcImpl, RpcServer as _},
27 server::{
28 http_request_compatibility::HttpRequestMiddlewareLayer,
29 rpc_call_compatibility::FixRpcResponseMiddleware,
30 },
31};
32
33pub mod cookie;
34pub mod error;
35pub mod http_request_compatibility;
36pub mod rpc_call_compatibility;
37
38#[cfg(test)]
39mod tests;
40
41#[derive(Clone)]
43pub struct RpcServer {
44 config: config::rpc::Config,
46
47 network: Network,
49
50 build_version: String,
52
53 close_handle: ServerHandle,
55}
56
57impl fmt::Debug for RpcServer {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct("RpcServer")
60 .field("config", &self.config)
61 .field("network", &self.network)
62 .field("build_version", &self.build_version)
63 .field(
64 "close_handle",
65 &"ServerHandle",
67 )
68 .finish()
69 }
70}
71
/// Prefix of the `info!` message logged by [`RpcServer::start`] once the endpoint is open.
///
/// The server's local address is appended directly after the trailing space.
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
74
75type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
76
77impl RpcServer {
78 #[allow(clippy::too_many_arguments)]
93 pub async fn start<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
94 rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
95 conf: config::rpc::Config,
96 ) -> Result<ServerTask, tower::BoxError>
97 where
98 Mempool: tower::Service<
99 mempool::Request,
100 Response = mempool::Response,
101 Error = zebra_node_services::BoxError,
102 > + Clone
103 + Send
104 + Sync
105 + 'static,
106 Mempool::Future: Send,
107 State: Service<
108 zebra_state::ReadRequest,
109 Response = zebra_state::ReadResponse,
110 Error = zebra_state::BoxError,
111 > + Clone
112 + Send
113 + Sync
114 + 'static,
115 State::Future: Send,
116 Tip: ChainTip + Clone + Send + Sync + 'static,
117 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
118 BlockVerifierRouter: Service<
119 zebra_consensus::Request,
120 Response = block::Hash,
121 Error = zebra_consensus::BoxError,
122 > + Clone
123 + Send
124 + Sync
125 + 'static,
126 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
127 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
128 {
129 let listen_addr = conf
130 .listen_addr
131 .expect("caller should make sure listen_addr is set");
132
133 let http_middleware_layer = if conf.enable_cookie_auth {
134 let cookie = Cookie::default();
135 cookie::write_to_disk(&cookie, &conf.cookie_dir)
136 .expect("Zebra must be able to write the auth cookie to the disk");
137 HttpRequestMiddlewareLayer::new(Some(cookie))
138 } else {
139 HttpRequestMiddlewareLayer::new(None)
140 };
141
142 let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
143
144 let rpc_middleware = RpcServiceBuilder::new()
145 .rpc_logger(1024)
146 .layer_fn(FixRpcResponseMiddleware::new);
147
148 let server = Server::builder()
149 .http_only()
150 .set_http_middleware(http_middleware)
151 .set_rpc_middleware(rpc_middleware)
152 .build(listen_addr)
153 .await?;
154
155 info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
156
157 Ok(tokio::spawn(async move {
158 server.start(rpc.into_rpc()).stopped().await;
159 Ok(())
160 }))
161 }
162
163 pub fn shutdown_blocking(&self) {
168 Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
169 }
170
171 pub fn shutdown(&self) -> JoinHandle<()> {
174 let close_handle = self.close_handle.clone();
175 let config = self.config.clone();
176 let span = Span::current();
177
178 tokio::task::spawn_blocking(move || {
179 span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
180 })
181 }
182
183 fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
187 let span = Span::current();
190 let wait_on_shutdown = move || {
191 span.in_scope(|| {
192 if config.enable_cookie_auth {
193 if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
194 warn!(
195 ?err,
196 "unexpectedly could not remove the rpc auth cookie from the disk"
197 )
198 }
199 }
200
201 info!("Stopping RPC server");
202 let _ = close_handle.stop();
203 debug!("Stopped RPC server");
204 })
205 };
206
207 let span = Span::current();
208 let thread_handle = std::thread::spawn(wait_on_shutdown);
209
210 span.in_scope(|| match thread_handle.join() {
212 Ok(()) => (),
213 Err(panic_object) => panic::resume_unwind(panic_object),
214 })
215 }
216}
217
218impl Drop for RpcServer {
219 fn drop(&mut self) {
220 self.shutdown_blocking();
225 }
226}