zebra_network/
peer_cache_updater.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//! An async task that regularly updates the peer cache on disk from the current address book.

use std::{
    io,
    sync::{Arc, Mutex},
};

use chrono::Utc;
use tokio::time::sleep;

use crate::{
    constants::{DNS_LOOKUP_TIMEOUT, PEER_DISK_CACHE_UPDATE_INTERVAL},
    meta_addr::MetaAddr,
    AddressBook, BoxError, Config,
};

/// An ongoing task that regularly caches the current `address_book` to disk, based on `config`.
#[instrument(skip(config, address_book))]
pub async fn peer_cache_updater(
    config: Config,
    address_book: Arc<Mutex<AddressBook>>,
) -> Result<(), BoxError> {
    // Wait until we've queried DNS and (hopefully) sent peers to the address book.
    // Ideally we'd wait for at least one peer crawl, but that makes tests very slow.
    //
    // TODO: turn the initial sleep time into a parameter of this function,
    //       and allow it to be set in tests
    sleep(DNS_LOOKUP_TIMEOUT * 4).await;

    loop {
        // Ignore errors because updating the cache is optional.
        // Errors are already logged by the functions we're calling.
        let _ = update_peer_cache_once(&config, &address_book).await;

        sleep(PEER_DISK_CACHE_UPDATE_INTERVAL).await;
    }
}

/// Caches peers from the current `address_book` to disk, based on `config`.
pub async fn update_peer_cache_once(
    config: &Config,
    address_book: &Arc<Mutex<AddressBook>>,
) -> io::Result<()> {
    let peer_list = cacheable_peers(address_book)
        .iter()
        .map(|meta_addr| meta_addr.addr)
        .collect();

    config.update_peer_cache(peer_list).await
}

/// Returns a list of cacheable peers, blocking for as short a time as possible.
fn cacheable_peers(address_book: &Arc<Mutex<AddressBook>>) -> Vec<MetaAddr> {
    // TODO: use spawn_blocking() here, if needed to handle address book mutex load
    let now = Utc::now();

    // # Concurrency
    //
    // We return from this function immediately to make sure the address book is unlocked.
    address_book
        .lock()
        .expect("unexpected panic in previous thread while accessing the address book")
        .cacheable(now)
}