1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
//! Type-safe column family access.

// When these types aren't exported, they become dead code.
#![cfg_attr(
    not(any(test, feature = "proptest-impl", feature = "shielded-scan")),
    allow(dead_code)
)]

use std::{
    any::type_name,
    collections::{BTreeMap, HashMap},
    fmt::Debug,
    hash::Hash,
    marker::PhantomData,
    ops::RangeBounds,
};

use crate::service::finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, WriteDisk};

use super::DiskDb;

/// A type-safe read-only column family reference.
///
/// Use this struct instead of raw [`ReadDisk`] access, because it is type-safe.
/// The key and value types are defined once, as type parameters of this struct, so you
/// can't accidentally use different types for reading and writing.
/// (Which is a source of subtle database bugs.)
///
/// Two values of this type compare equal when their databases and column family names
/// are equal; the raw column family reference is not compared.
//
// NOTE: `#[derive(Clone)]` only generates `Clone` when `Key: Clone` and `Value: Clone`,
// even though keys and values only appear inside the `PhantomData` marker.
#[derive(Clone)]
pub struct TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
{
    /// The database.
    db: DiskDb,

    /// The column family reference in the database.
    cf: rocksdb::ColumnFamilyRef<'cf>,

    /// The column family name, only used for debugging and equality checking.
    _cf_name: String,

    /// A marker type used to bind the key and value types to the struct.
    ///
    /// No `Key` or `Value` is stored in this struct.
    _marker: PhantomData<(Key, Value)>,
}

/// A type-safe and drop-safe batch write to a column family.
///
/// Use this struct instead of raw [`WriteDisk`] access, because it is type-safe.
/// The key and value types come from the [`TypedColumnFamily`] this batch was created from,
/// so you can't accidentally use different types for reading and writing.
/// (Which is a source of subtle database bugs.)
///
/// This type is also drop-safe: unwritten batches have to be specifically ignored,
/// because the type is marked `#[must_use]`.
#[must_use = "batches must be written to the database"]
#[derive(Debug, Eq, PartialEq)]
pub struct WriteTypedBatch<'cf, Key, Value, Batch>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
    Batch: WriteDisk,
{
    /// The typed column family that this batch inserts into and deletes from.
    inner: TypedColumnFamily<'cf, Key, Value>,

    /// The underlying batch that collects the pending operations.
    ///
    /// The constructors on [`TypedColumnFamily`] fill this with either an owned
    /// `DiskWriteBatch` or a `&mut DiskWriteBatch`.
    batch: Batch,
}

impl<'cf, Key, Value> Debug for TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
{
    /// Formats this column family as a struct named after its key and value types,
    /// showing the database and the column family name.
    ///
    /// The raw column family reference and the marker field are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Put the concrete key and value type names into the displayed struct name.
        let struct_name = format!(
            "TypedColumnFamily<{}, {}>",
            type_name::<Key>(),
            type_name::<Value>()
        );

        let mut output = f.debug_struct(&struct_name);
        output.field("db", &self.db);
        // The name stands in for the column family reference.
        output.field("cf", &self._cf_name);
        output.finish()
    }
}

impl<'cf, Key, Value> PartialEq for TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
{
    /// Returns `true` if both column families have equal databases and equal names.
    ///
    /// The raw column family references are not compared.
    fn eq(&self, other: &Self) -> bool {
        let same_database = self.db == other.db;

        // The names are only compared when the databases are equal.
        same_database && self._cf_name == other._cf_name
    }
}

// `Eq` has no methods: this impl only asserts that the manual `PartialEq` impl for
// `TypedColumnFamily` (database and column family name equality) is a full equivalence
// relation.
//
// NOTE(review): this relies on `DiskDb`'s `==` being reflexive — confirm.
impl<'cf, Key, Value> Eq for TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
{
}

impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
{
    // Creation

    /// Looks up the column family named `cf_name` in `db`, and wraps it in a typed handle.
    ///
    /// Returns `None` if `db` has no handle for `cf_name`.
    /// The returned struct stores its own clone of `db`.
    pub fn new(db: &'cf DiskDb, cf_name: &str) -> Option<Self> {
        db.cf_handle(cf_name).map(|cf| Self {
            db: db.clone(),
            cf,
            _cf_name: cf_name.to_string(),
            _marker: PhantomData,
        })
    }

    // Writing

    /// Consumes this column family, and returns a typed writer backed by a new, empty batch.
    ///
    /// The `*_batch_for_writing` methods are the only way to get a `WriteTypedBatch`,
    /// which ensures that the read and write types are consistent.
    pub fn new_batch_for_writing(self) -> WriteTypedBatch<'cf, Key, Value, DiskWriteBatch> {
        self.take_batch_for_writing(DiskWriteBatch::new())
    }

    /// Consumes this column family and an existing owned `batch`, and returns a typed writer
    /// that adds to that batch.
    ///
    /// The `*_batch_for_writing` methods are the only way to get a `WriteTypedBatch`,
    /// which ensures that the read and write types are consistent.
    pub fn take_batch_for_writing(
        self,
        batch: DiskWriteBatch,
    ) -> WriteTypedBatch<'cf, Key, Value, DiskWriteBatch> {
        WriteTypedBatch { batch, inner: self }
    }

    /// Consumes this column family, and returns a typed writer that adds to the existing
    /// batch behind the `batch` reference.
    ///
    /// The `*_batch_for_writing` methods are the only way to get a `WriteTypedBatch`,
    /// which ensures that the read and write types are consistent.
    pub fn with_batch_for_writing(
        self,
        batch: &mut DiskWriteBatch,
    ) -> WriteTypedBatch<'cf, Key, Value, &mut DiskWriteBatch> {
        WriteTypedBatch { batch, inner: self }
    }

    // Reading

    /// Returns `true` if this rocksdb column family has no entries.
    pub fn zs_is_empty(&self) -> bool {
        let Self { db, cf, .. } = self;

        db.zs_is_empty(cf)
    }

    /// Returns the value stored under `key` in this rocksdb column family, if there is one.
    pub fn zs_get(&self, key: &Key) -> Option<Value> {
        let Self { db, cf, .. } = self;

        db.zs_get(cf, key)
    }

    /// Checks whether this rocksdb column family contains the serialized form of `key`.
    pub fn zs_contains(&self, key: &Key) -> bool {
        let Self { db, cf, .. } = self;

        db.zs_contains(cf, key)
    }

    /// Returns the lowest key in this column family, together with its value.
    ///
    /// Returns `None` if this column family is empty.
    pub fn zs_first_key_value(&self) -> Option<(Key, Value)> {
        let Self { db, cf, .. } = self;

        db.zs_first_key_value(cf)
    }

    /// Returns the highest key in this column family, together with its value.
    ///
    /// Returns `None` if this column family is empty.
    pub fn zs_last_key_value(&self) -> Option<(Key, Value)> {
        let Self { db, cf, .. } = self;

        db.zs_last_key_value(cf)
    }

    /// Returns the first key in this column family that is greater than or equal to
    /// `lower_bound`, together with its value.
    ///
    /// Returns `None` if there are no keys greater than or equal to `lower_bound`.
    pub fn zs_next_key_value_from(&self, lower_bound: &Key) -> Option<(Key, Value)> {
        let Self { db, cf, .. } = self;

        db.zs_next_key_value_from(cf, lower_bound)
    }

    /// Returns the first key in this column family that is strictly greater than
    /// `lower_bound`, together with its value.
    ///
    /// Returns `None` if there are no keys greater than `lower_bound`.
    pub fn zs_next_key_value_strictly_after(&self, lower_bound: &Key) -> Option<(Key, Value)> {
        let Self { db, cf, .. } = self;

        db.zs_next_key_value_strictly_after(cf, lower_bound)
    }

    /// Returns the first key in this column family that is less than or equal to
    /// `upper_bound`, together with its value.
    ///
    /// Returns `None` if there are no keys less than or equal to `upper_bound`.
    pub fn zs_prev_key_value_back_from(&self, upper_bound: &Key) -> Option<(Key, Value)> {
        let Self { db, cf, .. } = self;

        db.zs_prev_key_value_back_from(cf, upper_bound)
    }

    /// Returns the first key in this column family that is strictly less than
    /// `upper_bound`, together with its value.
    ///
    /// Returns `None` if there are no keys less than `upper_bound`.
    pub fn zs_prev_key_value_strictly_before(&self, upper_bound: &Key) -> Option<(Key, Value)> {
        let Self { db, cf, .. } = self;

        db.zs_prev_key_value_strictly_before(cf, upper_bound)
    }

    /// Returns a forward iterator over the items of this column family that are in `range`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_forward_range_iter<Range>(
        &self,
        range: Range,
    ) -> impl Iterator<Item = (Key, Value)> + '_
    where
        Range: RangeBounds<Key>,
    {
        let Self { db, cf, .. } = self;

        db.zs_forward_range_iter(cf, range)
    }

    /// Returns a reverse iterator over the items of this column family that are in `range`.
    ///
    /// Holding this iterator open might delay block commit transactions.
    pub fn zs_reverse_range_iter<Range>(
        &self,
        range: Range,
    ) -> impl Iterator<Item = (Key, Value)> + '_
    where
        Range: RangeBounds<Key>,
    {
        let Self { db, cf, .. } = self;

        db.zs_reverse_range_iter(cf, range)
    }
}

impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug + Ord,
    Value: IntoDisk + FromDisk,
{
    /// Collects the keys and values of this column family that are in `range` into an
    /// ordered `BTreeMap`.
    ///
    /// The result is an owned map, so the caller does not hold a database iterator.
    /// NOTE(review): earlier docs warned that an open iterator might delay block commit
    /// transactions; whether that applies while this call is running depends on the
    /// underlying read implementation — confirm.
    pub fn zs_items_in_range_ordered<Range>(&self, range: Range) -> BTreeMap<Key, Value>
    where
        Range: RangeBounds<Key>,
    {
        let Self { db, cf, .. } = self;

        db.zs_items_in_range_ordered(cf, range)
    }
}

impl<'cf, Key, Value> TypedColumnFamily<'cf, Key, Value>
where
    Key: IntoDisk + FromDisk + Debug + Hash + Eq,
    Value: IntoDisk + FromDisk,
{
    /// Collects the keys and values of this column family that are in `range` into an
    /// unordered `HashMap`.
    ///
    /// The result is an owned map, so the caller does not hold a database iterator.
    /// NOTE(review): earlier docs warned that an open iterator might delay block commit
    /// transactions; whether that applies while this call is running depends on the
    /// underlying read implementation — confirm.
    pub fn zs_items_in_range_unordered<Range>(&self, range: Range) -> HashMap<Key, Value>
    where
        Range: RangeBounds<Key>,
    {
        let Self { db, cf, .. } = self;

        db.zs_items_in_range_unordered(cf, range)
    }
}

impl<'cf, Key, Value, Batch> WriteTypedBatch<'cf, Key, Value, Batch>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
    Batch: WriteDisk,
{
    // Batching before writing

    /// Adds an insert of the serialized `key` and `value` for this column family to the
    /// batch, overwriting any existing `value` for `key`.
    ///
    /// Takes and returns the batch by value, so calls can be chained.
    pub fn zs_insert(mut self, key: &Key, value: &Value) -> Self {
        let column_family = &self.inner.cf;
        self.batch.zs_insert(column_family, key, value);

        self
    }

    /// Adds a removal of `key` from this column family to the batch, if it exists.
    ///
    /// Takes and returns the batch by value, so calls can be chained.
    pub fn zs_delete(mut self, key: &Key) -> Self {
        let column_family = &self.inner.cf;
        self.batch.zs_delete(column_family, key);

        self
    }

    /// Adds a removal of a key range from this rocksdb column family to the batch, if it
    /// exists. The range includes `from` and excludes `until_strictly_before`.
    ///
    /// Takes and returns the batch by value, so calls can be chained.
    //
    // TODO: convert zs_delete_range() to take std::ops::RangeBounds
    //       see zs_range_iter() for an example of the edge cases
    pub fn zs_delete_range(mut self, from: &Key, until_strictly_before: &Key) -> Self {
        let column_family = &self.inner.cf;
        self.batch
            .zs_delete_range(column_family, from, until_strictly_before);

        self
    }
}

// Only owned batches can be written: `write_batch()` hands the batch itself to the database.
impl<'cf, Key, Value> WriteTypedBatch<'cf, Key, Value, DiskWriteBatch>
where
    Key: IntoDisk + FromDisk + Debug,
    Value: IntoDisk + FromDisk,
{
    // Writing batches

    /// Consumes this typed batch, and writes its underlying batch to the database of the
    /// column family it was created from.
    ///
    /// # Errors
    ///
    /// Returns the [`rocksdb::Error`] from the database write, if it fails.
    pub fn write_batch(self) -> Result<(), rocksdb::Error> {
        let Self { inner, batch } = self;

        inner.db.write(batch)
    }
}