1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
//! Abstractions that represent "the rest of the network".
//!
//! # Implementation
//!
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
//!
//! As described in Tower's documentation, it:
//!
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
//! >
//! > As described in the [Finagle Guide][finagle]:
//! >
//! > > The algorithm randomly picks two services from the set of ready endpoints and
//! > > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > > expect a manageable upper bound on the maximum load of any server.
//! > >
//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > > `n` is the number of servers in the cluster.
//!
//! The Power of Two Choices should work well for many network requests, but not all of them.
//! Some requests should only be made to a subset of connected peers.
//! For example, a request for a particular inventory item
//! should be made to a peer that has recently advertised that inventory hash.
//! Other requests require broadcasts, such as transaction diffusion.
//!
//! Implementing this specialized routing logic inside the `PeerSet` -- so that
//! it continues to abstract away "the rest of the network" into one endpoint --
//! is not a problem, as the `PeerSet` can simply maintain more information on
//! its peers and route requests appropriately. However, there is a problem with
//! maintaining accurate backpressure information, because the `Service` trait
//! requires that service readiness is independent of the data in the request.
//!
//! For this reason, in the future, this code will probably be refactored to
//! address this backpressure mismatch. One possibility is to refactor the code
//! so that one entity holds and maintains the peer set and metadata on the
//! peers, and each "backpressure category" of request is assigned to different
//! `Service` impls with specialized `poll_ready()` implementations. Another
//! less-elegant solution (which might be useful as an intermediate step for the
//! inventory case) is to provide a way to borrow a particular backing service,
//! say by address.
//!
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
//! [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
//!
//! # Behavior During Network Upgrades
//!
//! [ZIP-201] specifies peer behavior during network upgrades:
//!
//! > With scheduled network upgrades, at the activation height, nodes on each consensus branch
//! > should disconnect from nodes on other consensus branches and only accept new incoming
//! > connections from nodes on the same consensus branch.
//!
//! Zebra handles this with the help of [`MinimumPeerVersion`], which determines the minimum peer
//! protocol version to accept based on the current best chain tip height. The minimum version is
//! therefore automatically increased when the block height reaches a network upgrade's activation
//! height. The helper type is then used to:
//!
//! - cancel handshakes to outdated peers, in `handshake::negotiate_version`
//! - cancel requests to and disconnect from peers that have become outdated, in
//!   [`PeerSet::push_unready`]
//! - disconnect from peers that have just responded and became outdated, in
//!   [`PeerSet::poll_unready`]
//! - disconnect from idle peers that have become outdated, in
//!   [`PeerSet::disconnect_from_outdated_peers`]
//!
//! ## Network Coalescence
//!
//! [ZIP-201] also specifies how Zcashd behaves [leading up to an activation
//! height][1]. Since Zcashd limits the number of connections to at most eight
//! peers, it will gradually migrate its connections to up-to-date peers as it
//! approaches the activation height.
//!
//! The motivation for this behavior is to avoid an abrupt partitioning of the network, which can
//! lead to isolated peers and increase the chance of an eclipse attack on some peers of the network.
//!
//! Zebra does not gradually migrate its peers as it approaches an activation height. This is
//! because Zebra by default can connect to up to 75 peers, as can be seen in [`Config::default`].
//! Since this is a lot larger than the 8 peers Zcashd connects to, an eclipse attack becomes a lot
//! more costly to execute, and the probability of an abrupt network partition that isolates peers
//! is lower.
//!
//! Even if a Zebra node is manually configured to connect to a smaller number
//! of peers, the [`AddressBook`][2] is configured to hold a large number of
//! peer addresses ([`MAX_ADDRS_IN_ADDRESS_BOOK`][3]). Since the address book
//! prioritizes addresses it trusts (like those that it has successfully
//! connected to before), the node should be able to recover and rejoin the
//! network by itself, as long as the address book is populated with enough
//! entries.
//!
//! [1]: https://zips.z.cash/zip-0201#network-coalescence
//! [2]: crate::AddressBook
//! [3]: crate::constants::MAX_ADDRS_IN_ADDRESS_BOOK
//! [ZIP-201]: https://zips.z.cash/zip-0201

use std::{
    collections::{HashMap, HashSet},
    convert,
    fmt::Debug,
    marker::PhantomData,
    net::IpAddr,
    pin::Pin,
    task::{Context, Poll},
    time::Instant,
};

use futures::{
    channel::{mpsc, oneshot},
    future::{FutureExt, TryFutureExt},
    prelude::*,
    stream::FuturesUnordered,
    task::noop_waker,
};
use itertools::Itertools;
use num_integer::div_ceil;
use tokio::{
    sync::{broadcast, watch},
    task::JoinHandle,
};
use tower::{
    discover::{Change, Discover},
    load::Load,
    Service,
};

use zebra_chain::chain_tip::ChainTip;

use crate::{
    address_book::AddressMetrics,
    constants::MIN_PEER_SET_LOG_INTERVAL,
    peer::{LoadTrackedClient, MinimumPeerVersion},
    peer_set::{
        unready_service::{Error as UnreadyError, UnreadyService},
        InventoryChange, InventoryRegistry,
    },
    protocol::{
        external::InventoryHash,
        internal::{Request, Response},
    },
    BoxError, Config, PeerError, PeerSocketAddr, SharedPeerError,
};

#[cfg(test)]
mod tests;

