zebra_rpc/
server.rs
1use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tower::Service;
16use tracing::*;
17
18use zebra_chain::{
19 block, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network,
20};
21use zebra_network::AddressBookPeers;
22use zebra_node_services::mempool;
23
24use crate::{
25 config,
26 methods::{RpcImpl, RpcServer as _},
27 server::{
28 http_request_compatibility::HttpRequestMiddlewareLayer,
29 rpc_call_compatibility::FixRpcResponseMiddleware,
30 },
31};
32
33pub mod cookie;
34pub mod error;
35pub mod http_request_compatibility;
36pub mod rpc_call_compatibility;
37
38#[cfg(test)]
39mod tests;
40
41#[derive(Clone)]
43pub struct RpcServer {
44 config: config::rpc::Config,
46
47 network: Network,
49
50 build_version: String,
52
53 close_handle: ServerHandle,
55}
56
57impl fmt::Debug for RpcServer {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.debug_struct("RpcServer")
60 .field("config", &self.config)
61 .field("network", &self.network)
62 .field("build_version", &self.build_version)
63 .field(
64 "close_handle",
65 &"ServerHandle",
67 )
68 .finish()
69 }
70}
71
/// The prefix of the message logged when the RPC endpoint has been opened.
///
/// `RpcServer::start` logs this string immediately followed by the server's local address,
/// so the trailing space is significant.
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
74
75type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
76
77impl RpcServer {
78 #[allow(clippy::too_many_arguments)]
93 pub async fn start<
94 Mempool,
95 State,
96 ReadState,
97 Tip,
98 BlockVerifierRouter,
99 SyncStatus,
100 AddressBook,
101 >(
102 rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
103 conf: config::rpc::Config,
104 ) -> Result<ServerTask, tower::BoxError>
105 where
106 Mempool: tower::Service<
107 mempool::Request,
108 Response = mempool::Response,
109 Error = zebra_node_services::BoxError,
110 > + Clone
111 + Send
112 + Sync
113 + 'static,
114 Mempool::Future: Send,
115 State: Service<
116 zebra_state::Request,
117 Response = zebra_state::Response,
118 Error = zebra_state::BoxError,
119 > + Clone
120 + Send
121 + Sync
122 + 'static,
123 State::Future: Send,
124 ReadState: Service<
125 zebra_state::ReadRequest,
126 Response = zebra_state::ReadResponse,
127 Error = zebra_state::BoxError,
128 > + Clone
129 + Send
130 + Sync
131 + 'static,
132 ReadState::Future: Send,
133 Tip: ChainTip + Clone + Send + Sync + 'static,
134 AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
135 BlockVerifierRouter: Service<
136 zebra_consensus::Request,
137 Response = block::Hash,
138 Error = zebra_consensus::BoxError,
139 > + Clone
140 + Send
141 + Sync
142 + 'static,
143 <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
144 SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
145 {
146 let listen_addr = conf
147 .listen_addr
148 .expect("caller should make sure listen_addr is set");
149
150 let http_middleware_layer = if conf.enable_cookie_auth {
151 let cookie = Cookie::default();
152 cookie::write_to_disk(&cookie, &conf.cookie_dir)
153 .expect("Zebra must be able to write the auth cookie to the disk");
154 HttpRequestMiddlewareLayer::new(Some(cookie))
155 } else {
156 HttpRequestMiddlewareLayer::new(None)
157 };
158
159 let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
160
161 let rpc_middleware = RpcServiceBuilder::new()
162 .rpc_logger(1024)
163 .layer_fn(FixRpcResponseMiddleware::new);
164
165 let server = Server::builder()
166 .http_only()
167 .set_http_middleware(http_middleware)
168 .set_rpc_middleware(rpc_middleware)
169 .build(listen_addr)
170 .await?;
171
172 info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
173
174 Ok(tokio::spawn(async move {
175 server.start(rpc.into_rpc()).stopped().await;
176 Ok(())
177 }))
178 }
179
180 pub fn shutdown_blocking(&self) {
185 Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
186 }
187
188 pub fn shutdown(&self) -> JoinHandle<()> {
191 let close_handle = self.close_handle.clone();
192 let config = self.config.clone();
193 let span = Span::current();
194
195 tokio::task::spawn_blocking(move || {
196 span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
197 })
198 }
199
200 fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
204 let span = Span::current();
207 let wait_on_shutdown = move || {
208 span.in_scope(|| {
209 if config.enable_cookie_auth {
210 if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
211 warn!(
212 ?err,
213 "unexpectedly could not remove the rpc auth cookie from the disk"
214 )
215 }
216 }
217
218 info!("Stopping RPC server");
219 let _ = close_handle.stop();
220 debug!("Stopped RPC server");
221 })
222 };
223
224 let span = Span::current();
225 let thread_handle = std::thread::spawn(wait_on_shutdown);
226
227 span.in_scope(|| match thread_handle.join() {
229 Ok(()) => (),
230 Err(panic_object) => panic::resume_unwind(panic_object),
231 })
232 }
233}
234
235impl Drop for RpcServer {
236 fn drop(&mut self) {
237 self.shutdown_blocking();
242 }
243}