zebra_rpc/
server.rs

1//! A JSON-RPC 1.0 & 2.0 endpoint for Zebra.
2//!
3//! This endpoint is compatible with clients that incorrectly send
4//! `"jsonrpc" = 1.0` fields in JSON-RPC 1.0 requests,
5//! such as `lightwalletd`.
6//!
7//! See the full list of
8//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
9
10use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tower::Service;
16use tracing::*;
17
18use zebra_chain::{
19    block, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network,
20};
21use zebra_network::AddressBookPeers;
22use zebra_node_services::mempool;
23
24use crate::{
25    config,
26    methods::{RpcImpl, RpcServer as _},
27    server::{
28        http_request_compatibility::HttpRequestMiddlewareLayer,
29        rpc_call_compatibility::FixRpcResponseMiddleware,
30    },
31};
32
33pub mod cookie;
34pub mod error;
35pub mod http_request_compatibility;
36pub mod rpc_call_compatibility;
37
38#[cfg(test)]
39mod tests;
40
/// Zebra RPC Server
///
/// Stores the settings the server was started with, and the handle used to stop it.
#[derive(Clone)]
pub struct RpcServer {
    /// The RPC config.
    ///
    /// Read at shutdown to decide whether the auth cookie has to be removed from disk.
    config: config::rpc::Config,

    /// The configured network.
    network: Network,

    /// Zebra's application version, with build metadata.
    build_version: String,

    /// A server handle used to shut down the RPC server.
    close_handle: ServerHandle,
}
56
57impl fmt::Debug for RpcServer {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("RpcServer")
60            .field("config", &self.config)
61            .field("network", &self.network)
62            .field("build_version", &self.build_version)
63            .field(
64                "close_handle",
65                // TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
66                &"ServerHandle",
67            )
68            .finish()
69    }
70}
71
/// The message to log when logging the RPC server's listen address.
///
/// The server's local address is appended directly after this prefix.
pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";

/// The handle for the spawned RPC server task, as returned by [`RpcServer::start`].
type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
76
77impl RpcServer {
78    /// Starts the RPC server.
79    ///
80    /// `build_version` and `user_agent` are version strings for the application,
81    /// which are used in RPC responses.
82    ///
83    /// Returns [`JoinHandle`]s for the RPC server and `sendrawtransaction` queue tasks,
84    /// and a [`RpcServer`] handle, which can be used to shut down the RPC server task.
85    ///
86    /// # Panics
87    ///
88    /// - If [`Config::listen_addr`](config::rpc::Config::listen_addr) is `None`.
89    //
90    // TODO:
91    // - replace VersionString with semver::Version, and update the tests to provide valid versions
92    #[allow(clippy::too_many_arguments)]
93    pub async fn start<
94        Mempool,
95        State,
96        ReadState,
97        Tip,
98        BlockVerifierRouter,
99        SyncStatus,
100        AddressBook,
101    >(
102        rpc: RpcImpl<Mempool, State, ReadState, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
103        conf: config::rpc::Config,
104    ) -> Result<ServerTask, tower::BoxError>
105    where
106        Mempool: tower::Service<
107                mempool::Request,
108                Response = mempool::Response,
109                Error = zebra_node_services::BoxError,
110            > + Clone
111            + Send
112            + Sync
113            + 'static,
114        Mempool::Future: Send,
115        State: Service<
116                zebra_state::Request,
117                Response = zebra_state::Response,
118                Error = zebra_state::BoxError,
119            > + Clone
120            + Send
121            + Sync
122            + 'static,
123        State::Future: Send,
124        ReadState: Service<
125                zebra_state::ReadRequest,
126                Response = zebra_state::ReadResponse,
127                Error = zebra_state::BoxError,
128            > + Clone
129            + Send
130            + Sync
131            + 'static,
132        ReadState::Future: Send,
133        Tip: ChainTip + Clone + Send + Sync + 'static,
134        AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
135        BlockVerifierRouter: Service<
136                zebra_consensus::Request,
137                Response = block::Hash,
138                Error = zebra_consensus::BoxError,
139            > + Clone
140            + Send
141            + Sync
142            + 'static,
143        <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
144        SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
145    {
146        let listen_addr = conf
147            .listen_addr
148            .expect("caller should make sure listen_addr is set");
149
150        let http_middleware_layer = if conf.enable_cookie_auth {
151            let cookie = Cookie::default();
152            cookie::write_to_disk(&cookie, &conf.cookie_dir)
153                .expect("Zebra must be able to write the auth cookie to the disk");
154            HttpRequestMiddlewareLayer::new(Some(cookie))
155        } else {
156            HttpRequestMiddlewareLayer::new(None)
157        };
158
159        let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
160
161        let rpc_middleware = RpcServiceBuilder::new()
162            .rpc_logger(1024)
163            .layer_fn(FixRpcResponseMiddleware::new);
164
165        let server = Server::builder()
166            .http_only()
167            .set_http_middleware(http_middleware)
168            .set_rpc_middleware(rpc_middleware)
169            .build(listen_addr)
170            .await?;
171
172        info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
173
174        Ok(tokio::spawn(async move {
175            server.start(rpc.into_rpc()).stopped().await;
176            Ok(())
177        }))
178    }
179
180    /// Shut down this RPC server, blocking the current thread.
181    ///
182    /// This method can be called from within a tokio executor without panicking.
183    /// But it is blocking, so `shutdown()` should be used instead.
184    pub fn shutdown_blocking(&self) {
185        Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
186    }
187
188    /// Shut down this RPC server asynchronously.
189    /// Returns a task that completes when the server is shut down.
190    pub fn shutdown(&self) -> JoinHandle<()> {
191        let close_handle = self.close_handle.clone();
192        let config = self.config.clone();
193        let span = Span::current();
194
195        tokio::task::spawn_blocking(move || {
196            span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
197        })
198    }
199
200    /// Shuts down this RPC server using its `close_handle`.
201    ///
202    /// See `shutdown_blocking()` for details.
203    fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
204        // The server is a blocking task, so it can't run inside a tokio thread.
205        // See the note at wait_on_server.
206        let span = Span::current();
207        let wait_on_shutdown = move || {
208            span.in_scope(|| {
209                if config.enable_cookie_auth {
210                    if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
211                        warn!(
212                            ?err,
213                            "unexpectedly could not remove the rpc auth cookie from the disk"
214                        )
215                    }
216                }
217
218                info!("Stopping RPC server");
219                let _ = close_handle.stop();
220                debug!("Stopped RPC server");
221            })
222        };
223
224        let span = Span::current();
225        let thread_handle = std::thread::spawn(wait_on_shutdown);
226
227        // Propagate panics from the inner std::thread to the outer tokio blocking task
228        span.in_scope(|| match thread_handle.join() {
229            Ok(()) => (),
230            Err(panic_object) => panic::resume_unwind(panic_object),
231        })
232    }
233}
234
impl Drop for RpcServer {
    /// Shuts down the RPC server when this value is dropped, by calling `shutdown_blocking()`.
    fn drop(&mut self) {
        // Block on shutting down, propagating panics.
        // This can take around 150 seconds.
        //
        // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
        //
        // NOTE(review): `RpcServer` derives `Clone`, so dropping any clone runs this shutdown.
        // Presumably cloned `close_handle`s refer to the same server — confirm this is intended.
        self.shutdown_blocking();
    }
}
243}