1use std::{
8 collections::{BTreeMap, HashMap, HashSet},
9 convert::Infallible,
10 net::{IpAddr, SocketAddr},
11 pin::Pin,
12 sync::Arc,
13 time::Duration,
14};
15
16use futures::{
17 future::{self, FutureExt},
18 sink::SinkExt,
19 stream::{FuturesUnordered, StreamExt},
20 Future, TryFutureExt,
21};
22use indexmap::IndexMap;
23use rand::seq::SliceRandom;
24use tokio::{
25 net::{TcpListener, TcpStream},
26 sync::{broadcast, mpsc, watch},
27 time::{sleep, Instant},
28};
29use tokio_stream::wrappers::IntervalStream;
30use tower::{
31 buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
32};
33use tracing_futures::Instrument;
34
35use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};
36
37use crate::{
38 address_book_updater::{AddressBookUpdater, MIN_CHANNEL_SIZE},
39 constants,
40 meta_addr::{MetaAddr, MetaAddrChange},
41 peer::{
42 self, address_is_valid_for_inbound_listeners, HandshakeRequest, MinimumPeerVersion,
43 OutboundConnectorRequest, PeerPreference,
44 },
45 peer_cache_updater::peer_cache_updater,
46 peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
47 AddressBook, BoxError, Config, PeerSocketAddr, Request, Response,
48};
49
50#[cfg(test)]
51mod tests;
52
53mod recent_by_ip;
54
55type DiscoveredPeer = (PeerSocketAddr, peer::Client);
64
65pub async fn init<S, C>(
98 config: Config,
99 inbound_service: S,
100 latest_chain_tip: C,
101 user_agent: String,
102) -> (
103 Buffer<BoxService<Request, Response, BoxError>, Request>,
104 Arc<std::sync::Mutex<AddressBook>>,
105 mpsc::Sender<(PeerSocketAddr, u32)>,
106)
107where
108 S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
109 S::Future: Send + 'static,
110 C: ChainTip + Clone + Send + Sync + 'static,
111{
112 let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
113
114 let (
115 address_book,
116 bans_receiver,
117 address_book_updater,
118 address_metrics,
119 address_book_updater_guard,
120 ) = AddressBookUpdater::spawn(&config, listen_addr);
121
122 let (misbehavior_tx, mut misbehavior_rx) = mpsc::channel(
123 config
126 .peerset_total_connection_limit()
127 .max(MIN_CHANNEL_SIZE),
128 );
129
130 let misbehaviour_updater = address_book_updater.clone();
131 tokio::spawn(
132 async move {
133 let mut misbehaviors: HashMap<PeerSocketAddr, u32> = HashMap::new();
134 let mut flush_timer =
137 IntervalStream::new(tokio::time::interval(Duration::from_secs(30)));
138
139 loop {
140 tokio::select! {
141 msg = misbehavior_rx.recv() => match msg {
142 Some((peer_addr, score_increment)) => *misbehaviors
143 .entry(peer_addr)
144 .or_default()
145 += score_increment,
146 None => break,
147 },
148
149 _ = flush_timer.next() => {
150 for (addr, score_increment) in misbehaviors.drain() {
151 let _ = misbehaviour_updater
152 .send(MetaAddrChange::UpdateMisbehavior {
153 addr,
154 score_increment
155 })
156 .await;
157 }
158 },
159 };
160 }
161
162 tracing::warn!("exiting misbehavior update batch task");
163 }
164 .in_current_span(),
165 );
166
167 let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());
175
176 let (listen_handshaker, outbound_connector) = {
181 use tower::timeout::TimeoutLayer;
182 let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
183 use crate::protocol::external::types::PeerServices;
184 let hs = peer::Handshake::builder()
185 .with_config(config.clone())
186 .with_inbound_service(inbound_service)
187 .with_inventory_collector(inv_sender)
188 .with_address_book_updater(address_book_updater.clone())
189 .with_advertised_services(PeerServices::NODE_NETWORK)
190 .with_user_agent(user_agent)
191 .with_latest_chain_tip(latest_chain_tip.clone())
192 .want_transactions(true)
193 .finish()
194 .expect("configured all required parameters");
195 (
196 hs_timeout.layer(hs.clone()),
197 hs_timeout.layer(peer::Connector::new(hs)),
198 )
199 };
200
201 let (peerset_tx, peerset_rx) =
207 futures::channel::mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit());
208
209 let discovered_peers = peerset_rx.map(|(address, client)| {
210 Result::<_, Infallible>::Ok(Change::Insert(address, client.into()))
211 });
212
213 let (mut demand_tx, demand_rx) =
216 futures::channel::mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());
217
218 let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
220
221 let peer_set = PeerSet::new(
223 &config,
224 discovered_peers,
225 demand_tx.clone(),
226 handle_rx,
227 inv_receiver,
228 bans_receiver.clone(),
229 address_metrics,
230 MinimumPeerVersion::new(latest_chain_tip, &config.network),
231 None,
232 );
233 let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
234
235 let listen_fut = accept_inbound_connections(
239 config.clone(),
240 tcp_listener,
241 constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL,
242 listen_handshaker,
243 peerset_tx.clone(),
244 bans_receiver,
245 );
246 let listen_guard = tokio::spawn(listen_fut.in_current_span());
247
248 let initial_peers_fut = add_initial_peers(
250 config.clone(),
251 outbound_connector.clone(),
252 peerset_tx.clone(),
253 address_book_updater.clone(),
254 );
255 let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());
256
257 let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
259
260 let mut active_outbound_connections = initial_peers_join
262 .wait_for_panics()
263 .await
264 .expect("unexpected error connecting to initial peers");
265 let active_initial_peer_count = active_outbound_connections.update_count();
266
267 info!(
275 ?active_initial_peer_count,
276 "sending initial request for peers"
277 );
278 let _ = candidates.update_initial(active_initial_peer_count).await;
279
280 let demand_count = config
282 .peerset_initial_target_size
283 .saturating_sub(active_outbound_connections.update_count());
284
285 for _ in 0..demand_count {
286 let _ = demand_tx.try_send(MorePeers);
287 }
288
289 let crawl_fut = crawl_and_dial(
291 config.clone(),
292 demand_tx,
293 demand_rx,
294 candidates,
295 outbound_connector,
296 peerset_tx,
297 active_outbound_connections,
298 address_book_updater,
299 );
300 let crawl_guard = tokio::spawn(crawl_fut.in_current_span());
301
302 let peer_cache_updater_fut = peer_cache_updater(config, address_book.clone());
304 let peer_cache_updater_guard = tokio::spawn(peer_cache_updater_fut.in_current_span());
305
306 handle_tx
307 .send(vec![
308 listen_guard,
309 crawl_guard,
310 address_book_updater_guard,
311 peer_cache_updater_guard,
312 ])
313 .unwrap();
314
315 (peer_set, address_book, misbehavior_tx)
316}
317
318#[instrument(skip(config, outbound_connector, peerset_tx, address_book_updater))]
323async fn add_initial_peers<S>(
324 config: Config,
325 outbound_connector: S,
326 mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
327 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
328) -> Result<ActiveConnectionCounter, BoxError>
329where
330 S: Service<
331 OutboundConnectorRequest,
332 Response = (PeerSocketAddr, peer::Client),
333 Error = BoxError,
334 > + Clone
335 + Send
336 + 'static,
337 S::Future: Send + 'static,
338{
339 let initial_peers = limit_initial_peers(&config, address_book_updater).await;
340
341 let mut handshake_success_total: usize = 0;
342 let mut handshake_error_total: usize = 0;
343
344 let mut active_outbound_connections = ActiveConnectionCounter::new_counter_with(
345 config.peerset_outbound_connection_limit(),
346 "Outbound Connections",
347 );
348
349 let ipv4_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv4()).count();
351 let ipv6_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv6()).count();
352 info!(
353 ?ipv4_peer_count,
354 ?ipv6_peer_count,
355 "connecting to initial peer set"
356 );
357
358 let mut handshakes: FuturesUnordered<_> = initial_peers
370 .into_iter()
371 .enumerate()
372 .map(|(i, addr)| {
373 let connection_tracker = active_outbound_connections.track_connection();
374 let req = OutboundConnectorRequest {
375 addr,
376 connection_tracker,
377 };
378 let outbound_connector = outbound_connector.clone();
379
380 tokio::spawn(
382 async move {
383 sleep(
387 constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32),
388 )
389 .await;
390
391 outbound_connector
394 .oneshot(req)
395 .map_err(move |e| (addr, e))
396 .await
397 }
398 .in_current_span(),
399 )
400 .wait_for_panics()
401 })
402 .collect();
403
404 while let Some(handshake_result) = handshakes.next().await {
405 match handshake_result {
406 Ok(change) => {
407 handshake_success_total += 1;
408 debug!(
409 ?handshake_success_total,
410 ?handshake_error_total,
411 ?change,
412 "an initial peer handshake succeeded"
413 );
414
415 peerset_tx.send(change).await?;
417 }
418 Err((addr, ref e)) => {
419 handshake_error_total += 1;
420
421 let mut expected_error = false;
423 if let Some(io_error) = e.downcast_ref::<tokio::io::Error>() {
424 if io_error.kind() == tokio::io::ErrorKind::AddrNotAvailable {
427 expected_error = true;
428 }
429 }
430
431 if expected_error {
432 debug!(
433 successes = ?handshake_success_total,
434 errors = ?handshake_error_total,
435 ?addr,
436 ?e,
437 "an initial peer connection failed"
438 );
439 } else {
440 info!(
441 successes = ?handshake_success_total,
442 errors = ?handshake_error_total,
443 ?addr,
444 %e,
445 "an initial peer connection failed"
446 );
447 }
448 }
449 }
450
451 tokio::task::yield_now().await;
455 }
456
457 let outbound_connections = active_outbound_connections.update_count();
458 info!(
459 ?handshake_success_total,
460 ?handshake_error_total,
461 ?outbound_connections,
462 "finished connecting to initial seed and disk cache peers"
463 );
464
465 Ok(active_outbound_connections)
466}
467
468async fn limit_initial_peers(
476 config: &Config,
477 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
478) -> HashSet<PeerSocketAddr> {
479 let all_peers: HashSet<PeerSocketAddr> = config.initial_peers().await;
480 let mut preferred_peers: BTreeMap<PeerPreference, Vec<PeerSocketAddr>> = BTreeMap::new();
481
482 let all_peers_count = all_peers.len();
483 if all_peers_count > config.peerset_initial_target_size {
484 info!(
485 "limiting the initial peers list from {} to {}",
486 all_peers_count, config.peerset_initial_target_size,
487 );
488 }
489
490 for peer_addr in all_peers {
493 let preference = PeerPreference::new(peer_addr, config.network.clone());
494
495 match preference {
496 Ok(preference) => preferred_peers
497 .entry(preference)
498 .or_default()
499 .push(peer_addr),
500 Err(error) => info!(
501 ?peer_addr,
502 ?error,
503 "invalid initial peer from DNS seeder, configured IP address, or disk cache",
504 ),
505 }
506 }
507
508 for peer in preferred_peers.values().flatten() {
517 let peer_addr = MetaAddr::new_initial_peer(*peer);
518 let _ = address_book_updater.send(peer_addr).await;
521 }
522
523 let mut initial_peers: HashSet<PeerSocketAddr> = HashSet::new();
526 for better_peers in preferred_peers.values() {
527 let mut better_peers = better_peers.clone();
528 let (chosen_peers, _unused_peers) = better_peers.partial_shuffle(
529 &mut rand::thread_rng(),
530 config.peerset_initial_target_size - initial_peers.len(),
531 );
532
533 initial_peers.extend(chosen_peers.iter());
534
535 if initial_peers.len() >= config.peerset_initial_target_size {
536 break;
537 }
538 }
539
540 initial_peers
541}
542
543#[instrument(skip(config), fields(addr = ?config.listen_addr))]
553pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
554 if let Err(wrong_addr) =
556 address_is_valid_for_inbound_listeners(config.listen_addr, config.network.clone())
557 {
558 warn!(
559 "We are configured with address {} on {:?}, but it could cause network issues. \
560 The default port for {:?} is {}. Error: {wrong_addr:?}",
561 config.listen_addr,
562 config.network,
563 config.network,
564 config.network.default_port(),
565 );
566 }
567
568 info!(
569 "Trying to open Zcash protocol endpoint at {}...",
570 config.listen_addr
571 );
572 let listener_result = TcpListener::bind(config.listen_addr).await;
573
574 let listener = match listener_result {
575 Ok(l) => l,
576 Err(e) => panic!(
577 "Opening Zcash network protocol listener {:?} failed: {e:?}. \
578 Hint: Check if another zebrad or zcashd process is running. \
579 Try changing the network listen_addr in the Zebra config.",
580 config.listen_addr,
581 ),
582 };
583
584 let local_addr = listener
585 .local_addr()
586 .expect("unexpected missing local addr for open listener");
587 info!("Opened Zcash protocol endpoint at {}", local_addr);
588
589 (listener, local_addr)
590}
591
592#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
601async fn accept_inbound_connections<S>(
602 config: Config,
603 listener: TcpListener,
604 min_inbound_peer_connection_interval: Duration,
605 handshaker: S,
606 peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
607 bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
608) -> Result<(), BoxError>
609where
610 S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
611 + Clone,
612 S::Future: Send + 'static,
613{
614 let mut recent_inbound_connections =
615 recent_by_ip::RecentByIp::new(None, Some(config.max_connections_per_ip));
616
617 let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with(
618 config.peerset_inbound_connection_limit(),
619 "Inbound Connections",
620 );
621
622 let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
623 FuturesUnordered::new();
624 handshakes.push(future::pending().boxed());
626
627 loop {
628 let inbound_result = tokio::select! {
630 biased;
631 next_handshake_res = handshakes.next() => match next_handshake_res {
632 Some(()) => continue,
634 None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
635 },
636
637 inbound_result = listener.accept() => inbound_result,
639 };
640
641 if let Ok((tcp_stream, addr)) = inbound_result {
642 let addr: PeerSocketAddr = addr.into();
643
644 if bans_receiver.borrow().clone().contains_key(&addr.ip()) {
645 debug!(?addr, "banned inbound connection attempt");
646 std::mem::drop(tcp_stream);
647 continue;
648 }
649
650 if active_inbound_connections.update_count()
651 >= config.peerset_inbound_connection_limit()
652 || recent_inbound_connections.is_past_limit_or_add(addr.ip())
653 {
654 std::mem::drop(tcp_stream);
657 tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
660 continue;
661 }
662
663 let connection_tracker = active_inbound_connections.track_connection();
666 debug!(
667 inbound_connections = ?active_inbound_connections.update_count(),
668 "handshaking on an open inbound peer connection"
669 );
670
671 let handshake_task = accept_inbound_handshake(
672 addr,
673 handshaker.clone(),
674 tcp_stream,
675 connection_tracker,
676 peerset_tx.clone(),
677 )
678 .await?
679 .wait_for_panics();
680
681 handshakes.push(handshake_task);
682
683 tokio::time::sleep(min_inbound_peer_connection_interval).await;
693 } else {
694 debug!(?inbound_result, "error accepting inbound connection");
697 tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
698 }
699
700 tokio::task::yield_now().await;
708 }
709}
710
711#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
719async fn accept_inbound_handshake<S>(
720 addr: PeerSocketAddr,
721 mut handshaker: S,
722 tcp_stream: TcpStream,
723 connection_tracker: ConnectionTracker,
724 peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
725) -> Result<tokio::task::JoinHandle<()>, BoxError>
726where
727 S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
728 + Clone,
729 S::Future: Send + 'static,
730{
731 let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
732
733 debug!("got incoming connection");
734
735 handshaker.ready().await?;
742
743 let handshake = handshaker.call(HandshakeRequest {
745 data_stream: tcp_stream,
746 connected_addr,
747 connection_tracker,
748 });
749 let mut peerset_tx = peerset_tx.clone();
751
752 let handshake_task = tokio::spawn(
753 async move {
754 let handshake_result = handshake.await;
755
756 if let Ok(client) = handshake_result {
757 let _ = peerset_tx.send((addr, client)).await;
759 } else {
760 debug!(?handshake_result, "error handshaking with inbound peer");
761 }
762 }
763 .in_current_span(),
764 );
765
766 Ok(handshake_task)
767}
768
769enum CrawlerAction {
771 DemandDrop,
773 DemandHandshakeOrCrawl,
777 TimerCrawl { tick: Instant },
779 HandshakeFinished,
781 DemandCrawlFinished,
783 TimerCrawlFinished,
785}
786
787#[allow(clippy::too_many_arguments)]
805#[instrument(
806 skip(
807 config,
808 demand_tx,
809 demand_rx,
810 candidates,
811 outbound_connector,
812 peerset_tx,
813 active_outbound_connections,
814 address_book_updater,
815 ),
816 fields(
817 new_peer_interval = ?config.crawl_new_peer_interval,
818 )
819)]
820async fn crawl_and_dial<C, S>(
821 config: Config,
822 demand_tx: futures::channel::mpsc::Sender<MorePeers>,
823 mut demand_rx: futures::channel::mpsc::Receiver<MorePeers>,
824 candidates: CandidateSet<S>,
825 outbound_connector: C,
826 peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
827 mut active_outbound_connections: ActiveConnectionCounter,
828 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
829) -> Result<(), BoxError>
830where
831 C: Service<
832 OutboundConnectorRequest,
833 Response = (PeerSocketAddr, peer::Client),
834 Error = BoxError,
835 > + Clone
836 + Send
837 + 'static,
838 C::Future: Send + 'static,
839 S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
840 S::Future: Send + 'static,
841{
842 use CrawlerAction::*;
843
844 info!(
845 crawl_new_peer_interval = ?config.crawl_new_peer_interval,
846 outbound_connections = ?active_outbound_connections.update_count(),
847 "starting the peer address crawler",
848 );
849
850 let candidates = Arc::new(futures::lock::Mutex::new(candidates));
856
857 let mut handshakes: FuturesUnordered<
859 Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>,
860 > = FuturesUnordered::new();
861 handshakes.push(future::pending().boxed());
864
865 let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
866 crawl_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
869
870 let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });
871
872 loop {
877 metrics::gauge!("crawler.in_flight_handshakes").set(
878 handshakes
879 .len()
880 .checked_sub(1)
881 .expect("the pool always contains an unresolved future") as f64,
882 );
883
884 let crawler_action = tokio::select! {
885 biased;
886 next_handshake_res = handshakes.next() => next_handshake_res.expect(
889 "handshakes never terminates, because it contains a future that never resolves"
890 ),
891 next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")),
893 next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{
899 if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
900 DemandDrop
902 } else {
903 DemandHandshakeOrCrawl
904 }
905 })
906 };
907
908 match crawler_action {
909 Ok(DemandDrop) => {
911 trace!("too many open connections or in-flight handshakes, dropping demand signal");
914 }
915
916 Ok(DemandHandshakeOrCrawl) => {
918 let candidates = candidates.clone();
919 let outbound_connector = outbound_connector.clone();
920 let peerset_tx = peerset_tx.clone();
921 let address_book_updater = address_book_updater.clone();
922 let demand_tx = demand_tx.clone();
923
924 let outbound_connection_tracker = active_outbound_connections.track_connection();
926 let outbound_connections = active_outbound_connections.update_count();
927 debug!(?outbound_connections, "opening an outbound peer connection");
928
929 let handshake_or_crawl_handle = tokio::spawn(
938 async move {
939 let candidate = { candidates.lock().await.next().await };
946
947 if let Some(candidate) = candidate {
948 dial(
950 candidate,
951 outbound_connector,
952 outbound_connection_tracker,
953 outbound_connections,
954 peerset_tx,
955 address_book_updater,
956 demand_tx,
957 )
958 .await?;
959
960 Ok(HandshakeFinished)
961 } else {
962 debug!("demand for peers but no available candidates");
964
965 crawl(candidates, demand_tx, false).await?;
966
967 Ok(DemandCrawlFinished)
968 }
969 }
970 .in_current_span(),
971 )
972 .wait_for_panics();
973
974 handshakes.push(handshake_or_crawl_handle);
975 }
976 Ok(TimerCrawl { tick }) => {
977 let candidates = candidates.clone();
978 let demand_tx = demand_tx.clone();
979 let should_always_dial = active_outbound_connections.update_count() == 0;
980
981 let crawl_handle = tokio::spawn(
982 async move {
983 debug!(
984 ?tick,
985 "crawling for more peers in response to the crawl timer"
986 );
987
988 crawl(candidates, demand_tx, should_always_dial).await?;
989
990 Ok(TimerCrawlFinished)
991 }
992 .in_current_span(),
993 )
994 .wait_for_panics();
995
996 handshakes.push(crawl_handle);
997 }
998
999 Ok(HandshakeFinished) => {
1001 }
1003 Ok(DemandCrawlFinished) => {
1004 trace!("demand-based crawl finished");
1007 }
1008 Ok(TimerCrawlFinished) => {
1009 debug!("timer-based crawl finished");
1010 }
1011
1012 Err(error) => {
1014 info!(?error, "crawler task exiting due to an error");
1015 return Err(error);
1016 }
1017 }
1018
1019 tokio::task::yield_now().await;
1023 }
1024}
1025
1026#[instrument(skip(candidates, demand_tx))]
1029async fn crawl<S>(
1030 candidates: Arc<futures::lock::Mutex<CandidateSet<S>>>,
1031 mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
1032 should_always_dial: bool,
1033) -> Result<(), BoxError>
1034where
1035 S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
1036 S::Future: Send + 'static,
1037{
1038 let result = {
1042 let result = candidates.lock().await.update().await;
1043 std::mem::drop(candidates);
1044 result
1045 };
1046 let more_peers = match result {
1047 Ok(more_peers) => more_peers.or_else(|| should_always_dial.then_some(MorePeers)),
1048 Err(e) => {
1049 info!(
1050 ?e,
1051 "candidate set returned an error, is Zebra shutting down?"
1052 );
1053 return Err(e);
1054 }
1055 };
1056
1057 if let Some(more_peers) = more_peers {
1068 if let Err(send_error) = demand_tx.try_send(more_peers) {
1069 if send_error.is_disconnected() {
1070 return Err(send_error.into());
1072 }
1073 }
1074 }
1075
1076 Ok(())
1077}
1078
1079#[instrument(skip(
1086 outbound_connector,
1087 outbound_connection_tracker,
1088 outbound_connections,
1089 peerset_tx,
1090 address_book_updater,
1091 demand_tx
1092))]
1093async fn dial<C>(
1094 candidate: MetaAddr,
1095 mut outbound_connector: C,
1096 outbound_connection_tracker: ConnectionTracker,
1097 outbound_connections: usize,
1098 mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
1099 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
1100 mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
1101) -> Result<(), BoxError>
1102where
1103 C: Service<
1104 OutboundConnectorRequest,
1105 Response = (PeerSocketAddr, peer::Client),
1106 Error = BoxError,
1107 > + Clone
1108 + Send
1109 + 'static,
1110 C::Future: Send + 'static,
1111{
1112 const MAX_CONNECTIONS_FOR_INFO_LOG: usize = 5;
1115
1116 debug!(?candidate.addr, "attempting outbound connection in response to demand");
1123
1124 let outbound_connector = outbound_connector.ready().await?;
1126
1127 let req = OutboundConnectorRequest {
1128 addr: candidate.addr,
1129 connection_tracker: outbound_connection_tracker,
1130 };
1131
1132 let handshake_result = outbound_connector.call(req).map(Into::into).await;
1134
1135 match handshake_result {
1136 Ok((address, client)) => {
1137 debug!(?candidate.addr, "successfully dialed new peer");
1138
1139 peerset_tx.send((address, client)).await?;
1141 }
1142 Err(error) => {
1144 if outbound_connections <= MAX_CONNECTIONS_FOR_INFO_LOG && !cfg!(test) {
1147 info!(?error, ?candidate.addr, "failed to make outbound connection to peer");
1148 } else {
1149 debug!(?error, ?candidate.addr, "failed to make outbound connection to peer");
1150 }
1151 report_failed(address_book_updater.clone(), candidate).await;
1152
1153 if let Err(send_error) = demand_tx.try_send(MorePeers) {
1160 if send_error.is_disconnected() {
1161 return Err(send_error.into());
1163 }
1164 }
1165 }
1166 }
1167
1168 Ok(())
1169}
1170
1171#[instrument(skip(address_book_updater))]
1173async fn report_failed(
1174 address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
1175 addr: MetaAddr,
1176) {
1177 let addr = MetaAddr::new_errored(addr.addr, None);
1179
1180 let _ = address_book_updater.send(addr).await;
1182}