zebra_rpc/indexer/
methods.rs

1//! Implements `Indexer` methods on the `IndexerRPC` type
2
3use std::pin::Pin;
4
5use futures::Stream;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::{Response, Status};
8use tower::{util::ServiceExt, BoxError};
9
10use tracing::Span;
11use zebra_chain::chain_tip::ChainTip;
12use zebra_node_services::mempool::MempoolChangeKind;
13use zebra_state::{ReadRequest, ReadResponse};
14
15use super::{
16    indexer_server::Indexer, server::IndexerRPC, BlockAndHash, BlockHashAndHeight, Empty,
17    MempoolChangeMessage,
18};
19
20/// The maximum number of messages that can be queued to be streamed to a client
21const RESPONSE_BUFFER_SIZE: usize = 4_000;
22
23#[tonic::async_trait]
24impl<ReadStateService, Tip> Indexer for IndexerRPC<ReadStateService, Tip>
25where
26    ReadStateService: tower::Service<
27            zebra_state::ReadRequest,
28            Response = zebra_state::ReadResponse,
29            Error = BoxError,
30        > + Clone
31        + Send
32        + Sync
33        + 'static,
34    <ReadStateService as tower::Service<zebra_state::ReadRequest>>::Future: Send,
35    Tip: ChainTip + Clone + Send + Sync + 'static,
36{
37    type ChainTipChangeStream =
38        Pin<Box<dyn Stream<Item = Result<BlockHashAndHeight, Status>> + Send>>;
39    type NonFinalizedStateChangeStream =
40        Pin<Box<dyn Stream<Item = Result<BlockAndHash, Status>> + Send>>;
41    type MempoolChangeStream =
42        Pin<Box<dyn Stream<Item = Result<MempoolChangeMessage, Status>> + Send>>;
43
44    async fn chain_tip_change(
45        &self,
46        _: tonic::Request<Empty>,
47    ) -> Result<Response<Self::ChainTipChangeStream>, Status> {
48        let span = Span::current();
49        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
50        let response_stream = ReceiverStream::new(response_receiver);
51        let mut chain_tip_change = self.chain_tip_change.clone();
52
53        tokio::spawn(async move {
54            // Notify the client of chain tip changes until the channel is closed
55            while let Ok(()) = chain_tip_change.best_tip_changed().await {
56                let Some((tip_height, tip_hash)) = chain_tip_change.best_tip_height_and_hash()
57                else {
58                    continue;
59                };
60
61                if let Err(error) = response_sender
62                    .send(Ok(BlockHashAndHeight::new(tip_hash, tip_height)))
63                    .await
64                {
65                    span.in_scope(|| {
66                        tracing::info!(?error, "failed to send chain tip change, dropping task");
67                    });
68                    return;
69                }
70            }
71
72            span.in_scope(|| {
73                tracing::warn!("chain_tip_change channel has closed");
74            });
75
76            let _ = response_sender
77                .send(Err(Status::unavailable(
78                    "chain_tip_change channel has closed",
79                )))
80                .await;
81        });
82
83        Ok(Response::new(Box::pin(response_stream)))
84    }
85
86    async fn non_finalized_state_change(
87        &self,
88        _: tonic::Request<Empty>,
89    ) -> Result<Response<Self::NonFinalizedStateChangeStream>, Status> {
90        let span = Span::current();
91        let read_state = self.read_state.clone();
92        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
93        let response_stream = ReceiverStream::new(response_receiver);
94
95        tokio::spawn(async move {
96            let mut non_finalized_state_change = match read_state
97                .oneshot(ReadRequest::NonFinalizedBlocksListener)
98                .await
99            {
100                Ok(ReadResponse::NonFinalizedBlocksListener(listener)) => listener.unwrap(),
101                Ok(_) => unreachable!("unexpected response type from ReadStateService"),
102                Err(error) => {
103                    span.in_scope(|| {
104                        tracing::error!(
105                            ?error,
106                            "failed to subscribe to non-finalized state changes"
107                        );
108                    });
109
110                    let _ = response_sender
111                        .send(Err(Status::unavailable(
112                            "failed to subscribe to non-finalized state changes",
113                        )))
114                        .await;
115                    return;
116                }
117            };
118
119            // Notify the client of chain tip changes until the channel is closed
120            while let Some((hash, block)) = non_finalized_state_change.recv().await {
121                if let Err(error) = response_sender
122                    .send(Ok(BlockAndHash::new(hash, block)))
123                    .await
124                {
125                    span.in_scope(|| {
126                        tracing::info!(
127                            ?error,
128                            "failed to send non-finalized state change, dropping task"
129                        );
130                    });
131                    return;
132                }
133            }
134
135            span.in_scope(|| {
136                tracing::warn!("non-finalized state change channel has closed");
137            });
138
139            let _ = response_sender
140                .send(Err(Status::unavailable(
141                    "non-finalized state change channel has closed",
142                )))
143                .await;
144        });
145
146        Ok(Response::new(Box::pin(response_stream)))
147    }
148
149    async fn mempool_change(
150        &self,
151        _: tonic::Request<Empty>,
152    ) -> Result<Response<Self::MempoolChangeStream>, Status> {
153        let span = Span::current();
154        let (response_sender, response_receiver) = tokio::sync::mpsc::channel(RESPONSE_BUFFER_SIZE);
155        let response_stream = ReceiverStream::new(response_receiver);
156        let mut mempool_change = self.mempool_change.subscribe();
157
158        tokio::spawn(async move {
159            // Notify the client of chain tip changes until the channel is closed
160            while let Ok(change) = mempool_change.recv().await {
161                for tx_id in change.tx_ids() {
162                    span.in_scope(|| {
163                        tracing::debug!("mempool change: {:?}", change);
164                    });
165
166                    if let Err(error) = response_sender
167                        .send(Ok(MempoolChangeMessage {
168                            change_type: match change.kind() {
169                                MempoolChangeKind::Added => 0,
170                                MempoolChangeKind::Invalidated => 1,
171                                MempoolChangeKind::Mined => 2,
172                            },
173                            tx_hash: tx_id.mined_id().bytes_in_display_order().to_vec(),
174                            auth_digest: tx_id
175                                .auth_digest()
176                                .map(|d| d.bytes_in_display_order().to_vec())
177                                .unwrap_or_default(),
178                        }))
179                        .await
180                    {
181                        span.in_scope(|| {
182                            tracing::info!(?error, "failed to send mempool change, dropping task");
183                        });
184                        return;
185                    }
186                }
187            }
188
189            span.in_scope(|| {
190                tracing::warn!("mempool_change channel has closed");
191            });
192
193            let _ = response_sender
194                .send(Err(Status::unavailable(
195                    "mempool_change channel has closed",
196                )))
197                .await;
198        });
199
200        Ok(Response::new(Box::pin(response_stream)))
201    }
202}