zebra_rpc/
server.rs

1//! A JSON-RPC 1.0 & 2.0 endpoint for Zebra.
2//!
3//! This endpoint is compatible with clients that incorrectly send
4//! `"jsonrpc" = 1.0` fields in JSON-RPC 1.0 requests,
5//! such as `lightwalletd`.
6//!
7//! See the full list of
8//! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0)
9
10use std::{fmt, panic};
11
12use cookie::Cookie;
13use jsonrpsee::server::{middleware::rpc::RpcServiceBuilder, Server, ServerHandle};
14use tokio::task::JoinHandle;
15use tower::Service;
16use tracing::*;
17
18use zebra_chain::{
19    block, chain_sync_status::ChainSyncStatus, chain_tip::ChainTip, parameters::Network,
20};
21use zebra_network::AddressBookPeers;
22use zebra_node_services::mempool;
23
24use crate::{
25    config,
26    methods::{RpcImpl, RpcServer as _},
27    server::{
28        http_request_compatibility::HttpRequestMiddlewareLayer,
29        rpc_call_compatibility::FixRpcResponseMiddleware,
30    },
31};
32
33pub mod cookie;
34pub mod error;
35pub mod http_request_compatibility;
36pub mod rpc_call_compatibility;
37
38#[cfg(test)]
39mod tests;
40
41/// Zebra RPC Server
42#[derive(Clone)]
43pub struct RpcServer {
44    /// The RPC config.
45    config: config::rpc::Config,
46
47    /// The configured network.
48    network: Network,
49
50    /// Zebra's application version, with build metadata.
51    build_version: String,
52
53    /// A server handle used to shuts down the RPC server.
54    close_handle: ServerHandle,
55}
56
57impl fmt::Debug for RpcServer {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_struct("RpcServer")
60            .field("config", &self.config)
61            .field("network", &self.network)
62            .field("build_version", &self.build_version)
63            .field(
64                "close_handle",
65                // TODO: when it stabilises, use std::any::type_name_of_val(&self.close_handle)
66                &"ServerHandle",
67            )
68            .finish()
69    }
70}
71
72/// The message to log when logging the RPC server's listen address
73pub const OPENED_RPC_ENDPOINT_MSG: &str = "Opened RPC endpoint at ";
74
75type ServerTask = JoinHandle<Result<(), tower::BoxError>>;
76
77impl RpcServer {
78    /// Starts the RPC server.
79    ///
80    /// `build_version` and `user_agent` are version strings for the application,
81    /// which are used in RPC responses.
82    ///
83    /// Returns [`JoinHandle`]s for the RPC server and `sendrawtransaction` queue tasks,
84    /// and a [`RpcServer`] handle, which can be used to shut down the RPC server task.
85    ///
86    /// # Panics
87    ///
88    /// - If [`Config::listen_addr`](config::rpc::Config::listen_addr) is `None`.
89    //
90    // TODO:
91    // - replace VersionString with semver::Version, and update the tests to provide valid versions
92    #[allow(clippy::too_many_arguments)]
93    pub async fn start<Mempool, State, Tip, BlockVerifierRouter, SyncStatus, AddressBook>(
94        rpc: RpcImpl<Mempool, State, Tip, AddressBook, BlockVerifierRouter, SyncStatus>,
95        conf: config::rpc::Config,
96    ) -> Result<ServerTask, tower::BoxError>
97    where
98        Mempool: tower::Service<
99                mempool::Request,
100                Response = mempool::Response,
101                Error = zebra_node_services::BoxError,
102            > + Clone
103            + Send
104            + Sync
105            + 'static,
106        Mempool::Future: Send,
107        State: Service<
108                zebra_state::ReadRequest,
109                Response = zebra_state::ReadResponse,
110                Error = zebra_state::BoxError,
111            > + Clone
112            + Send
113            + Sync
114            + 'static,
115        State::Future: Send,
116        Tip: ChainTip + Clone + Send + Sync + 'static,
117        AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
118        BlockVerifierRouter: Service<
119                zebra_consensus::Request,
120                Response = block::Hash,
121                Error = zebra_consensus::BoxError,
122            > + Clone
123            + Send
124            + Sync
125            + 'static,
126        <BlockVerifierRouter as Service<zebra_consensus::Request>>::Future: Send,
127        SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
128    {
129        let listen_addr = conf
130            .listen_addr
131            .expect("caller should make sure listen_addr is set");
132
133        let http_middleware_layer = if conf.enable_cookie_auth {
134            let cookie = Cookie::default();
135            cookie::write_to_disk(&cookie, &conf.cookie_dir)
136                .expect("Zebra must be able to write the auth cookie to the disk");
137            HttpRequestMiddlewareLayer::new(Some(cookie))
138        } else {
139            HttpRequestMiddlewareLayer::new(None)
140        };
141
142        let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);
143
144        let rpc_middleware = RpcServiceBuilder::new()
145            .rpc_logger(1024)
146            .layer_fn(FixRpcResponseMiddleware::new);
147
148        let server = Server::builder()
149            .http_only()
150            .set_http_middleware(http_middleware)
151            .set_rpc_middleware(rpc_middleware)
152            .build(listen_addr)
153            .await?;
154
155        info!("{OPENED_RPC_ENDPOINT_MSG}{}", server.local_addr()?);
156
157        Ok(tokio::spawn(async move {
158            server.start(rpc.into_rpc()).stopped().await;
159            Ok(())
160        }))
161    }
162
163    /// Shut down this RPC server, blocking the current thread.
164    ///
165    /// This method can be called from within a tokio executor without panicking.
166    /// But it is blocking, so `shutdown()` should be used instead.
167    pub fn shutdown_blocking(&self) {
168        Self::shutdown_blocking_inner(self.close_handle.clone(), self.config.clone())
169    }
170
171    /// Shut down this RPC server asynchronously.
172    /// Returns a task that completes when the server is shut down.
173    pub fn shutdown(&self) -> JoinHandle<()> {
174        let close_handle = self.close_handle.clone();
175        let config = self.config.clone();
176        let span = Span::current();
177
178        tokio::task::spawn_blocking(move || {
179            span.in_scope(|| Self::shutdown_blocking_inner(close_handle, config))
180        })
181    }
182
183    /// Shuts down this RPC server using its `close_handle`.
184    ///
185    /// See `shutdown_blocking()` for details.
186    fn shutdown_blocking_inner(close_handle: ServerHandle, config: config::rpc::Config) {
187        // The server is a blocking task, so it can't run inside a tokio thread.
188        // See the note at wait_on_server.
189        let span = Span::current();
190        let wait_on_shutdown = move || {
191            span.in_scope(|| {
192                if config.enable_cookie_auth {
193                    if let Err(err) = cookie::remove_from_disk(&config.cookie_dir) {
194                        warn!(
195                            ?err,
196                            "unexpectedly could not remove the rpc auth cookie from the disk"
197                        )
198                    }
199                }
200
201                info!("Stopping RPC server");
202                let _ = close_handle.stop();
203                debug!("Stopped RPC server");
204            })
205        };
206
207        let span = Span::current();
208        let thread_handle = std::thread::spawn(wait_on_shutdown);
209
210        // Propagate panics from the inner std::thread to the outer tokio blocking task
211        span.in_scope(|| match thread_handle.join() {
212            Ok(()) => (),
213            Err(panic_object) => panic::resume_unwind(panic_object),
214        })
215    }
216}
217
218impl Drop for RpcServer {
219    fn drop(&mut self) {
220        // Block on shutting down, propagating panics.
221        // This can take around 150 seconds.
222        //
223        // Without this shutdown, Zebra's RPC unit tests sometimes crashed with memory errors.
224        self.shutdown_blocking();
225    }
226}