// zebra_state/service/watch_receiver.rs
//! Shared [`tokio::sync::watch`] channel wrappers.
//!
//! These wrappers help use watch channels correctly.
use tokio::sync::watch;
/// A deadlock-avoiding wrapper around a [`tokio`] [`watch::Receiver`],
/// giving cheap access to the latest state data.
///
/// Every method call observes the most recent state,
/// no matter how often the methods are called.
///
/// Clones of a [`WatchReceiver`] provide identical state data.
///
/// # Correctness
///
/// See the correctness note on [`WatchReceiver::with_watch_data`]
/// for how to avoid deadlocks.
///
/// # Note
///
/// When many blocks are committed at the same time,
/// the watch channel skips some block updates,
/// even though those updates were committed to the state.
#[derive(Clone, Debug)]
pub struct WatchReceiver<T> {
    /// The wrapped watch channel receiver, which holds the current state data.
    receiver: watch::Receiver<T>,
}
impl<T> WatchReceiver<T> {
/// Create a new [`WatchReceiver`] from a watch channel receiver.
pub fn new(receiver: watch::Receiver<T>) -> Self {
Self { receiver }
}
}
impl<T> WatchReceiver<T>
where
    T: Clone,
{
    /// Maps the current data `T` to `U` by applying a function to the watched value,
    /// while holding the receiver lock as briefly as possible.
    ///
    /// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
    /// extract some information from it.
    ///
    /// Does not mark the watched data as seen.
    ///
    /// # Performance
    ///
    /// A single read lock is acquired to clone `T`, and then released after the
    /// clone. To make this clone efficient, large or expensive `T` can be
    /// wrapped in an [`std::sync::Arc`]. (Or individual fields can be wrapped
    /// in an [`std::sync::Arc`].)
    ///
    /// # Correctness
    ///
    /// To prevent deadlocks:
    ///
    /// - `receiver.borrow()` should not be called before this method while in the same scope.
    ///
    /// It is important to avoid calling `borrow` more than once in the same scope, which
    /// effectively tries to acquire two read locks to the shared data in the watch channel. If
    /// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
    /// starts acquiring a write-lock, and prevents further read-locks from being acquired until
    /// the update is finished.
    ///
    /// What can happen in that scenario is:
    ///
    /// 1. The receiver manages to acquire a read-lock for the first `borrow`
    /// 2. The sender starts acquiring the write-lock
    /// 3. The receiver fails to acquire a read-lock for the second `borrow`
    ///
    /// Now both the sender and the receivers hang, because the sender won't release the lock until
    /// it can update the value, and the receiver won't release its first read-lock until it
    /// acquires the second read-lock and finishes what it's doing.
    pub fn with_watch_data<U, F>(&self, f: F) -> U
    where
        F: FnOnce(T) -> U,
    {
        // Make sure that the borrow's watch channel read lock
        // is dropped before the closure is executed.
        //
        // Otherwise, an eager reader can repeatedly block the channel writer.
        // This seems to happen easily in RPC & ReadStateService futures.
        // (For example, when lightwalletd syncs from Zebra, while Zebra syncs from peers.)
        let cloned_data = self.cloned_watch_data();

        f(cloned_data)
    }

    /// Returns a clone of the watch data in the channel.
    /// Cloning the watched data helps avoid deadlocks.
    ///
    /// Does not mark the watched data as seen.
    ///
    /// See [`WatchReceiver::with_watch_data`] for details.
    pub fn cloned_watch_data(&self) -> T {
        self.receiver.borrow().clone()
    }
}

// The methods below only track or update the "seen" status of the channel.
// They never clone or read the watched value, so they are available for
// every `T`, not just `T: Clone`.
impl<T> WatchReceiver<T> {
    /// Calls [`watch::Receiver::changed()`] and returns the result.
    /// Returns when the inner value has been updated, even if the old and new values are equal.
    ///
    /// Marks the watched data as seen.
    ///
    /// # Errors
    ///
    /// Returns the [`watch::error::RecvError`] produced by the underlying receiver,
    /// unchanged.
    pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> {
        self.receiver.changed().await
    }

    /// Calls [`watch::Receiver::has_changed()`] and returns the result.
    /// Returns `true` when the inner value has been updated, even if the old and new values are equal.
    ///
    /// Does not mark the watched data as seen.
    ///
    /// # Errors
    ///
    /// Returns the [`watch::error::RecvError`] produced by the underlying receiver,
    /// unchanged.
    pub fn has_changed(&self) -> Result<bool, watch::error::RecvError> {
        self.receiver.has_changed()
    }

    /// Marks the watched data as seen.
    pub fn mark_as_seen(&mut self) {
        // The value returned by `borrow_and_update()` is a temporary that is
        // dropped at the end of this statement, so it is not held any longer.
        self.receiver.borrow_and_update();
    }

    /// Marks the watched data as unseen.
    /// Calls [`watch::Receiver::mark_changed()`].
    pub fn mark_changed(&mut self) {
        self.receiver.mark_changed();
    }
}