/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
///
/// In response to this signal, the crawler tries to open more peer connections.
///
/// This is a unit struct: the signal carries no data. The [`PeerSet`] holds the sending half of
/// the channel for this signal in its `demand_signal` field.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct MorePeers;

/// A signal sent by the [`PeerSet`] to cancel a [`Client`][1]'s current request
/// or response.
///
/// When it receives this signal, the [`Client`][1] stops processing and exits.
///
/// This is a unit struct: the signal carries no data. The [`PeerSet`] keeps the oneshot senders
/// for this signal in its `cancel_handles` map, keyed by peer address.
///
/// [1]: crate::peer::Client
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CancelClientWork;

/// A [`tower::Service`] that abstractly represents "the rest of the network".
///
/// # Type Parameters
///
/// - `D`: the [`Discover`] implementation that supplies peer [`Change`]s, keyed by
///   [`PeerSocketAddr`], with [`LoadTrackedClient`] services;
/// - `C`: the [`ChainTip`] implementation used by [`MinimumPeerVersion`].
///
/// # Security
///
/// The `Discover::Key` must be the transient remote address of each peer. This
/// address may only be valid for the duration of a single connection. (For
/// example, inbound connections have an ephemeral remote port, and proxy
/// connections have an ephemeral local or proxy port.)
///
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
pub struct PeerSet<D, C>
where
    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
    D::Error: Into<BoxError>,
    C: ChainTip,
{
    // Peer Tracking: New Peers
    //
    /// Provides new and deleted peer [`Change`]s to the peer set,
    /// via the [`Discover`] trait implementation.
    discover: D,

    /// A channel that asks the peer crawler task to connect to more peers.
    demand_signal: mpsc::Sender<MorePeers>,

    // Peer Tracking: Ready Peers
    //
    /// Connected peers that are ready to receive requests from Zebra,
    /// or send requests to Zebra.
    ready_services: HashMap<D::Key, D::Service>,

    // Request Routing
    //
    /// Stores gossiped inventory hashes from connected peers.
    ///
    /// Used to route inventory requests to peers that are likely to have it.
    inventory_registry: InventoryRegistry,

    // Peer Tracking: Busy Peers
    //
    /// Connected peers that are handling a Zebra request,
    /// or Zebra is handling one of their requests.
    unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,

    /// Channels used to cancel the request that an unready service is doing.
    ///
    /// Ready services do not have an entry in this map.
    cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,

    // Peer Validation
    //
    /// An endpoint to see the minimum peer protocol version in real time.
    ///
    /// The minimum version depends on the block height, and [`MinimumPeerVersion`] listens for
    /// height changes and determines the correct minimum version.
    minimum_peer_version: MinimumPeerVersion<C>,

    /// The configured limit for inbound and outbound connections.
    ///
    /// The peer set panics if this size is exceeded.
    /// If that happens, our connection limit code has a bug.
    peerset_total_connection_limit: usize,

    // Background Tasks
    //
    /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks.
    ///
    /// The join handles passed into the PeerSet are used to populate the `guards` member.
    handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,

    /// Unordered set of handles to background tasks associated with the `PeerSet`.
    ///
    /// These guards are checked for errors as part of `poll_ready` which lets
    /// the `PeerSet` propagate errors from background tasks back to the user.
    guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,

    // Metrics and Logging
    //
    /// Address book metrics watch channel.
    ///
    /// Used for logging diagnostics.
    address_metrics: watch::Receiver<AddressMetrics>,

    /// The last time we logged a message about the peer set size.
    last_peer_log: Option<Instant>,

    /// The configured maximum number of peers that can be in the
    /// peer set per IP, defaults to [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`]
    max_conns_per_ip: usize,
}

impl<D, C> Drop for PeerSet<D, C>
where
    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
    D::Error: Into<BoxError>,
    C: ChainTip,
{
    /// Shuts down the peer set's services, background tasks, and channels when it is dropped.
    fn drop(&mut self) {
        // `drop()` has no access to the current task (if there is one), so the shutdown is
        // driven with a throwaway context whose waker does nothing.
        self.shut_down_tasks_and_channels(&mut Context::from_waker(&noop_waker()));
    }
}

