From 0e45b96440a2e724d84b73d7f5f3fbeaba092287 Mon Sep 17 00:00:00 2001 From: Rayhaan Jaufeerally Date: Sun, 18 Aug 2024 13:59:28 +0000 Subject: [PATCH] Misc cleanups --- crates/bgp_packet/src/nlri.rs | 16 +- crates/route_client/src/netlink.rs | 1 - crates/server/src/bgp_server.rs | 27 ---- crates/server/src/peer.rs | 229 +++++++++++++++-------------- crates/server/src/rib_manager.rs | 3 +- crates/server/src/route_server.rs | 20 ++- 6 files changed, 132 insertions(+), 164 deletions(-) diff --git a/crates/bgp_packet/src/nlri.rs b/crates/bgp_packet/src/nlri.rs index 8ae52e4..3836f1a 100644 --- a/crates/bgp_packet/src/nlri.rs +++ b/crates/bgp_packet/src/nlri.rs @@ -28,7 +28,6 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::convert::TryInto; use std::fmt; -use std::io::ErrorKind; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; @@ -283,7 +282,7 @@ mod tests { use std::convert::TryFrom; use super::NLRI; - use crate::constants::AddressFamilyIdentifier::{Ipv4, Ipv6}; + use crate::constants::AddressFamilyIdentifier::{self, Ipv4, Ipv6}; use crate::traits::ParserContext; use crate::traits::ReadablePacket; use crate::traits::WritablePacket; @@ -373,17 +372,4 @@ mod tests { assert_eq!(reparsed, parsed_nlri); } } - - // #[test] - // fn test_to_string_invalids() { - // let invalid_v4 = NLRI { - // afi: AddressFamilyIdentifier::Ipv4, - // prefix: vec![1, 2, 3, 4, 5], - // prefixlen: 16, - // }; - // assert_eq!( - // "a formatting trait implementation returned an error: Error", - // format!("{}", invalid_v4) - // ); - // } } diff --git a/crates/route_client/src/netlink.rs b/crates/route_client/src/netlink.rs index d0ba170..dc35784 100644 --- a/crates/route_client/src/netlink.rs +++ b/crates/route_client/src/netlink.rs @@ -6,7 +6,6 @@ use netlink_packet_route::route::RouteAddress; use netlink_packet_route::route::RouteAttribute; use netlink_packet_route::route::RouteHeader; use netlink_packet_route::route::RouteMessage; -use netlink_packet_route::route::RouteProtocol; use netlink_packet_route::route::RouteType; use netlink_packet_route::AddressFamily as NetlinkAddressFamily; use rtnetlink::IpVersion; diff --git a/crates/server/src/bgp_server.rs b/crates/server/src/bgp_server.rs index a7403b2..585b06a 100644 --- a/crates/server/src/bgp_server.rs +++ b/crates/server/src/bgp_server.rs @@ -185,33 +185,6 @@ async fn start_http_server( Ok(warp::http::Response::builder().body(result).into_response()) } - /* - async fn modify_community_fn( - add: bool, - peers: HashMap>, - name: String, - ld1: u32, - ld2: u32, - ) -> Result { - if let Some(chan) = peers.get(&name) { - if let Err(e) = func(chan.clone(), ld1, ld2).await { - warn!("Failed to add large community: {:?}", e); - return Err(warp::reject()); - } - } else { - return Err(warp::reject()); - } - Ok(warp::reply::with_status("Ok", warp::http::StatusCode::OK)) - } - - let add_community_filter = warp::post() - .map(move || true) - .and(warp::path::param()) - .and(warp::path!(u32 / u32)) - .and_then(modify_community_fn); - - */ - // Start the web server that has access to the rib managers so that it can expose the state. let v4_mgr_filter = warp::any().map(move || manager4.clone()); diff --git a/crates/server/src/peer.rs b/crates/server/src/peer.rs index 138d8fa..770f964 100644 --- a/crates/server/src/peer.rs +++ b/crates/server/src/peer.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::config::PrefixAnnouncement; use crate::config::{PeerConfig, ServerConfig}; use crate::data_structures::RouteWithdraw; use crate::data_structures::{RouteInfo, RouteUpdate}; @@ -45,6 +44,7 @@ use bgp_packet::path_attributes::{ }; use bgp_packet::traits::ParserContext; use bytes::BytesMut; +use chrono::TimeZone; use chrono::{DateTime, Utc}; use eyre::{bail, eyre}; use ip_network_table_deps_treebitmap::address::Address; @@ -53,8 +53,8 @@ use std::convert::TryFrom; use std::convert::TryInto; use std::net::IpAddr; use std::net::SocketAddr; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; -use std::sync::RwLock; use std::time::Duration; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -165,8 +165,8 @@ async fn run_timer( async fn check_hold_timer( cancel_token: CancellationToken, iface: PeerInterface, - last_msg_time: Arc>>, - hold_time: std::time::Duration, + last_msg_time: Arc, + hold_time: chrono::Duration, ) { loop { tokio::select! { @@ -175,9 +175,17 @@ async fn check_hold_timer( return; } _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => { - let last = last_msg_time.read().unwrap(); - let elapsed_time = Utc::now() - *last; - if elapsed_time.num_seconds() as u64 > hold_time.as_secs() { + let now = Utc::now(); + let last_msg = match Utc.timestamp_millis_opt(last_msg_time.load(Ordering::Relaxed)) { + chrono::offset::LocalResult::Single(t) => t, + chrono::offset::LocalResult::Ambiguous(earliest, _) => earliest, + chrono::offset::LocalResult::None => { + warn!("Failed to parse last msg time"); + continue; + }, + }; + + if (now - last_msg) > hold_time { match iface.send(PeerCommands::TimerEvent(PeerTimerEvent::HoldTimerExpire())) { Ok(()) => {}, Err(e) => { @@ -187,6 +195,7 @@ async fn check_hold_timer( // Exit the hold timer task since it's expired already and is not needed anymore. return; } + } } @@ -336,7 +345,7 @@ pub struct PeerStateMachine { // Keep track of the time of the last message to efficiently implement // the hold timer. - last_msg_time: Arc>>, + last_msg_time: Arc, // Timers and cancellation token to spawned tasks connect_timer: Option<(JoinHandle<()>, CancellationToken)>, @@ -380,7 +389,7 @@ where iface_tx, route_manager, established_time: None, - last_msg_time: Arc::new(RwLock::new(DateTime::from_timestamp(0, 0).unwrap())), + last_msg_time: Arc::new(AtomicI64::default()), connect_timer: None, hold_timer: None, keepalive_timer: None, @@ -519,13 +528,9 @@ where } PeerCommands::MessageFromPeer(msg) => match self.handle_msg(msg).await { - Ok(_) => { - let mut last_time = self - .last_msg_time - .write() - .map_err(|e| eyre!(e.to_string()))?; - *last_time = Utc::now(); - } + Ok(_) => self + .last_msg_time + .store(Utc::now().timestamp_millis(), Ordering::Relaxed), Err(e) => { bail!(e); } @@ -580,7 +585,7 @@ where }, state: format!("{:?}", self.state), session_established_time: self.established_time.map(|t| t.timestamp() as u64), - last_messaage_time: Some(self.last_msg_time.read().unwrap().timestamp() as u64), + last_messaage_time: Some(self.last_msg_time.load(Ordering::Relaxed) as u64), route_updates_in: Some(0), /* todo */ route_updates_out: Some(0), /* todo */ }; @@ -1066,9 +1071,10 @@ where if hold_time > 0 { // Set keepalive timer. let keepalive_duration = hold_time / 3; - info!( + trace!( "Using keepalive duration of {} for peer {}", - keepalive_duration, self.config.name + keepalive_duration, + self.config.name ); { let token = CancellationToken::new(); @@ -1098,7 +1104,7 @@ where token_copy, chan, last_msg_time, - std::time::Duration::from_secs(hold_time.into()), + chrono::Duration::seconds(hold_time.into()), ) .await }); @@ -1107,11 +1113,7 @@ where } }; - // TODO: Should not have to clone here? - let announcements: Vec = self.config.announcements.clone(); - for announcement in announcements { - self.announce_static(&announcement).await?; - } + self.announce_static().await?; Ok(()) } @@ -1135,103 +1137,102 @@ where Ok(()) } - async fn announce_static(&mut self, announcement: &PrefixAnnouncement) -> eyre::Result<()> { - let mut bgp_update_msg = UpdateMessage { - withdrawn_nlri: vec![], - announced_nlri: vec![], - path_attributes: vec![], - }; + /// Generates and sends announcements for all statically defined routes. + async fn announce_static(&mut self) -> eyre::Result<()> { + for announcement in &self.config.announcements { + let mut bgp_update_msg = UpdateMessage { + withdrawn_nlri: vec![], + announced_nlri: vec![], + path_attributes: vec![], + }; - // Origin, TODO: configure this based on i/eBGP - bgp_update_msg - .path_attributes - .push(PathAttribute::OriginPathAttribute(OriginPathAttribute::EGP)); - - bgp_update_msg - .path_attributes - .push(ASPathAttribute::from_asns(vec![self.server_config.asn])); - - match self.config.afi { - AddressFamilyIdentifier::Ipv4 => { - match announcement.nexthop { - IpAddr::V4(nh) => { - bgp_update_msg - .path_attributes - .push(PathAttribute::NextHopPathAttribute(NextHopPathAttribute( - nh, - ))) - } - _ => bail!("Found non IPv4 nexthop in announcement"), - } - - let nlri = NLRI::try_from(announcement.prefix.as_str()) - .map_err(|e| eyre!(e.to_string()))?; - bgp_update_msg.announced_nlri.push(nlri); - } - AddressFamilyIdentifier::Ipv6 => { - let nexthop_octets = match announcement.nexthop { - IpAddr::V6(nh) => nh.octets().to_vec(), - _ => { - bail!("Found non IPv6 nexthop in announcement"); - } - }; - let nlri = NLRI::try_from(announcement.prefix.as_str()) - .map_err(|e| eyre!(e.to_string()))?; - let mp_reach = MPReachNLRIPathAttribute { - afi: AddressFamilyIdentifier::Ipv6, - safi: SubsequentAddressFamilyIdentifier::Unicast, - nexthop: nexthop_octets, - nlris: vec![nlri], - }; - bgp_update_msg - .path_attributes - .push(PathAttribute::MPReachNLRIPathAttribute(mp_reach)); - } - } - - if let Some(large_communities) = &announcement.large_communities { - let mut large_communities_attr = LargeCommunitiesPathAttribute { values: vec![] }; - for large_community in large_communities { - let parts: Vec = large_community - .split(':') - .flat_map(|x| x.parse::()) - .collect(); - if parts.len() != 3 { - warn!("Failed to parse large community: {}", large_community); - } - let payload = LargeCommunitiesPayload { - global_admin: parts[0], - ld1: parts[1], - ld2: parts[2], - }; - large_communities_attr.values.push(payload); - } + // Origin, TODO: configure this based on i/eBGP bgp_update_msg .path_attributes - .push(PathAttribute::LargeCommunitiesPathAttribute( - large_communities_attr, - )); - } + .push(PathAttribute::OriginPathAttribute(OriginPathAttribute::EGP)); - let bgp_message = BGPMessage { - msg_type: UPDATE_MESSAGE, - payload: BGPSubmessage::UpdateMessage(bgp_update_msg), - }; + bgp_update_msg + .path_attributes + .push(ASPathAttribute::from_asns(vec![self.server_config.asn])); - info!("Sending static announcement to peer: {:?}", bgp_message); + match self.config.afi { + AddressFamilyIdentifier::Ipv4 => { + match announcement.nexthop { + IpAddr::V4(nh) => bgp_update_msg.path_attributes.push( + PathAttribute::NextHopPathAttribute(NextHopPathAttribute(nh)), + ), + _ => bail!("Found non IPv4 nexthop in announcement"), + } - let mut buf = BytesMut::new(); - self.codec - .lock() - .await - .encode(bgp_message, &mut buf) - .map_err(|e| eyre!("failed to encode BGP message: {}", e))?; + let nlri = NLRI::try_from(announcement.prefix.as_str()) + .map_err(|e| eyre!(e.to_string()))?; + bgp_update_msg.announced_nlri.push(nlri); + } + AddressFamilyIdentifier::Ipv6 => { + let nexthop_octets = match announcement.nexthop { + IpAddr::V6(nh) => nh.octets().to_vec(), + _ => { + bail!("Found non IPv6 nexthop in announcement"); + } + }; + let nlri = NLRI::try_from(announcement.prefix.as_str()) + .map_err(|e| eyre!(e.to_string()))?; + let mp_reach = MPReachNLRIPathAttribute { + afi: AddressFamilyIdentifier::Ipv6, + safi: SubsequentAddressFamilyIdentifier::Unicast, + nexthop: nexthop_octets, + nlris: vec![nlri], + }; + bgp_update_msg + .path_attributes + .push(PathAttribute::MPReachNLRIPathAttribute(mp_reach)); + } + } - if let Some(stream) = self.tcp_stream.as_mut() { - stream - .write(&buf) + if let Some(large_communities) = &announcement.large_communities { + let mut large_communities_attr = LargeCommunitiesPathAttribute { values: vec![] }; + for large_community in large_communities { + let parts: Vec = large_community + .split(':') + .flat_map(|x| x.parse::()) + .collect(); + if parts.len() != 3 { + warn!("Failed to parse large community: {}", large_community); + } + let payload = LargeCommunitiesPayload { + global_admin: parts[0], + ld1: parts[1], + ld2: parts[2], + }; + large_communities_attr.values.push(payload); + } + bgp_update_msg + .path_attributes + .push(PathAttribute::LargeCommunitiesPathAttribute( + large_communities_attr, + )); + } + + let bgp_message = BGPMessage { + msg_type: UPDATE_MESSAGE, + payload: BGPSubmessage::UpdateMessage(bgp_update_msg), + }; + + info!("Sending static announcement to peer: {:?}", bgp_message); + + let mut buf = BytesMut::new(); + self.codec + .lock() .await - .map_err(|e| eyre!("Failed to write msg to peer: {}", e))?; + .encode(bgp_message, &mut buf) + .map_err(|e| eyre!("failed to encode BGP message: {}", e))?; + + if let Some(stream) = self.tcp_stream.as_mut() { + stream + .write(&buf) + .await + .map_err(|e| eyre!("Failed to write msg to peer: {}", e))?; + } } Ok(()) } diff --git a/crates/server/src/rib_manager.rs b/crates/server/src/rib_manager.rs index f2112f9..0328055 100644 --- a/crates/server/src/rib_manager.rs +++ b/crates/server/src/rib_manager.rs @@ -213,9 +213,8 @@ where for pathset in self.rib.iter() { snapshot.routes.push(pathset.2.lock().unwrap().clone()); } - // TODO: handle an error here. if let Err(e) = sender.send(snapshot) { - warn!("Failed to send snapshot of RIB: {:?}", e); + trace!("Failed to send snapshot of RIB: {:?}", e); } info!("Done RIB dump"); } diff --git a/crates/server/src/route_server.rs b/crates/server/src/route_server.rs index 5f50442..0ac15fa 100644 --- a/crates/server/src/route_server.rs +++ b/crates/server/src/route_server.rs @@ -110,21 +110,31 @@ impl RouteServer { impl BgpServerAdminService for RouteServer { async fn peer_status( &self, - request: tonic::Request, + _request: tonic::Request, ) -> Result, Status> { let mut result = PeerStatusResponse::default(); + // Store the pending futures in a JoinSet to parallelize fetching. + let mut join_set = tokio::task::JoinSet::new(); + for peer in &self.peer_state_machines { let (tx, rx) = oneshot::channel(); - if let Err(e) = peer.1.send(PeerCommands::GetStatus(tx)) { + if let Err(_e) = peer.1.send(PeerCommands::GetStatus(tx)) { warn!( peer = peer.0, - "Peer channel dead when trying to send state request" + "Failed to send GetStatus request to PeerStateMachine" ); continue; } - let resp = rx.await.map_err(|e| Status::internal(format!("{}", e)))?; - result.peer_status.push(resp); + join_set.spawn(rx); + } + + while let Some(next) = join_set.join_next().await { + match next { + Ok(Ok(peer_status)) => result.peer_status.push(peer_status), + Ok(Err(e)) => return Err(Status::internal(format!("{}", e))), + Err(e) => return Err(Status::internal(format!("{}", e))), + } } Ok(Response::new(result))