zebra_state/service/
watch_receiver.rs

1//! Shared [`tokio::sync::watch`] channel wrappers.
2//!
3//! These wrappers help use watch channels correctly.
4
5use tokio::sync::watch;
6
7/// Efficient access to state data via a [`tokio`] [`watch::Receiver`] channel,
8/// while avoiding deadlocks.
9///
10/// Returns data from the most recent state,
11/// regardless of how many times you call its methods.
12///
13/// Cloned instances provide identical state data.
14///
15/// # Correctness
16///
17/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
18///
19/// # Note
20///
21/// If a lot of blocks are committed at the same time,
22/// the watch channel will skip some block updates,
23/// even though those updates were committed to the state.
24#[derive(Clone, Debug)]
25pub struct WatchReceiver<T> {
26    /// The receiver for the current state data.
27    receiver: watch::Receiver<T>,
28}
29
30impl<T> WatchReceiver<T> {
31    /// Create a new [`WatchReceiver`] from a watch channel receiver.
32    pub fn new(receiver: watch::Receiver<T>) -> Self {
33        Self { receiver }
34    }
35}
36
37impl<T> WatchReceiver<T>
38where
39    T: Clone,
40{
41    /// Maps the current data `T` to `U` by applying a function to the watched value,
42    /// while holding the receiver lock as briefly as possible.
43    ///
44    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
45    /// extract some information from it.
46    ///
47    /// Does not mark the watched data as seen.
48    ///
49    /// # Performance
50    ///
51    /// A single read lock is acquired to clone `T`, and then released after the
52    /// clone. To make this clone efficient, large or expensive `T` can be
53    /// wrapped in an [`std::sync::Arc`]. (Or individual fields can be wrapped
54    /// in an [`std::sync::Arc`].)
55    ///
56    /// # Correctness
57    ///
58    /// To prevent deadlocks:
59    ///
60    /// - `receiver.borrow()` should not be called before this method while in the same scope.
61    ///
62    /// It is important to avoid calling `borrow` more than once in the same scope, which
63    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
64    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
65    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
66    /// the update is finished.
67    ///
68    /// What can happen in that scenario is:
69    ///
70    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
71    /// 2. The sender starts acquiring the write-lock
72    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
73    ///
74    /// Now both the sender and the receivers hang, because the sender won't release the lock until
75    /// it can update the value, and the receiver won't release its first read-lock until it
76    /// acquires the second read-lock and finishes what it's doing.
77    pub fn with_watch_data<U, F>(&self, f: F) -> U
78    where
79        F: FnOnce(T) -> U,
80    {
81        // Make sure that the borrow's watch channel read lock
82        // is dropped before the closure is executed.
83        //
84        // Without this change, an eager reader can repeatedly block the channel writer.
85        // This seems to happen easily in RPC & ReadStateService futures.
86        // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
87        let cloned_data = self.cloned_watch_data();
88
89        f(cloned_data)
90    }
91
92    /// Returns a clone of the watch data in the channel.
93    /// Cloning the watched data helps avoid deadlocks.
94    ///
95    /// Does not mark the watched data as seen.
96    ///
97    /// See `with_watch_data()` for details.
98    pub fn cloned_watch_data(&self) -> T {
99        self.receiver.borrow().clone()
100    }
101
102    /// Calls [`watch::Receiver::changed()`] and returns the result.
103    /// Returns when the inner value has been updated, even if the old and new values are equal.
104    ///
105    /// Marks the watched data as seen.
106    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
107        self.receiver.changed().await
108    }
109
110    /// Calls [`watch::Receiver::has_changed()`] and returns the result.
111    /// Returns `true` when the inner value has been updated, even if the old and new values are equal.
112    ///
113    /// Does not mark the watched data as seen.
114    pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
115        self.receiver.has_changed()
116    }
117
118    /// Marks the watched data as seen.
119    pub fn mark_as_seen(&mut self) {
120        self.receiver.borrow_and_update();
121    }
122
123    /// Marks the watched data as unseen.
124    /// Calls [`watch::Receiver::mark_changed()`].
125    pub fn mark_changed(&mut self) {
126        self.receiver.mark_changed();
127    }
128}