impl<D, C> PeerSet<D, C>
where
    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
    D::Error: Into<BoxError>,
    C: ChainTip,
{
    /// Construct a peerset which uses `discover` to manage peer connections.
    ///
    /// The new peer set starts with no ready peers, no unready peers, and no background task
    /// handles; the handles arrive later via `handle_rx`.
    ///
    /// Arguments:
    /// - `config`: configures the peer set connection limit, and the fallback for
    ///             `max_conns_per_ip`;
    /// - `discover`: handles peer connects and disconnects;
    /// - `demand_signal`: requests more peers when all peers are busy (unready);
    /// - `handle_rx`: receives background task handles,
    ///                monitors them to make sure they're still running,
    ///                and shuts down all the tasks as soon as one task exits;
    /// - `inv_stream`: receives inventory changes from peers,
    ///                 allowing the peer set to direct inventory requests;
    /// - `address_metrics`: when peer set is busy, it logs address book diagnostics;
    /// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time;
    /// - `max_conns_per_ip`: configured maximum number of peers that can be in the
    ///                       peer set per IP. If `None`, the value from `config` is used.
    // The constructor wires together every channel and endpoint the peer set depends on.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: &Config,
        discover: D,
        demand_signal: mpsc::Sender<MorePeers>,
        handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
        inv_stream: broadcast::Receiver<InventoryChange>,
        address_metrics: watch::Receiver<AddressMetrics>,
        minimum_peer_version: MinimumPeerVersion<C>,
        max_conns_per_ip: Option<usize>,
    ) -> Self {
        // Values derived from the arguments, computed before assembling the struct.
        let inventory_registry = InventoryRegistry::new(inv_stream);
        let peerset_total_connection_limit = config.peerset_total_connection_limit();
        let max_conns_per_ip = max_conns_per_ip.unwrap_or(config.max_connections_per_ip);

        Self {
            // Peer tracking: new, ready, and busy peers
            discover,
            demand_signal,
            ready_services: HashMap::new(),
            unready_services: FuturesUnordered::new(),
            cancel_handles: HashMap::new(),

            // Request routing
            inventory_registry,

            // Peer validation
            minimum_peer_version,
            peerset_total_connection_limit,
            max_conns_per_ip,

            // Background tasks
            handle_rx,
            guards: futures::stream::FuturesUnordered::new(),

            // Metrics and logging
            address_metrics,
            last_peer_log: None,
        }
    }

    /// Polls the background task handles, to detect tasks that have stopped running.
    ///
    /// This method never returns `Ok`:
    /// - while the handles have not arrived yet, or every task is still running, it returns
    ///   `Pending`, after registering a wakeup for the handles arriving or a task exiting;
    /// - as soon as any task exits (successfully or not), or no tasks remain, it shuts down
    ///   the other tasks and channels, and returns an error.
    fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
        futures::ready!(self.receive_tasks_if_needed(cx))?;

        // `Pending` here means that all the background tasks are still running.
        let Some(task_exit) = futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) else {
            // The task list is empty, so there is nothing left to wait for.
            self.shut_down_tasks_and_channels(cx);
            return Poll::Ready(Err("all peer set background tasks have exited".into()));
        };

        info!(
            background_tasks = %self.guards.len(),
            "a peer set background task exited, shutting down other peer set tasks"
        );

        self.shut_down_tasks_and_channels(cx);

        // Merge the join error and the task's own error into a single result,
        // and return whichever error happened.
        //
        // TODO: replace with Result::flatten when it stabilises (#70142)
        let task_result: Result<(), BoxError> =
            task_exit.map_err(Into::into).and_then(convert::identity);
        task_result?;

        // A task that exited cleanly is still treated as an error.
        Poll::Ready(Err("a peer set background task exited".into()))
    }

    /// Moves the background task handles from the `handle_rx` channel into `guards`,
    /// unless `guards` already contains handles.
    ///
    /// Returns:
    /// - `Ready(Ok)` if `guards` is non-empty, either already or after receiving the handles;
    /// - `Pending` if the handles have not been sent on the channel yet;
    /// - `Ready(Err)` if the sender was dropped without sending any handles.
    ///
    /// # Panics
    ///
    /// If the received list of handles is empty.
    fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
        // There are already some task handles, so there is nothing to receive.
        if !self.guards.is_empty() {
            return Poll::Ready(Ok(()));
        }

        // Returns early with `Pending` if nothing has been sent yet.
        let Ok(task_handles) = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx)) else {
            // The sender went away before it sent the tasks.
            return Poll::Ready(Err(
                "sender did not send peer background tasks before it was dropped".into(),
            ));
        };

        // An empty background task set is currently treated as a bug in the caller.
        //
        // TODO: refactor `handle_rx` and `guards` into an enum
        //       for the background task state: Waiting/Running/Shutdown.
        assert!(
            !task_handles.is_empty(),
            "the peer set requires at least one background task"
        );

        self.guards.extend(task_handles);

        Poll::Ready(Ok(()))
    }

    /// Shut down:
    /// - services by dropping the service lists
    /// - background tasks via their join handles or cancel handles
    /// - channels by closing the channel
    fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) {
        // Drop services and cancel their background tasks.
        self.ready_services = HashMap::new();

        for (_peer_key, handle) in self.cancel_handles.drain() {
            let _ = handle.send(CancelClientWork);
        }
        self.unready_services = FuturesUnordered::new();

        // Close the MorePeers channel for all senders,
        // so we don't add more peers to a shut down peer set.
        self.demand_signal.close_channel();

        // Shut down background tasks, ignoring pending polls.
        self.handle_rx.close();
        let _ = self.receive_tasks_if_needed(cx);
        for guard in self.guards.iter() {
            guard.abort();
        }
    }

    /// Check busy peer services for request completion or errors.
    ///
    /// Move newly ready services to the ready list if they are for peers with supported protocol
    /// versions, otherwise they are dropped. Also drop failed services.
    ///
    /// Never returns an error.
    ///
    /// Returns `Ok(Some(())` if at least one peer became ready, `Poll::Pending` if there are
    /// unready peers, but none became ready, and `Ok(None)` if the unready peers were empty.
    ///
    /// If there are any remaining unready peers, registers a wakeup for the next time one becomes
    /// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come
    /// from peers, there needs to be at least one peer to register a wakeup.)
    fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, BoxError>> {
        let mut result = Poll::Pending;

        // # Correctness
        //
        // `poll_next()` must always be called, because `self.unready_services` could have been
        // empty before the call to `self.poll_ready()`.
        //
        // > When new futures are added, `poll_next` must be called in order to begin receiving
        // > wake-ups for new futures.
        //
        // <https://docs.rs/futures/latest/futures/stream/futures_unordered/struct.FuturesUnordered.html>
        //
        // Returns Pending if we've finished processing the unready service changes,
        // but there are still some unready services.
        loop {
            // No ready peers left, but there are some unready peers pending.
            let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else {
                break;
            };

            match ready_peer {
                // No unready peers in the list.
                None => {
                    // If we've finished processing the unready service changes, and there are no
                    // unready services left, it doesn't make sense to return Pending, because
                    // their stream is terminated. But when we add more unready peers and call
                    // `poll_next()`, its termination status will be reset, and it will receive
                    // wakeups again.
                    if result.is_pending() {
                        result = Poll::Ready(Ok(None));
                    }

                    break;
                }

                // Unready -> Ready
                Some(Ok((key, svc))) => {
                    trace!(?key, "service became ready");

                    self.push_ready(true, key, svc);

                    // Return Ok if at least one peer became ready.
                    result = Poll::Ready(Ok(Some(())));
                }

                // Unready -> Canceled
                Some(Err((key, UnreadyError::Canceled))) => {
                    // A service be canceled because we've connected to the same service twice.
                    // In that case, there is a cancel handle for the peer address,
                    // but it belongs to the service for the newer connection.
                    trace!(
                        ?key,
                        duplicate_connection = self.cancel_handles.contains_key(&key),
                        "service was canceled, dropping service"
                    );
                }
                Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
                    // Similarly, services with dropped cancel handes can have duplicates.
                    trace!(
                        ?key,
                        duplicate_connection = self.cancel_handles.contains_key(&key),
                        "cancel handle was dropped, dropping service"
                    );
                }

                // Unready -> Errored
                Some(Err((key, UnreadyError::Inner(error)))) => {
                    debug!(%error, "service failed while unready, dropping service");

                    let cancel = self.cancel_handles.remove(&key);
                    assert!(cancel.is_some(), "missing cancel handle");
                }
            }
        }

        result
    }

    /// Checks previously ready peer services for errors.
    ///
    /// The only way these peer `Client`s can become unready is when we send them a request,
    /// because the peer set has exclusive access to send requests to each peer. (If an inbound
    /// request is in progress, it will be handled, then our request will be sent by the connection
    /// task.)
    ///
    /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no
    /// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error.
    ///
    /// # Panics
    ///
    /// If any peers somehow became unready without being sent a request. This indicates a bug in the peer set, where requests
    /// are sent to peers without putting them in `unready_peers`.
    fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let mut previous = HashMap::new();
        std::mem::swap(&mut previous, &mut self.ready_services);

        // TODO: consider only checking some peers each poll (for performance reasons),
        //       but make sure we eventually check all of them.
        for (key, mut svc) in previous.drain() {
            let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else {
                unreachable!(
                    "unexpected unready peer: peers must be put into the unready_peers list \
                     after sending them a request"
                );
            };

            match peer_readiness {
                // Still ready, add it back to the list.
                Ok(()) => self.push_ready(false, key, svc),

                // Ready -> Errored
                Err(error) => {
                    debug!(%error, "service failed while ready, dropping service");

                    // Ready services can just be dropped, they don't need any cleanup.
                    std::mem::drop(svc);
                }
            }
        }

        if self.ready_services.is_empty() {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }

    /// Counts the peer connections Zebra already has with the provided IP address.
    ///
    /// Both ready peers and peers with a cancel handle are counted.
    ///
    /// # Performance
    ///
    /// This method is `O(connected peers)`, so it should not be called from a loop
    /// that is already iterating through the peer set.
    fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
        let ready_count = self
            .ready_services
            .keys()
            .filter(|addr| addr.ip() == ip)
            .count();

        let busy_count = self
            .cancel_handles
            .keys()
            .filter(|addr| addr.ip() == ip)
            .count();

        ready_count + busy_count
    }

    /// Returns `true` if Zebra is already connected to the IP and port in `addr`.
    ///
    /// A peer counts as connected if it is in the ready list, or if it has a cancel handle.
    fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
        if self.ready_services.contains_key(&addr) {
            return true;
        }

        self.cancel_handles.contains_key(&addr)
    }

    /// Drains all the peer changes that `discover` currently has available.
    ///
    /// Inserted services are added as unready, unless the peer set is already connected to
    /// that address, or already has `max_conns_per_ip` connections to that IP. Rejected
    /// services are dropped. Removed services are dropped, after cancelling any pending
    /// requests.
    ///
    /// If the peer connector channel is closed or fails, returns an error.
    ///
    /// Otherwise, returns `Ok` if it processed at least one change, or `Poll::Pending` if there
    /// were no changes. Always registers a wakeup for new peers, even when it returns `Ok`.
    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
        // Stays `Pending` unless a change is processed below.
        let mut outcome = Poll::Pending;

        // Stop as soon as the discover stream has no more changes ready.
        while let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) {
            // A closed or failed change stream is a permanent error.
            let change = discovered
                .ok_or("discovery stream closed")?
                .map_err(Into::into)?;

            // We have a change to process, so the result is no longer `Pending`.
            outcome = Poll::Ready(Ok(()));

            match change {
                Change::Insert(key, svc) => {
                    // We add peers as unready, so that we:
                    // - always do the same checks on every ready peer, and
                    // - check for any errors that happened right after the handshake
                    trace!(?key, "got Change::Insert from Discover");

                    // # Security
                    //
                    // Reject the new peer if:
                    // - we are already connected to its address: preferring old connections
                    //   avoids connection thrashing, or
                    // - there are already `max_conns_per_ip` peers with the same IP address
                    //   in the peer set.
                    let reject_peer = self.has_peer_with_addr(key)
                        || self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip;

                    if reject_peer {
                        drop(svc);
                    } else {
                        self.push_unready(key, svc);
                    }
                }
                Change::Remove(key) => {
                    trace!(?key, "got Change::Remove from Discover");
                    self.remove(&key);
                }
            }
        }

        outcome
    }

    /// Disconnects from ready peers that are outdated, if the minimum peer version has changed.
    ///
    /// Does nothing if `MinimumPeerVersion::changed` reports no change.
    fn disconnect_from_outdated_peers(&mut self) {
        let Some(minimum_version) = self.minimum_peer_version.changed() else {
            return;
        };

        // Ready services have no work in progress, so they can be dropped without
        // cancelling anything.
        self.ready_services
            .retain(|_address, peer| peer.remote_version() >= minimum_version);
    }

    /// Removes the ready service for `key` from the ready list, and returns it.
    ///
    /// Returns `None` if there is no ready service for `key`.
    ///
    /// # Panics
    ///
    /// If a ready service was found for `key`, but `key` also has a cancel handle.
    fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> {
        let ready_service = self.ready_services.remove(key)?;

        assert!(
            !self.cancel_handles.contains_key(key),
            "cancel handles are only used for unready service work"
        );

        Some(ready_service)
    }

    /// Removes the peer identified by `key` from the peer set.
    ///
    /// A ready service is dropped directly. Otherwise, if `key` has a cancel handle,
    /// a cancellation is sent through it. If neither exists, this method does nothing.
    fn remove(&mut self, key: &D::Key) {
        // Ready services have no work in progress, so dropping them is all the cleanup needed.
        if let Some(ready_service) = self.take_ready_service(key) {
            drop(ready_service);
            return;
        }

        if let Some(cancel_handle) = self.cancel_handles.remove(key) {
            // Sending consumes the cancel handle. The service future then returns a
            // `Canceled` error, which makes `poll_unready` drop the service.
            //
            // A send error is deliberately ignored.
            let _ = cancel_handle.send(CancelClientWork);
        }
    }

    /// Stores `svc` in the ready list under `key`, unless its peer is outdated.
    ///
    /// Any cancel handle registered for `key` is removed first. A service whose remote version
    /// is below the current minimum peer version is dropped instead of being stored.
    ///
    /// # Panics
    ///
    /// If `was_unready` does not match whether `key` had a cancel handle.
    fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) {
        let cancel_handle = self.cancel_handles.remove(&key);
        assert_eq!(
            cancel_handle.is_some(),
            was_unready,
            "missing or unexpected cancel handle"
        );

        let is_supported = svc.remote_version() >= self.minimum_peer_version.current();

        if is_supported {
            self.ready_services.insert(key, svc);
        } else {
            // Outdated peers are disconnected by dropping their service.
            drop(svc);
        }
    }

    /// Moves `svc` into the unready list under `key`, together with a new cancel channel.
    ///
    /// If the peer's remote version is at least the current minimum peer version, the sending
    /// half of the channel is stored as the cancel handle for `key`.
    ///
    /// Otherwise the peer is outdated: a cancellation is sent immediately, and no cancel handle
    /// is stored.
    fn push_unready(&mut self, key: D::Key, svc: D::Service) {
        // Read the version before the service is moved into the unready list.
        let remote_version = svc.remote_version();
        let (cancel_tx, cancel_rx) = oneshot::channel();

        let unready_service = UnreadyService {
            key: Some(key),
            service: Some(svc),
            cancel: cancel_rx,
            _req: PhantomData,
        };
        self.unready_services.push(unready_service);

        if remote_version >= self.minimum_peer_version.current() {
            self.cancel_handles.insert(key, cancel_tx);
            return;
        }

        // The peer is using an outdated protocol version, so cancel any request made to it.
        // A send error is deliberately ignored.
        let _ = cancel_tx.send(CancelClientWork);
    }

    /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service.
    fn select_ready_p2c_peer(&self) -> Option<D::Key> {
        self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect())
    }

    /// Selects a peer from `ready_service_list` using P2C (power of two choices).
    ///
    /// - An empty list returns `None`.
    /// - A list with one peer returns that peer.
    /// - Otherwise, two distinct random peers are sampled, and the one with the lower load is
    ///   returned. If their loads are equal, the first sampled peer is returned.
    ///
    /// # Panics
    ///
    /// If the list has two or more peers, and a sampled peer is not in `self.ready_services`.
    #[allow(clippy::unwrap_in_result)]
    fn select_p2c_peer_from_list(&self, ready_service_list: &HashSet<D::Key>) -> Option<D::Key> {
        let len = ready_service_list.len();

        if len == 0 {
            return None;
        }

        if len == 1 {
            let only_key = ready_service_list
                .iter()
                .next()
                .expect("just checked there is one service");

            return Some(*only_key);
        }

        // Choose 2 random peers, then return the least loaded of those 2 peers.
        let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);

        // Looks up the key at `position` in the set's iteration order.
        let key_at = |position: usize| -> D::Key {
            *ready_service_list
                .iter()
                .nth(position)
                .expect("sample returns valid indexes")
        };

        let a = key_at(idxs.index(0));
        let b = key_at(idxs.index(1));

        let a_load = self.query_load(&a).expect("supplied services are ready");
        let b_load = self.query_load(&b).expect("supplied services are ready");

        let selected = if a_load <= b_load { a } else { b };

        trace!(
            a.key = ?a,
            a.load = ?a_load,
            b.key = ?b,
            b.load = ?b_load,
            selected = ?selected,
            ?len,
            "selected service by p2c"
        );

        Some(selected)
    }

    /// Picks up to `max_peers` keys at random from the ready services, without looking at
    /// service load.
    ///
    /// Each returned peer is unique, but the order of the returned peers is not fully random.
    fn select_random_ready_peers(&self, max_peers: usize) -> Vec<D::Key> {
        use rand::seq::IteratorRandom;

        let mut rng = rand::thread_rng();
        let ready_keys = self.ready_services.keys().copied();

        ready_keys.choose_multiple(&mut rng, max_peers)
    }

    /// Looks up the ready service for `key`, and returns its current load.
    ///
    /// Returns `None` if `key` is not in the ready service list.
    fn query_load(&self, key: &D::Key) -> Option<<D::Service as Load>::Metric> {
        let ready_service = self.ready_services.get(key)?;

        Some(ready_service.load())
    }

    /// Routes `req` to a ready peer chosen by P2C load-balancing.
    ///
    /// The chosen service is taken out of the ready list, called with `req`, and then moved to
    /// the unready list.
    ///
    /// If there are no ready peers, the returned future yields to other tasks once, then fails
    /// with [`PeerError::NoReadyPeers`].
    fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
        let Some(p2c_key) = self.select_ready_p2c_peer() else {
            // There is no ready peer to send this request to, so fail the request.
            return async move {
                // Let other tasks run, so a retry request might get different ready peers.
                tokio::task::yield_now().await;

                Err(SharedPeerError::from(PeerError::NoReadyPeers))
            }
            .map_err(Into::into)
            .boxed();
        };

        tracing::trace!(?p2c_key, "routing based on p2c");

        let mut svc = self
            .take_ready_service(&p2c_key)
            .expect("selected peer must be ready");

        // The service is busy with this request until its response future completes.
        let fut = svc.call(req);
        self.push_unready(p2c_key, svc);

        fut.map_err(Into::into).boxed()
    }

    /// Routes an inventory request for `hash`, preferring ready peers that advertised it.
    ///
    /// Candidate peers are tried in this order:
    /// 1. ready peers that the inventory registry lists as advertising `hash`, then
    /// 2. ready peers that the inventory registry does not list as missing `hash`.
    ///
    /// Within each candidate list, the peer is chosen using P2C.
    ///
    /// If neither list provides a peer, the returned future yields to other tasks once, then
    /// fails with a synthetic [`NotFoundRegistry`](PeerError::NotFoundRegistry) error for `hash`.
    fn route_inv(
        &mut self,
        req: Request,
        hash: InventoryHash,
    ) -> <Self as tower::Service<Request>>::Future {
        // First choice: ready peers that advertised this inventory.
        let advertising_peer_list = self
            .inventory_registry
            .advertising_peers(hash)
            .filter(|&addr| self.ready_services.contains_key(addr))
            .copied()
            .collect();

        // # Security
        //
        // The peer must be a random, less-loaded peer with the inventory.
        //
        // Picking the first peer in HashMap order would let peers influence our choice by
        // switching addresses. A random choice also stops a single peer from providing all
        // our inventory responses.
        if let Some(peer) = self.select_p2c_peer_from_list(&advertising_peer_list) {
            if let Some(mut svc) = self.take_ready_service(&peer) {
                tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory");

                let fut = svc.call(req);
                self.push_unready(peer, svc);

                return fut.map_err(Into::into).boxed();
            }
        }

        // Second choice: ready peers that are not known to be missing this inventory.
        let missing_peer_list: HashSet<PeerSocketAddr> = self
            .inventory_registry
            .missing_peers(hash)
            .copied()
            .collect();
        let maybe_peer_list = self
            .ready_services
            .keys()
            .filter(|addr| !missing_peer_list.contains(addr))
            .copied()
            .collect();

        // Security: choose a random, less-loaded peer that might have the inventory.
        if let Some(peer) = self.select_p2c_peer_from_list(&maybe_peer_list) {
            if let Some(mut svc) = self.take_ready_service(&peer) {
                tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory");

                let fut = svc.call(req);
                self.push_unready(peer, svc);

                return fut.map_err(Into::into).boxed();
            }
        }

        tracing::debug!(
            ?hash,
            "all ready peers are missing inventory, failing request"
        );

        async move {
            // Let other tasks run, so a retry request might get different ready peers.
            tokio::task::yield_now().await;

            // # Security
            //
            // Requests must not be routed to peers that are missing inventory.
            // If we kept trying doomed requests, peers that are missing our requested inventory
            // could take up a large amount of our bandwidth and retry limits.
            Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![
                hash,
            ])))
        }
        .map_err(Into::into)
        .boxed()
    }

    /// Sends a clone of `req` to up to `max_peers` randomly chosen ready peers.
    ///
    /// Each chosen service is taken out of the ready list, called, and moved to the unready
    /// list. The returned future waits for every peer response, logs how many succeeded and
    /// failed, and always resolves to `Ok(Response::Nil)`.
    ///
    /// # Panics
    ///
    /// If `max_peers` is zero, or greater than the number of ready peers.
    fn route_multiple(
        &mut self,
        req: Request,
        max_peers: usize,
    ) -> <Self as tower::Service<Request>>::Future {
        assert!(
            max_peers > 0,
            "requests must be routed to at least one peer"
        );
        assert!(
            max_peers <= self.ready_services.len(),
            "requests can only be routed to ready peers"
        );

        // # Security
        //
        // Peers are chosen randomly, ignoring load.
        // Peers can influence their own load, so a load-based choice could favour malicious
        // peers.
        //
        // The order of peers isn't completely random,
        // but peer request order is not security-sensitive.
        let selected_peers = self.select_random_ready_peers(max_peers);

        let futs = FuturesUnordered::new();
        for key in selected_peers {
            let mut svc = self
                .take_ready_service(&key)
                .expect("selected peers are ready");

            // Individual peer errors are discarded, only success or failure is kept.
            let peer_response = svc.call(req.clone()).map_err(|_| ());
            futs.push(peer_response);

            self.push_unready(key, svc);
        }

        async move {
            let results = futs.collect::<Vec<Result<_, _>>>().await;
            tracing::debug!(
                ok.len = results.iter().filter(|result| result.is_ok()).count(),
                err.len = results.iter().filter(|result| result.is_err()).count(),
                "sent peer request to multiple peers"
            );
            Ok(Response::Nil)
        }
        .boxed()
    }

    /// Sends `req` to the number of ready peers given by `number_of_peers_to_broadcast()`.
    ///
    /// Peer responses are ignored, see `route_multiple()` for details.
    fn route_broadcast(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
        let broadcast_peer_count = self.number_of_peers_to_broadcast();

        self.route_multiple(req, broadcast_peer_count)
    }

    /// Returns the number of ready peers that Zebra will send a broadcast request to.
    ///
    /// This is one third of the current number of ready peers, rounded up.
    pub(crate) fn number_of_peers_to_broadcast(&self) -> usize {
        // Broadcast messages are currently sent to a third of the ready peers.
        const PEER_FRACTION_TO_BROADCAST: usize = 3;

        let ready_peer_count = self.ready_services.len();

        // Rounding up means a single ready peer still gets the request.
        div_ceil(ready_peer_count, PEER_FRACTION_TO_BROADCAST)
    }

    /// Returns the addresses in the peer set.
    ///
    /// The list contains the key of every ready service, followed by the key of every
    /// cancel handle.
    fn peer_set_addresses(&self) -> Vec<PeerSocketAddr> {
        let ready_addrs = self.ready_services.keys();
        let cancellable_addrs = self.cancel_handles.keys();

        ready_addrs.chain(cancellable_addrs).copied().collect()
    }

    /// Logs the peer set size, and any potential connectivity issues.
    fn log_peer_set_size(&mut self) {
        let ready_services_len = self.ready_services.len();
        let unready_services_len = self.unready_services.len();
        trace!(ready_peers = ?ready_services_len, unready_peers = ?unready_services_len);

        let now = Instant::now();

        // These logs are designed to be human-readable in a terminal, at the
        // default Zebra log level. If you need to know the peer set size for
        // every request, use the trace-level logs, or the metrics exporter.
        if let Some(last_peer_log) = self.last_peer_log {
            // Avoid duplicate peer set logs
            if now.duration_since(last_peer_log) < MIN_PEER_SET_LOG_INTERVAL {
                return;
            }
        } else {
            // Suppress initial logs until the peer set has started up.
            // There can be multiple initial requests before the first peer is
            // ready.
            self.last_peer_log = Some(now);
            return;
        }

        self.last_peer_log = Some(now);

        // Log potential duplicate connections.
        let peers = self.peer_set_addresses();

        // Check for duplicates by address and port: these are unexpected and represent a bug.
        let duplicates: Vec<PeerSocketAddr> = peers.iter().duplicates().cloned().collect();

        let mut peer_counts = peers.iter().counts();
        peer_counts.retain(|peer, _count| duplicates.contains(peer));

        if !peer_counts.is_empty() {
            let duplicate_connections: usize = peer_counts.values().sum();

            warn!(
                ?duplicate_connections,
                duplicated_peers = ?peer_counts.len(),
                peers = ?peers.len(),
                "duplicate peer connections in peer set"
            );
        }

        // Check for duplicates by address: these can happen if there are multiple nodes
        // behind a NAT or on a single server.
        let peers: Vec<IpAddr> = peers.iter().map(|addr| addr.ip()).collect();
        let duplicates: Vec<IpAddr> = peers.iter().duplicates().cloned().collect();

        let mut peer_counts = peers.iter().counts();
        peer_counts.retain(|peer, _count| duplicates.contains(peer));

        if !peer_counts.is_empty() {
            let duplicate_connections: usize = peer_counts.values().sum();

            info!(
                ?duplicate_connections,
                duplicated_peers = ?peer_counts.len(),
                peers = ?peers.len(),
                "duplicate IP addresses in peer set"
            );
        }

        // Only log connectivity warnings if all our peers are busy (or there are no peers).
        if ready_services_len > 0 {
            return;
        }

        let address_metrics = *self.address_metrics.borrow();
        if unready_services_len == 0 {
            warn!(
                ?address_metrics,
                "network request with no peer connections. Hint: check your network connection"
            );
        } else {
            info!(?address_metrics, "network request with no ready peers: finding more peers, waiting for {} peers to answer requests",
                  unready_services_len);
        }
    }

    /// Publishes the ready, unready, and total peer counts as metrics gauges,
    /// then checks the total against the peer set connection limit.
    ///
    /// # Panics
    ///
    /// If the peer set size exceeds the connection limit.
    fn update_metrics(&self) {
        let num_ready = self.ready_services.len();
        let num_unready = self.unready_services.len();
        let num_peers = num_ready + num_unready;

        metrics::gauge!("pool.num_ready").set(num_ready as f64);
        metrics::gauge!("pool.num_unready").set(num_unready as f64);
        metrics::gauge!("zcash.net.peers").set(num_peers as f64);

        // Security: the peer set must stay within the configured connection limit.
        if num_peers <= self.peerset_total_connection_limit {
            return;
        }

        let address_metrics = *self.address_metrics.borrow();
        panic!(
            "unexpectedly exceeded configured peer set connection limit: \n\
             peers: {num_peers:?}, ready: {num_ready:?}, unready: {num_unready:?}, \n\
             address_metrics: {address_metrics:?}",
        );
    }
}

