zebra_rpc/indexer/
methods.rs
1use std::pin::Pin;
4
5use futures::Stream;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Response, Status};
8use tower::{util::ServiceExt, BoxError};
9
10use tracing::Span;
11use zebra_chain::chain_tip::ChainTip;
12use zebra_node_services::mempool::MempoolChangeKind;
13use zebra_state::{ReadRequest, ReadResponse};
14
15use super::{
16 indexer_server::Indexer, server::IndexerRPC, BlockAndHash, BlockHashAndHeight, Empty,
17 MempoolChangeMessage,
18};
19
20const RESPONSE_BUFFER_SIZE: usize = 4_000;
22
23#[tonic::async_trait]
24impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
25where
26 ReadStateService: tower::Service<
27 zebra_state::ReadRequest,
28 Response = zebra_state::ReadResponse,
29 Error = BoxError,
30 > + Clone
31 + Send
32 + Sync
33 + 'static,
34 <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
35 Tip: ChainTip + Clone + Send + Sync + 'static,
36{
37 type ChainTipChangeStream =
38 Pin<Box<dyn Stream<Item = Result<BlockHashAndHeight, Status>> + Send>>;
39 type NonFinalizedStateChangeStream =
40 Pin<Box<dyn Stream<Item = Result<BlockAndHash, Status>> + Send>>;
41 type MempoolChangeStream =
42 Pin<Box<dyn Stream<Item = Result<MempoolChangeMessage, Status>> + Send>>;
43
44 async fn chain_tip_change(
45 &self,
46 _: tonic::Request<Empty>,
47 ) -> Result<Response<Self::ChainTipChangeStream>, Status> {
48 let span = Span::current();
49 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
50 let response_stream = ReceiverStream::new(response_receiver);
51 let mut chain_tip_change = self.chain_tip_change.clone();
52
53 tokio::spawn(async move {
54 while let Ok(()) = chain_tip_change.best_tip_changed().await {
56 let Some((tip_height, tip_hash)) = chain_tip_change.best_tip_height_and_hash()
57 else {
58 continue;
59 };
60
61 if let Err(error) = response_sender
62 .send(Ok(BlockHashAndHeight::new(tip_hash, tip_height)))
63 .await
64 {
65 span.in_scope(|| {
66 tracing::info!(?error, "failed to send chain tip change, dropping task");
67 });
68 return;
69 }
70 }
71
72 span.in_scope(|| {
73 tracing::warn!("chain_tip_change channel has closed");
74 });
75
76 let _ = response_sender
77 .send(Err(Status::unavailable(
78 "chain_tip_change channel has closed",
79 )))
80 .await;
81 });
82
83 Ok(Response::new(Box::pin(response_stream)))
84 }
85
86 async fn non_finalized_state_change(
87 &self,
88 _: tonic::Request<Empty>,
89 ) -> Result<Response<Self::NonFinalizedStateChangeStream>, Status> {
90 let span = Span::current();
91 let read_state = self.read_state.clone();
92 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
93 let response_stream = ReceiverStream::new(response_receiver);
94
95 tokio::spawn(async move {
96 let mut non_finalized_state_change = match read_state
97 .oneshot(ReadRequest::NonFinalizedBlocksListener)
98 .await
99 {
100 Ok(ReadResponse::NonFinalizedBlocksListener(listener)) => listener.unwrap(),
101 Ok(_) => unreachable!("unexpected response type from ReadStateService"),
102 Err(error) => {
103 span.in_scope(|| {
104 tracing::error!(
105 ?error,
106 "failed to subscribe to non-finalized state changes"
107 );
108 });
109
110 let _ = response_sender
111 .send(Err(Status::unavailable(
112 "failed to subscribe to non-finalized state changes",
113 )))
114 .await;
115 return;
116 }
117 };
118
119 while let Some((hash, block)) = non_finalized_state_change.recv().await {
121 if let Err(error) = response_sender
122 .send(Ok(BlockAndHash::new(hash, block)))
123 .await
124 {
125 span.in_scope(|| {
126 tracing::info!(
127 ?error,
128 "failed to send non-finalized state change, dropping task"
129 );
130 });
131 return;
132 }
133 }
134
135 span.in_scope(|| {
136 tracing::warn!("non-finalized state change channel has closed");
137 });
138
139 let _ = response_sender
140 .send(Err(Status::unavailable(
141 "non-finalized state change channel has closed",
142 )))
143 .await;
144 });
145
146 Ok(Response::new(Box::pin(response_stream)))
147 }
148
149 async fn mempool_change(
150 &self,
151 _: tonic::Request<Empty>,
152 ) -> Result<Response<Self::MempoolChangeStream>, Status> {
153 let span = Span::current();
154 let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
155 let response_stream = ReceiverStream::new(response_receiver);
156 let mut mempool_change = self.mempool_change.subscribe();
157
158 tokio::spawn(async move {
159 while let Ok(change) = mempool_change.recv().await {
161 for tx_id in change.tx_ids() {
162 span.in_scope(|| {
163 tracing::debug!("mempool change: {:?}", change);
164 });
165
166 if let Err(error) = response_sender
167 .send(Ok(MempoolChangeMessage {
168 change_type: match change.kind() {
169 MempoolChangeKind::Added => 0,
170 MempoolChangeKind::Invalidated => 1,
171 MempoolChangeKind::Mined => 2,
172 },
173 tx_hash: tx_id.mined_id().bytes_in_display_order().to_vec(),
174 auth_digest: tx_id
175 .auth_digest()
176 .map(|d| d.bytes_in_display_order().to_vec())
177 .unwrap_or_default(),
178 }))
179 .await
180 {
181 span.in_scope(|| {
182 tracing::info!(?error, "failed to send mempool change, dropping task");
183 });
184 return;
185 }
186 }
187 }
188
189 span.in_scope(|| {
190 tracing::warn!("mempool_change channel has closed");
191 });
192
193 let _ = response_sender
194 .send(Err(Status::unavailable(
195 "mempool_change channel has closed",
196 )))
197 .await;
198 });
199
200 Ok(Response::new(Box::pin(response_stream)))
201 }
202}