zebra_chain/diagnostic/task/
thread.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//! Diagnostic types and functions for Zebra OS thread tasks:
//! - task handles
//! - errors and panics

use std::{
    panic,
    sync::Arc,
    thread::{self, JoinHandle},
};

use crate::shutdown::is_shutting_down;

use super::{CheckForPanics, WaitForPanics};

impl<T> CheckForPanics for thread::Result<T>
where
    T: std::fmt::Debug,
{
    type Output = T;

    /// # Panics
    ///
    /// - if the thread panicked.
    /// - if the thread is cancelled, `panic_on_unexpected_termination` is true, and
    ///   Zebra is not shutting down.
    ///
    /// Threads can't be cancelled except by using a panic, so there are no thread errors here.
    /// `panic_on_unexpected_termination` is
    #[track_caller]
    fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
        match self {
            // The value returned by the thread when it finished.
            Ok(thread_output) => {
                if !panic_on_unexpected_termination {
                    debug!(?thread_output, "ignoring expected thread exit");

                    thread_output
                } else if is_shutting_down() {
                    debug!(
                        ?thread_output,
                        "ignoring thread exit because Zebra is shutting down"
                    );

                    thread_output
                } else {
                    panic!("thread unexpectedly exited with: {:?}", thread_output)
                }
            }

            // A thread error is always a panic.
            Err(panic_payload) => panic::resume_unwind(panic_payload),
        }
    }
}

impl<T> WaitForPanics for JoinHandle<T>
where
    T: std::fmt::Debug,
{
    type Output = T;

    /// Waits for the thread to finish, then panics if the thread panicked.
    #[track_caller]
    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
        self.join()
            .check_for_panics_with(panic_on_unexpected_termination)
    }
}

impl<T> WaitForPanics for Arc<JoinHandle<T>>
where
    T: std::fmt::Debug,
{
    type Output = Option<T>;

    /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
    /// panicked. Otherwise, returns the thread's return value.
    ///
    /// If this is not the final `Arc`, drops the handle and immediately returns `None`.
    #[track_caller]
    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
        // If we are the last Arc with a reference to this handle,
        // we can wait for it and propagate any panics.
        //
        // We use into_inner() because it guarantees that exactly one of the tasks gets the
        // JoinHandle. try_unwrap() lets us keep the JoinHandle, but it can also miss panics.
        //
        // This is more readable as an expanded statement.
        #[allow(clippy::manual_map)]
        if let Some(handle) = Arc::into_inner(self) {
            Some(handle.wait_for_panics_with(panic_on_unexpected_termination))
        } else {
            None
        }
    }
}

impl<T> CheckForPanics for &mut Option<Arc<JoinHandle<T>>>
where
    T: std::fmt::Debug,
{
    type Output = Option<T>;

    /// If this is the final `Arc`, checks if the thread has finished, then panics if the thread
    /// panicked. Otherwise, returns the thread's return value.
    ///
    /// If the thread has not finished, or this is not the final `Arc`, returns `None`.
    #[track_caller]
    fn check_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
        let handle = self.take()?;

        if handle.is_finished() {
            // This is the same as calling `self.wait_for_panics()`, but we can't do that,
            // because we've taken `self`.
            #[allow(clippy::manual_map)]
            return handle.wait_for_panics_with(panic_on_unexpected_termination);
        }

        *self = Some(handle);

        None
    }
}

impl<T> WaitForPanics for &mut Option<Arc<JoinHandle<T>>>
where
    T: std::fmt::Debug,
{
    type Output = Option<T>;

    /// If this is the final `Arc`, waits for the thread to finish, then panics if the thread
    /// panicked. Otherwise, returns the thread's return value.
    ///
    /// If this is not the final `Arc`, drops the handle and returns `None`.
    #[track_caller]
    fn wait_for_panics_with(self, panic_on_unexpected_termination: bool) -> Self::Output {
        // This is more readable as an expanded statement.
        #[allow(clippy::manual_map)]
        if let Some(output) = self
            .take()?
            .wait_for_panics_with(panic_on_unexpected_termination)
        {
            Some(output)
        } else {
            // Some other task has a reference, so we should give up ours to let them use it.
            None
        }
    }
}