/// Routes network [`Request`]s to the peers in the peer set.
///
/// `poll_ready()` updates peer statuses, and is ready when `poll_ready_peer_errors()` is ready.
/// `call()` picks a routing strategy based on the request type.
impl<D, C> Service<Request> for PeerSet<D, C>
where
    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
    D::Error: Into<BoxError>,
    C: ChainTip,
{
    type Response = Response;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    /// Updates peer statuses, then reports whether the peer set can accept a request.
    ///
    /// Returns an error if peer discovery, the background tasks, or the inventory registry
    /// return an error. Returns `Poll::Pending` and sends a demand signal for more peers
    /// if the ready peer check is pending.
    ///
    /// # Panics
    ///
    /// If the peer set size exceeds the connection limit, see `update_metrics()`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Update service and peer statuses.
        //
        // # Correctness
        //
        // All of the futures that receive a context from this method can wake the peer set buffer
        // task. If there are no ready peers, and no new peers, network requests will pause until:
        // - an unready peer becomes ready, or
        // - a new peer arrives.

        // Check for new peers, and register a task wakeup when the next new peers arrive. New peers
        // can be infrequent if our connection slots are full, or we're connected to all
        // available/useful peers.
        let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?;

        // These tasks don't provide new peers or newly ready peers.
        let _poll_pending: Poll<()> = self.poll_background_errors(cx)?;
        let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?;

        // Check for newly ready peers, including newly added peers (which are added as unready).
        // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready
        // peers.
        //
        // Each connected peer should become ready within a few minutes, or timeout, close the
        // connection, and release its connection slot.
        //
        // TODO: drop peers that overload us with inbound messages and never become ready (#7822)
        let _poll_pending_or_ready: Poll<Option<()>> = self.poll_unready(cx)?;

        // Cleanup.

        // Only checks the versions of ready peers, so it needs to run after `poll_unready()`.
        self.disconnect_from_outdated_peers();

        // Check for failures in ready peers, removing newly errored or disconnected peers.
        // So it needs to run after `poll_unready()`.
        let ready_peers: Poll<()> = self.poll_ready_peer_errors(cx);

        // Metrics.
        //
        // These metrics must run last, after every step that adds or removes peers, so the logs,
        // the gauges, and the connection limit check all use the most up-to-date peer counts.
        self.log_peer_set_size();
        self.update_metrics();

        if ready_peers.is_pending() {
            // # Correctness
            //
            // If the channel is full, drop the demand signal rather than waiting. If we waited
            // here, the crawler could deadlock sending a request to fetch more peers, because it
            // also empties the channel.
            trace!("no ready services, sending demand signal");
            let _ = self.demand_signal.try_send(MorePeers);

            // # Correctness
            //
            // The current task must be scheduled for wakeup every time we return `Poll::Pending`.
            //
            // As long as there are unready or new peers, this task will run, because:
            // - `poll_discover` schedules this task for wakeup when new peers arrive.
            // - if there are unready peers, `poll_unready` or `poll_ready_peer_errors` schedule
            //   this task for wakeup when peer services become ready.
            //
            // To avoid peers blocking on a full peer status/error channel:
            // - `poll_background_errors` schedules this task for wakeup when the peer status
            //   update task exits.
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }

    /// Routes `req` to one or more ready peers, then updates the peer set metrics.
    ///
    /// - Block and transaction requests for exactly one item use inventory-aware routing.
    /// - Block and transaction advertisements are broadcast to multiple peers.
    /// - Every other request is routed to a single peer using P2C.
    fn call(&mut self, req: Request) -> Self::Future {
        let fut = match req {
            // Only do inventory-aware routing on individual items.
            Request::BlocksByHash(ref hashes) if hashes.len() == 1 => {
                let hash = InventoryHash::from(
                    *hashes
                        .iter()
                        .next()
                        .expect("just checked there is exactly one hash"),
                );
                self.route_inv(req, hash)
            }
            Request::TransactionsById(ref hashes) if hashes.len() == 1 => {
                let hash = InventoryHash::from(
                    *hashes
                        .iter()
                        .next()
                        .expect("just checked there is exactly one hash"),
                );
                self.route_inv(req, hash)
            }

            // Broadcast advertisements to lots of peers
            Request::AdvertiseTransactionIds(_) | Request::AdvertiseBlock(_) => {
                self.route_broadcast(req)
            }

            // Choose a random less-loaded peer for all other requests
            _ => self.route_p2c(req),
        };

        // Routing moves services from the ready list to the unready list,
        // so the gauges are refreshed after every call.
        self.update_metrics();

        fut
    }
}