zebra_state/service/watch_receiver.rs
1//! Shared [`tokio::sync::watch`] channel wrappers.
2//!
3//! These wrappers help use watch channels correctly.
4
5use tokio::sync::watch;
6
7/// Efficient access to state data via a [`tokio`] [`watch::Receiver`] channel,
8/// while avoiding deadlocks.
9///
10/// Returns data from the most recent state,
11/// regardless of how many times you call its methods.
12///
13/// Cloned instances provide identical state data.
14///
15/// # Correctness
16///
17/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
18///
19/// # Note
20///
21/// If a lot of blocks are committed at the same time,
22/// the watch channel will skip some block updates,
23/// even though those updates were committed to the state.
24#[derive(Clone, Debug)]
25pub struct WatchReceiver<T> {
26 /// The receiver for the current state data.
27 receiver: watch::Receiver<T>,
28}
29
30impl<T> WatchReceiver<T> {
31 /// Create a new [`WatchReceiver`] from a watch channel receiver.
32 pub fn new(receiver: watch::Receiver<T>) -> Self {
33 Self { receiver }
34 }
35}
36
37impl<T> WatchReceiver<T>
38where
39 T: Clone,
40{
41 /// Maps the current data `T` to `U` by applying a function to the watched value,
42 /// while holding the receiver lock as briefly as possible.
43 ///
44 /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
45 /// extract some information from it.
46 ///
47 /// Does not mark the watched data as seen.
48 ///
49 /// # Performance
50 ///
51 /// A single read lock is acquired to clone `T`, and then released after the
52 /// clone. To make this clone efficient, large or expensive `T` can be
53 /// wrapped in an [`std::sync::Arc`]. (Or individual fields can be wrapped
54 /// in an [`std::sync::Arc`].)
55 ///
56 /// # Correctness
57 ///
58 /// To prevent deadlocks:
59 ///
60 /// - `receiver.borrow()` should not be called before this method while in the same scope.
61 ///
62 /// It is important to avoid calling `borrow` more than once in the same scope, which
63 /// effectively tries to acquire two read locks to the shared data in the watch channel. If
64 /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
65 /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
66 /// the update is finished.
67 ///
68 /// What can happen in that scenario is:
69 ///
70 /// 1. The receiver manages to acquire a read-lock for the first `borrow`
71 /// 2. The sender starts acquiring the write-lock
72 /// 3. The receiver fails to acquire a read-lock for the second `borrow`
73 ///
74 /// Now both the sender and the receivers hang, because the sender won't release the lock until
75 /// it can update the value, and the receiver won't release its first read-lock until it
76 /// acquires the second read-lock and finishes what it's doing.
77 pub fn with_watch_data<U, F>(&self, f: F) -> U
78 where
79 F: FnOnce(T) -> U,
80 {
81 // Make sure that the borrow's watch channel read lock
82 // is dropped before the closure is executed.
83 //
84 // Without this change, an eager reader can repeatedly block the channel writer.
85 // This seems to happen easily in RPC & ReadStateService futures.
86 // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
87 let cloned_data = self.cloned_watch_data();
88
89 f(cloned_data)
90 }
91
92 /// Returns a clone of the watch data in the channel.
93 /// Cloning the watched data helps avoid deadlocks.
94 ///
95 /// Does not mark the watched data as seen.
96 ///
97 /// See `with_watch_data()` for details.
98 pub fn cloned_watch_data(&self) -> T {
99 self.receiver.borrow().clone()
100 }
101
102 /// Calls [`watch::Receiver::changed()`] and returns the result.
103 /// Returns when the inner value has been updated, even if the old and new values are equal.
104 ///
105 /// Marks the watched data as seen.
106 pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
107 self.receiver.changed().await
108 }
109
110 /// Calls [`watch::Receiver::has_changed()`] and returns the result.
111 /// Returns `true` when the inner value has been updated, even if the old and new values are equal.
112 ///
113 /// Does not mark the watched data as seen.
114 pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
115 self.receiver.has_changed()
116 }
117
118 /// Marks the watched data as seen.
119 pub fn mark_as_seen(&mut self) {
120 self.receiver.borrow_and_update();
121 }
122
123 /// Marks the watched data as unseen.
124 /// Calls [`watch::Receiver::mark_changed()`].
125 pub fn mark_changed(&mut self) {
126 self.receiver.mark_changed();
127 }
128}