From 0cd3a120d056eb29b91c202e409494e2ddef892b Mon Sep 17 00:00:00 2001 From: Rayhaan Jaufeerally Date: Thu, 25 Jul 2024 09:57:05 +0000 Subject: [PATCH] Cleanups and filtering. --- Cargo.toml | 2 +- bin/src/bgp_server/main.rs | 2 +- crates/bgp_packet/src/messages.rs | 4 +- crates/bgp_packet/src/path_attributes.rs | 41 +++-- crates/route_client/proto/route_service.proto | 14 +- crates/route_client/src/lib.rs | 8 +- crates/server/Cargo.toml | 3 +- crates/server/proto/route_service.proto | 13 +- crates/server/src/config.rs | 2 +- crates/server/src/data_structures.rs | 21 +-- crates/server/src/peer.rs | 157 +++++++++-------- crates/server/src/rib_manager.rs | 161 +++++++++++------- crates/server/src/route_server.rs | 28 ++- 13 files changed, 262 insertions(+), 194 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 663fc08..e79074a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ netlink-packet-utils = "0.5.2" nom = "7.1" prost = "0.8" rtnetlink = "0.14.1" -serde = { version = "1.0", features = ["derive"] } +serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0.64" stderrlog = "0.5.1" tokio = { version = "1.13.0", features = ["full"] } diff --git a/bin/src/bgp_server/main.rs b/bin/src/bgp_server/main.rs index 754509b..f21b3c4 100644 --- a/bin/src/bgp_server/main.rs +++ b/bin/src/bgp_server/main.rs @@ -30,7 +30,7 @@ use tracing::info; #[tokio::main] async fn main() -> Result<(), Box> { - let subscriber = tracing_subscriber::fmt().with_env_filter("bgpd=info"); + let subscriber = tracing_subscriber::fmt(); match subscriber.try_init() { Ok(()) => {} diff --git a/crates/bgp_packet/src/messages.rs b/crates/bgp_packet/src/messages.rs index 72959fa..d9017ef 100644 --- a/crates/bgp_packet/src/messages.rs +++ b/crates/bgp_packet/src/messages.rs @@ -532,9 +532,11 @@ impl Display for UpdateMessage { for withdrawn_nlri in &self.withdrawn_nlri { fmt::Display::fmt(withdrawn_nlri, f)?; } + write!(f, " announced: ")?; for announced_nlri in &self.announced_nlri { fmt::Display::fmt(announced_nlri, f)?; } + write!(f, " path attributes: ")?; for path_attr in &self.path_attributes { fmt::Display::fmt(path_attr, f)?; } @@ -611,7 +613,7 @@ mod tests { let (buf, result) = BGPMessage::from_wire(ctx, update_msg_bytes).unwrap(); assert_eq!(buf.len(), 0); - let want_str = "UpdateMessage [ withdrawn: 203.1.78.0/24Origin: UnknownAS Path: { Segment [ Type: AS_SEGMENT 39540 57118 29691 1299 4739 ]] }NextHop: 185.95.219.36Communities: [ 1299:35000, 29691:4000, 29691:4021, 39540:4000, 39540:4010, 57118:2000, 57118:2010, ] LargeCommunities: [ 57118:20:0, 57118:20:10, ] ]"; + let want_str = "UpdateMessage [ withdrawn: announced: 203.1.78.0/24 path attributes: OriginPathAttribute::INCOMPLETEAS Path: { Segment [ Type: AS_SEGMENT 39540 57118 29691 1299 4739 ]] }NextHop: 185.95.219.36Communities: [ 1299:35000, 29691:4000, 29691:4021, 39540:4000, 39540:4010, 57118:2000, 57118:2010, ] LargeCommunities: [ 57118:20:0, 57118:20:10, ] ]"; assert_eq!(format!("{}", result), want_str); let reencoded = result.to_wire(&ctx).unwrap(); diff --git a/crates/bgp_packet/src/path_attributes.rs b/crates/bgp_packet/src/path_attributes.rs index 6cc7d1c..bdace09 100644 --- a/crates/bgp_packet/src/path_attributes.rs +++ b/crates/bgp_packet/src/path_attributes.rs @@ -21,6 +21,7 @@ use crate::traits::ReadablePacket; use crate::traits::WritablePacket; use byteorder::ByteOrder; use byteorder::NetworkEndian; +use eyre::bail; use nom::number::complete::{be_u16, be_u32, be_u8}; use nom::Err::Failure; use nom::IResult; @@ -264,15 +265,25 @@ impl fmt::Display for PathAttribute { // Path attribute implementations. /// Origin path attribute is a mandatory attribute defined in RFC4271. -#[derive(Debug, PartialEq, Eq, Clone, Serialize)] -pub struct OriginPathAttribute(pub u8); +#[derive(Debug, PartialEq, Eq, Copy, Clone, Serialize)] +#[repr(u8)] +pub enum OriginPathAttribute { + IGP = 0, + EGP = 1, + INCOMPLETE = 2, +} -pub mod origin_path_attribute_values { - use super::OriginPathAttribute; +impl TryFrom for OriginPathAttribute { + type Error = eyre::ErrReport; - pub const IGP: OriginPathAttribute = OriginPathAttribute(0); - pub const EGP: OriginPathAttribute = OriginPathAttribute(1); - pub const UNKNOWN: OriginPathAttribute = OriginPathAttribute(2); + fn try_from(value: u8) -> eyre::Result { + match value { + 0 => Ok(Self::IGP), + 1 => Ok(Self::EGP), + 2 => Ok(Self::INCOMPLETE), + other => bail!("Unexpected origin code {}", other), + } + } } impl ReadablePacket for OriginPathAttribute { @@ -281,13 +292,15 @@ impl ReadablePacket for OriginPathAttribute { buf: &'a [u8], ) -> IResult<&'a [u8], Self, BGPParserError<&'a [u8]>> { let (buf, opa) = be_u8(buf)?; - Ok((buf, OriginPathAttribute(opa))) + let origin_attr = OriginPathAttribute::try_from(opa) + .map_err(|e| Failure(BGPParserError::CustomText(e.to_string())))?; + Ok((buf, origin_attr)) } } impl WritablePacket for OriginPathAttribute { fn to_wire(&self, _: &ParserContext) -> Result, &'static str> { - Ok(vec![self.0]) + Ok(vec![(*self) as u8]) } fn wire_len(&self, _: &ParserContext) -> Result { Ok(1) @@ -296,12 +309,10 @@ impl WritablePacket for OriginPathAttribute { impl fmt::Display for OriginPathAttribute { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use origin_path_attribute_values::*; match self { - &IGP => write!(f, "Origin: IGP"), - &EGP => write!(f, "Origin: EGP"), - &UNKNOWN => write!(f, "Origin: Unknown"), - _ => write!(f, "Origin: invalid value"), + OriginPathAttribute::IGP => write!(f, "OriginPathAttribute::IGP"), + OriginPathAttribute::EGP => write!(f, "OriginPathAttribute::EGP"), + OriginPathAttribute::INCOMPLETE => write!(f, "OriginPathAttribute::INCOMPLETE"), } } } @@ -1206,7 +1217,7 @@ mod tests { nom::multi::many0(|buf: &'a [u8]| PathAttribute::from_wire(ctx, buf))(path_attr_bytes) .unwrap(); assert_eq!(buf.len(), 0); - let expected_str = "[OriginPathAttribute(OriginPathAttribute(0)), \ + let expected_str = "[OriginPathAttribute(IGP), \ ASPathAttribute(ASPathAttribute { segments: \ [ASPathSegment { ordered: true, path: [39540, 25091, 2914, 6453, 8346, 13335] }] }), \ NextHopPathAttribute(NextHopPathAttribute(185.95.219.36)), \ diff --git a/crates/route_client/proto/route_service.proto b/crates/route_client/proto/route_service.proto index 43b8e4c..e6f113a 100644 --- a/crates/route_client/proto/route_service.proto +++ b/crates/route_client/proto/route_service.proto @@ -32,7 +32,7 @@ message Prefix { // prefix. message Path { bytes nexthop = 1; - string peer_name = 2; + bytes peer_id = 2; uint32 local_pref = 3; uint32 med = 4; repeated uint32 as_path = 5; @@ -67,12 +67,12 @@ message PeerStatusRequest {} message PeerStatus { string peer_name = 1; - string state = 2; - // The following fields are only populated when the session is established. - optional uint64 session_established_time = 3; - optional uint64 last_messaage_time = 4; - optional uint64 route_updates_in = 5; - optional uint64 route_updates_out = 6; + bytes peer_id = 2; + string state = 3; + optional uint64 session_established_time = 4; + optional uint64 last_messaage_time = 5; + optional uint64 route_updates_in = 6; + optional uint64 route_updates_out = 7; } message PeerStatusResponse { repeated PeerStatus peer_status = 1; } diff --git a/crates/route_client/src/lib.rs b/crates/route_client/src/lib.rs index 249f68e..59b3d6e 100644 --- a/crates/route_client/src/lib.rs +++ b/crates/route_client/src/lib.rs @@ -120,9 +120,9 @@ pub async fn run_connector_v4( let nexthop_bytes: [u8; 4] = path.nexthop.clone().try_into().unwrap(); let nexthop: Ipv4Addr = nexthop_bytes.into(); trace!( - "nexthop: {}, peer: {}, local_pref: {}, med: {}, as_path: {:?}", + "nexthop: {}, peer_id: {:x?}, local_pref: {}, med: {}, as_path: {:?}", nexthop, - path.peer_name, + path.peer_id, path.local_pref, path.med, path.as_path @@ -190,9 +190,9 @@ pub async fn run_connector_v6( let nexthop_bytes: [u8; 16] = path.nexthop.clone().try_into().unwrap(); let nexthop: Ipv6Addr = nexthop_bytes.into(); trace!( - "nexthop: {}, peer: {}, local_pref: {}, med: {}, as_path: {:?}", + "nexthop: {}, peer_id: {:x?}, local_pref: {}, med: {}, as_path: {:?}", nexthop, - path.peer_name, + path.peer_id, path.local_pref, path.med, path.as_path diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index e6c4004..fc1b838 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -15,13 +15,14 @@ workspace = true bgp_packet.workspace = true byteorder = "1.4.3" bytes.workspace = true -chrono = "0.4.38" +chrono = { version = "0.4.38", features = ["serde"] } eyre.workspace = true ip_network_table-deps-treebitmap.workspace = true log.workspace = true nom = "7.1" prost.workspace = true serde.workspace = true +serde_json.workspace = true tokio-stream = "0.1.14" tokio-util = { version = "0.7.10", features = ["codec"] } tokio.workspace = true diff --git a/crates/server/proto/route_service.proto b/crates/server/proto/route_service.proto index e972d8e..e6f113a 100644 --- a/crates/server/proto/route_service.proto +++ b/crates/server/proto/route_service.proto @@ -32,7 +32,7 @@ message Prefix { // prefix. message Path { bytes nexthop = 1; - string peer_name = 2; + bytes peer_id = 2; uint32 local_pref = 3; uint32 med = 4; repeated uint32 as_path = 5; @@ -67,11 +67,12 @@ message PeerStatusRequest {} message PeerStatus { string peer_name = 1; - string state = 2; - optional uint64 session_established_time = 3; - optional uint64 last_messaage_time = 4; - optional uint64 route_updates_in = 5; - optional uint64 route_updates_out = 6; + bytes peer_id = 2; + string state = 3; + optional uint64 session_established_time = 4; + optional uint64 last_messaage_time = 5; + optional uint64 route_updates_in = 6; + optional uint64 route_updates_out = 7; } message PeerStatusResponse { repeated PeerStatus peer_status = 1; } diff --git a/crates/server/src/config.rs b/crates/server/src/config.rs index 7df2ed5..50b97e0 100644 --- a/crates/server/src/config.rs +++ b/crates/server/src/config.rs @@ -15,7 +15,7 @@ use bgp_packet::{ constants::{AddressFamilyIdentifier, SubsequentAddressFamilyIdentifier}, nlri::NLRI, - path_attributes::{LargeCommunitiesPathAttribute, LargeCommunitiesPayload}, + path_attributes::LargeCommunitiesPayload, }; use serde::{Deserialize, Serialize}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; diff --git a/crates/server/src/data_structures.rs b/crates/server/src/data_structures.rs index deb341f..2c449e2 100644 --- a/crates/server/src/data_structures.rs +++ b/crates/server/src/data_structures.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::{net::Ipv4Addr, sync::Arc}; + use bgp_packet::nlri::NLRI; use bgp_packet::path_attributes::PathAttribute; use chrono::{DateTime, Utc}; +use crate::rib_manager::PathData; + /// RouteInfo encapsulates information received about a particular BGP route. #[derive(Clone, Debug)] pub struct RouteInfo { @@ -43,25 +47,12 @@ pub struct RouteInfo { /// RouteUpdate is a type which encapsulates a newly learned, modified, or removed set of prefixes. #[derive(Debug)] pub enum RouteUpdate { - Announce(RouteAnnounce), + Announce((Vec, Arc)), Withdraw(RouteWithdraw), } -#[derive(Debug)] -pub struct RouteAnnounce { - pub peer: String, - pub prefixes: Vec, - - pub local_pref: u32, - pub med: u32, - pub as_path: Vec, - pub nexthop: Vec, - - pub path_attributes: Vec, -} - #[derive(Debug)] pub struct RouteWithdraw { - pub peer: String, + pub peer_id: Ipv4Addr, pub prefixes: Vec, } diff --git a/crates/server/src/peer.rs b/crates/server/src/peer.rs index 6ed11e6..97e43c7 100644 --- a/crates/server/src/peer.rs +++ b/crates/server/src/peer.rs @@ -14,11 +14,10 @@ use crate::config::PrefixAnnouncement; use crate::config::{PeerConfig, ServerConfig}; -use crate::data_structures::RouteAnnounce; use crate::data_structures::RouteWithdraw; use crate::data_structures::{RouteInfo, RouteUpdate}; use crate::filter_eval::FilterEvaluator; -use crate::rib_manager::RouteManagerCommands; +use crate::rib_manager::{PathData, PathSource, RouteManagerCommands}; use crate::route_server::route_server::PeerStatus; use bgp_packet::capabilities::{ BGPCapability, BGPCapabilityTypeValues, BGPCapabilityValue, BGPOpenOptionTypeValues, @@ -576,6 +575,10 @@ where PeerCommands::GetStatus(sender) => { let state = PeerStatus { peer_name: self.config.name.clone(), + peer_id: match &self.peer_open_msg { + Some(peer_open_msg) => peer_open_msg.identifier.octets().to_vec(), + None => vec![], + }, state: format!("{:?}", self.state), session_established_time: self.established_time.map(|t| t.timestamp() as u64), last_messaage_time: Some(self.last_msg_time.read().unwrap().timestamp() as u64), @@ -619,7 +622,7 @@ where // It deallocates the resources in this peer, unsets the TCP connection, removes the // routes from the inner structure as well as the routes that were propagated into the // RIB. - async fn connection_closed(&mut self) -> Result<(), std::io::Error> { + async fn connection_closed(&mut self) -> eyre::Result<()> { info!("Connection closed on peer {}", self.config.name); // Cancel keepalive timer. @@ -647,10 +650,15 @@ where } } + let peer_id = match &self.peer_open_msg { + Some(peer_open_msg) => peer_open_msg.identifier, + None => bail!("Missing peer open msg"), + }; + // Iterate over every route that we've announced to the route manager // and withdraw it. let mut route_withdraw = RouteWithdraw { - peer: self.config.name.clone(), + peer_id, prefixes: vec![], }; @@ -694,13 +702,18 @@ where /// process_withdrawals creates a RouteUpdate from withdrawal announcments and sends /// them to the rib_in channel to be consumed by the route processor. - fn process_withdrawals(&mut self, withdrawals: Vec) -> Result<(), String> { + fn process_withdrawals(&mut self, withdrawals: Vec) -> eyre::Result<()> { + let peer_id = match &self.peer_open_msg { + Some(peer_open_msg) => peer_open_msg.identifier, + None => bail!("Missing peer open msg"), + }; + let mut route_withdraw = RouteWithdraw { - peer: self.config.name.clone(), + peer_id, prefixes: vec![], }; for nlri in withdrawals { - let addr: A = nlri.clone().try_into().map_err(|e| e.to_string())?; + let addr: A = nlri.clone().try_into().map_err(|e| eyre!(e.to_string()))?; // remove from prefixes if present. self.prefixes_in.remove(addr, nlri.prefixlen.into()); @@ -713,7 +726,7 @@ where .send(RouteManagerCommands::Update(RouteUpdate::Withdraw( route_withdraw, ))) - .map_err(|e| e.to_string())?; + .map_err(|e| eyre!(e.to_string()))?; } Ok(()) @@ -726,7 +739,7 @@ where nexthop: Vec, announcements: Vec, path_attributes: Vec, - ) -> Result<(), String> { + ) -> eyre::Result<()> { // Extract the as_path and med from the attributes. let mut as_path: Vec = vec![]; let mut med: u32 = 0; @@ -746,22 +759,33 @@ where } } - let mut route_update = RouteAnnounce { - local_pref: self.config.local_pref, - med, - nexthop, - as_path, - path_attributes, - peer: self.config.name.clone(), - prefixes: vec![], + let peer_id = match &self.peer_open_msg { + Some(peer_open_msg) => peer_open_msg.identifier, + None => bail!("missing peer open msg"), }; + let mut path_data = PathData { + origin: OriginPathAttribute::EGP, + nexthop, + path_source: PathSource::BGPPeer(peer_id), + local_pref: self.config.local_pref, + med, + as_path, + path_attributes, + learn_time: Utc::now(), + }; + + let mut accepted_nlris = vec![]; + for announcement in announcements { - let addr: A = announcement.clone().try_into().map_err(|e| e.to_string())?; + let addr: A = announcement + .clone() + .try_into() + .map_err(|e| eyre!(e.to_string()))?; // Should we accept this prefix? let accepted = self.filter_evaluator.evaluate_in( - &mut route_update.path_attributes, - &route_update.as_path, + &mut path_data.path_attributes, + &path_data.as_path, &announcement, ); let rejection_reason: Option = match accepted { @@ -779,22 +803,23 @@ where Some(route_info) => { // Update the route_info, we need to clone it then reassign. let mut new_route_info: RouteInfo = route_info.clone(); - new_route_info.path_attributes = route_update.path_attributes.clone(); + new_route_info.path_attributes = path_data.path_attributes.clone(); new_route_info.updated = Utc::now(); self.prefixes_in .insert(addr, announcement.prefixlen.into(), new_route_info); } None => { // Insert new RouteInfo + // TODO: Maybe RouteInfo should be replaced with PathData after adding an accepted/rejected to it. let route_info = RouteInfo:: { prefix: addr, prefixlen: announcement.prefixlen, nlri: announcement.clone(), accepted, rejection_reason, - learned: Utc::now(), - updated: Utc::now(), - path_attributes: route_update.path_attributes.clone(), + learned: path_data.learn_time, + updated: path_data.learn_time, + path_attributes: path_data.path_attributes.clone(), }; self.prefixes_in .insert(addr, announcement.prefixlen.into(), route_info); @@ -802,16 +827,17 @@ where } if accepted { - route_update.prefixes.push(announcement); + accepted_nlris.push(announcement); } } - if !route_update.prefixes.is_empty() { + if !accepted_nlris.is_empty() { self.route_manager - .send(RouteManagerCommands::Update(RouteUpdate::Announce( - route_update, - ))) - .map_err(|e| e.to_string())?; + .send(RouteManagerCommands::Update(RouteUpdate::Announce(( + accepted_nlris, + Arc::new(path_data), + )))) + .map_err(|e| eyre!(e.to_string()))?; } Ok(()) @@ -861,7 +887,7 @@ where } /// handle_msg processes incoming messages and updates the state in PeerStateMachine. - async fn handle_msg(&mut self, msg: BGPSubmessage) -> Result<(), String> { + async fn handle_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> { match &self.state { BGPState::Idle => self.handle_idle_msg().await, BGPState::Active => self.handle_active_msg(msg).await, @@ -872,32 +898,26 @@ where } } - async fn handle_idle_msg(&mut self) -> Result<(), String> { - Err("Peer cannot process messages when in the Idle state".to_string()) + async fn handle_idle_msg(&mut self) -> eyre::Result<()> { + bail!("Peer cannot process messages when in the Idle state") } - async fn handle_active_msg(&mut self, msg: BGPSubmessage) -> Result<(), String> { + async fn handle_active_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> { // In the active state a new connection should come in via the NewConnection // message on the PSM channel, or if we establish a connection out, then that // logic should handle the messages until OpenSent. - return Err(format!( - "Discarding message received in ACTIVE state: {:?}", - msg - )); + bail!("Discarding message received in ACTIVE state: {:?}", msg) } - async fn handle_connect_msg(&mut self, msg: BGPSubmessage) -> Result<(), String> { + async fn handle_connect_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> { // In the connect state a new connection should come in via the NewConnection // message on the PSM channel, or if we establish a connection out, then that // logic should handle the messages until OpenSent. - return Err(format!( - "Discarding message received in CONNECT state: {:?}", - msg - )); + bail!("Discarding message received in CONNECT state: {:?}", msg) } // In the opensent state we still need to get the OPEN message from the peer - async fn handle_opensent_msg(&mut self, msg: BGPSubmessage) -> Result<(), String> { + async fn handle_opensent_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> { info!("Handling message in OpenSent state: {:?}", msg); match msg { BGPSubmessage::OpenMessage(o) => { @@ -910,7 +930,7 @@ where self.state = BGPState::Active; self.established_time = None; if let Some(stream) = self.tcp_stream.as_mut() { - stream.shutdown().await.map_err(|e| e.to_string())?; + stream.shutdown().await.map_err(|e| eyre!(e.to_string()))?; } } @@ -932,7 +952,7 @@ where error_code: u8, error_subcode: u8, iface_tx: &mut mpsc::UnboundedSender, - ) -> Result<(), String> { + ) -> eyre::Result<()> { let notification = NotificationMessage { error_code, error_subcode, @@ -940,10 +960,10 @@ where }; iface_tx .send(PeerCommands::SendNotification(notification)) - .map_err(|e| e.to_string())?; + .map_err(|e| eyre!(e.to_string()))?; iface_tx .send(PeerCommands::ConnectionClosed()) - .map_err(|e| e.to_string())?; + .map_err(|e| eyre!(e.to_string()))?; Ok(()) } @@ -1016,27 +1036,26 @@ where self.peer_open_msg = Some(o); // Send the Keepalive message and transition to OpenConfirm. - self.send_keepalive().await.map_err(|e| e.to_string())?; + self.send_keepalive() + .await + .map_err(|e| eyre!(e.to_string()))?; self.state = BGPState::OpenConfirm; Ok(()) } - _ => Err("Got non-open message in state opensent".to_string()), + _ => bail!("Got non-open message in state opensent"), } } // In the openconfirm state we are waiting for a KEEPALIVE from the peer. - async fn handle_openconfirm_msg(&mut self, msg: BGPSubmessage) -> Result<(), String> { + async fn handle_openconfirm_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> { // In the openconfirm state we wait for a keepalive message from the peer. // We also compute the timer expiry time for the keepalive timer. // Hold time of 0 means no keepalive and hold timer. let hold_time = match &self.peer_open_msg { Some(o) => o.hold_time, None => { - return Err( - "Logic error: reached handle_openconfirm without a open message set" - .to_string(), - ); + bail!("Logic error: reached handle_openconfirm without a open message set"); } }; match msg { @@ -1097,14 +1116,14 @@ where Ok(()) } - _ => Err(format!( + _ => bail!( "Got unsupported message type in handle_openconfirm_msg: {:?}", msg - )), + ), } } - async fn hold_timer_expired(&mut self) -> Result<(), std::io::Error> { + async fn hold_timer_expired(&mut self) -> eyre::Result<()> { let notification = NotificationMessage { error_code: 4, error_subcode: 0, @@ -1117,7 +1136,7 @@ where Ok(()) } - async fn announce_static(&mut self, announcement: &PrefixAnnouncement) -> Result<(), String> { + async fn announce_static(&mut self, announcement: &PrefixAnnouncement) -> eyre::Result<()> { let mut bgp_update_msg = UpdateMessage { withdrawn_nlri: vec![], announced_nlri: vec![], @@ -1127,7 +1146,7 @@ where // Origin, TODO: configure this based on i/eBGP bgp_update_msg .path_attributes - .push(PathAttribute::OriginPathAttribute(OriginPathAttribute(1))); + .push(PathAttribute::OriginPathAttribute(OriginPathAttribute::EGP)); bgp_update_msg .path_attributes @@ -1143,20 +1162,22 @@ where nh, ))) } - _ => return Err("Found non IPv4 nexthop in announcement".to_string()), + _ => bail!("Found non IPv4 nexthop in announcement"), } - let nlri = NLRI::try_from(announcement.prefix.as_str())?; + let nlri = NLRI::try_from(announcement.prefix.as_str()) + .map_err(|e| eyre!(e.to_string()))?; bgp_update_msg.announced_nlri.push(nlri); } AddressFamilyIdentifier::Ipv6 => { let nexthop_octets = match announcement.nexthop { IpAddr::V6(nh) => nh.octets().to_vec(), _ => { - return Err("Found non IPv6 nexthop in announcement".to_string()); + bail!("Found non IPv6 nexthop in announcement"); } }; - let nlri = NLRI::try_from(announcement.prefix.as_str())?; + let nlri = NLRI::try_from(announcement.prefix.as_str()) + .map_err(|e| eyre!(e.to_string()))?; let mp_reach = MPReachNLRIPathAttribute { afi: AddressFamilyIdentifier::Ipv6, safi: SubsequentAddressFamilyIdentifier::Unicast, @@ -1205,19 +1226,19 @@ where .lock() .await .encode(bgp_message, &mut buf) - .map_err(|e| format!("failed to encode BGP message: {}", e))?; + .map_err(|e| eyre!("failed to encode BGP message: {}", e))?; if let Some(stream) = self.tcp_stream.as_mut() { stream .write(&buf) .await - .map_err(|e| format!("Failed to write msg to peer: {}", e))?; + .map_err(|e| eyre!("Failed to write msg to peer: {}", e))?; } Ok(()) } // In the established state we accept Update, Keepalive and Notification messages. - async fn handle_established_msg(&mut self, msg: BGPSubmessage) -> Result<(), String> { + async fn handle_established_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> { match msg { BGPSubmessage::UpdateMessage(u) => { if !self.decide_accept_message(&u.path_attributes) { @@ -1289,7 +1310,7 @@ where } BGPSubmessage::KeepaliveMessage(_) => Ok(()), - _ => Err(format!("Got unexpected message from peer: {:?}", msg)), + _ => bail!("Got unexpected message from peer: {:?}", msg), } } } diff --git a/crates/server/src/rib_manager.rs b/crates/server/src/rib_manager.rs index 533e0c9..f2112f9 100644 --- a/crates/server/src/rib_manager.rs +++ b/crates/server/src/rib_manager.rs @@ -13,7 +13,6 @@ // limitations under the License. use crate::config::PeerConfig; -use crate::data_structures::RouteAnnounce; use crate::data_structures::RouteUpdate; use crate::peer::PeerCommands; @@ -21,10 +20,15 @@ use std::cmp::Eq; use std::collections::BTreeMap; use std::collections::HashMap; use std::convert::TryInto; +use std::net::Ipv4Addr; +use std::sync::Arc; use std::sync::Mutex; use bgp_packet::nlri::NLRI; +use bgp_packet::path_attributes::OriginPathAttribute; use bgp_packet::path_attributes::PathAttribute; +use chrono::{DateTime, Utc}; +use eyre::{bail, eyre}; use ip_network_table_deps_treebitmap::address::Address; use serde::Serialize; use tokio::sync::broadcast; @@ -37,53 +41,84 @@ use super::data_structures::RouteWithdraw; type PeerInterface = mpsc::UnboundedSender; -/// Path is a structure to contain a specific route via one nexthop. +#[derive(Debug, Clone, Serialize)] +pub enum PathSource { + LocallyConfigured, + /// BGPPeer represents a path that has been learned from a BGP peer, + /// and contains the Router ID of the peer. + BGPPeer(Ipv4Addr), +} + +/// PathData is a structure to contain a specific route via one nexthop. /// Note that currently there is an assumption that there is only /// one route per peer per prefix, but when ADD-PATH support is added /// this will no longer hold true. #[derive(Debug, Clone, Serialize)] -pub struct Path { +pub struct PathData { + /// The origin through which this path was learned. This is set to EGP when learned from + /// another peer, set to IGP when statically configured or from another control plane. + pub origin: OriginPathAttribute, + /// The nexthop that traffic can be sent to. pub nexthop: Vec, - pub peer_name: String, + /// Where this path was learned from. + pub path_source: PathSource, + /// The local pref of this path. pub local_pref: u32, + /// The multi exit discriminator of this path. pub med: u32, + /// The path of autonomous systems to the destination along this path. pub as_path: Vec, + /// Path attributes received from the peer. pub path_attributes: Vec, + /// When the path was learned. + pub learn_time: DateTime, } -impl PartialEq for Path { - fn eq(&self, other: &Path) -> bool { - // Local pref +impl PartialEq for PathData { + fn eq(&self, other: &PathData) -> bool { + // Local pref. if self.local_pref > other.local_pref { return true; } - // AS path length + + // Prefer paths that are locally originated. + if matches!(self.path_source, PathSource::LocallyConfigured) { + return true; + } + + // AS path length. if self.as_path.len() < other.as_path.len() { return true; } - // TODO: Origin - - // MED lower is better - if self.med < other.med { + // IGP < EGP < INCOMPLETE + if (self.origin as u8) < (other.origin as u8) { return true; } - // Use peer name as discriminator of last resort - self.peer_name < other.peer_name + // MED lower is better, only checked if the announcing ASN is the same. + if let (Some(announcing_as_self), Some(announcing_as_other)) = + (self.as_path.last(), other.as_path.last()) + { + if announcing_as_self == announcing_as_other && self.med < other.med { + return true; + } + } + + // Pick the oldest path to prefer more stable ones. + self.learn_time < other.learn_time } } -impl Eq for Path {} +impl Eq for PathData {} #[derive(Debug, Clone, Serialize)] pub struct PathSet { pub addr: A, pub prefixlen: u8, pub nlri: NLRI, - // paths is stored in a BTreeMap which is sorted and allows us to efficiently - // find the best path. - pub paths: BTreeMap, + /// Sorted map keyed by the BGP Identifier of the peer that sent the route. + pub paths: BTreeMap>, } /// RibSnapshot contians a version number and the dump of all the routes. @@ -141,7 +176,7 @@ where }) } - pub async fn run(&mut self) -> Result<(), String> { + pub async fn run(&mut self) -> eyre::Result<()> { loop { let next = tokio::select! { cmd = self.mgr_rx.recv() => cmd, @@ -162,7 +197,7 @@ where }, None => { warn!("All senders of the manager channel have been dropped, manager exiting!"); - return Err("Manager exited due to channel closure".to_string()); + bail!("Manager exited due to channel closure"); } } } @@ -206,26 +241,30 @@ where } } - fn handle_update(&mut self, update: RouteUpdate) -> Result<(), String> { + fn handle_update(&mut self, update: RouteUpdate) -> eyre::Result<()> { match update { RouteUpdate::Announce(announce) => self.handle_announce(announce), RouteUpdate::Withdraw(withdraw) => self.handle_withdraw(withdraw), } } - fn handle_announce(&mut self, update: RouteAnnounce) -> Result<(), String> { - let peer_name = update.peer.clone(); - let nexthop = update.nexthop; - for nlri in update.prefixes { + fn handle_announce(&mut self, update: (Vec, Arc)) -> eyre::Result<()> { + let peer_router_id = match update.1.path_source { + PathSource::LocallyConfigured => { + bail!("handle_announce should not be called with a LocallyConfigured route") + } + PathSource::BGPPeer(peer_id) => peer_id, + }; + for nlri in update.0 { // Increment the epoch on every NLRI processed. self.epoch += 1; - let addr: A = nlri.clone().try_into().map_err(|e| e.to_string())?; + let addr: A = nlri.clone().try_into().map_err(|e| eyre!(e.to_string()))?; let prefixlen = nlri.prefixlen; if let Some(path_set_wrapped) = self.rib.exact_match(addr, prefixlen.into()) { let mut path_set = path_set_wrapped.lock().unwrap(); // There is already this prefix in the RIB, check if this is a // reannouncement or fresh announcement. - match path_set.paths.get_mut(&update.peer) { + match path_set.paths.get_mut(&peer_router_id) { // Peer already announced this route before. Some(existing) => { trace!( @@ -233,20 +272,11 @@ where addr, prefixlen ); - existing.nexthop = nexthop.clone(); - existing.path_attributes = update.path_attributes.clone(); + *existing = update.1.clone(); } // First time that this peer is announcing the route. None => { - let path = Path { - nexthop: nexthop.clone(), - peer_name: peer_name.clone(), - local_pref: update.local_pref, - med: update.med, - as_path: update.as_path.clone(), - path_attributes: update.path_attributes.clone(), - }; - path_set.paths.insert(update.peer.clone(), path); + path_set.paths.insert(peer_router_id, update.1.clone()); } } @@ -265,15 +295,7 @@ where nlri, paths: BTreeMap::new(), }; - let path = Path { - nexthop: nexthop.clone(), - peer_name: peer_name.clone(), - local_pref: update.local_pref, - med: update.med, - as_path: update.as_path.clone(), - path_attributes: update.path_attributes.clone(), - }; - path_set.paths.insert(peer_name.clone(), path); + path_set.paths.insert(peer_router_id, update.1.clone()); self.rib .insert(addr, prefixlen.into(), Mutex::new(path_set.clone())); @@ -285,18 +307,18 @@ where Ok(()) } - fn handle_withdraw(&mut self, update: RouteWithdraw) -> Result<(), String> { + fn handle_withdraw(&mut self, update: RouteWithdraw) -> eyre::Result<()> { for nlri in update.prefixes { self.epoch += 1; - let addr: A = nlri.clone().try_into().map_err(|e| e.to_string())?; + let addr: A = nlri.clone().try_into().map_err(|e| eyre!(e.to_string()))?; let mut pathset_empty = false; if let Some(path_set_wrapped) = self.rib.exact_match(addr, nlri.prefixlen.into()) { let mut path_set = path_set_wrapped.lock().unwrap(); - let removed = path_set.paths.remove(&update.peer); + let removed = path_set.paths.remove(&update.peer_id); if removed.is_none() { warn!( "Got a withdrawal for route {} from {}, which was not in RIB", - nlri, update.peer + nlri, update.peer_id ); } // Ignore errors sending due to no active receivers on the channel. @@ -309,7 +331,7 @@ where } else { warn!( "Got a withdrawal for route {} from {}, which was not in RIB", - nlri, update.peer + nlri, update.peer_id ); } if pathset_empty { @@ -329,16 +351,20 @@ where #[cfg(test)] mod tests { + use crate::rib_manager::PathData; + use crate::rib_manager::PathSource; use crate::rib_manager::RibManager; - use crate::rib_manager::RouteAnnounce; use crate::rib_manager::RouteManagerCommands; use crate::rib_manager::RouteUpdate; use bgp_packet::constants::AddressFamilyIdentifier; use bgp_packet::nlri::NLRI; + use bgp_packet::path_attributes::OriginPathAttribute; + use chrono::Utc; use std::net::Ipv6Addr; use std::str::FromStr; + use std::sync::Arc; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -351,32 +377,37 @@ mod tests { let nexthop = Ipv6Addr::new(0x20, 0x01, 0xd, 0xb8, 0, 0, 0, 0x1); // Send an update to the manager and check that it adds it to the RIB. - let announce = RouteAnnounce { - peer: "Some peer".to_string(), - prefixes: vec![NLRI { - afi: AddressFamilyIdentifier::Ipv6, - prefixlen: 32, - prefix: vec![0x20, 0x01, 0xd, 0xb8], - }], + let path_data = PathData { as_path: vec![65536], local_pref: 0, med: 0, nexthop: nexthop.octets().to_vec(), path_attributes: vec![], + origin: OriginPathAttribute::EGP, + path_source: PathSource::BGPPeer("1.2.3.4".parse().unwrap()), + learn_time: Utc::now(), }; + let prefixes = vec![NLRI { + afi: AddressFamilyIdentifier::Ipv6, + prefixlen: 32, + prefix: vec![0x20, 0x01, 0xd, 0xb8], + }]; + // Manually drive the manager instead of calling run to not deal with async in tests. - assert_eq!( - rib_manager.handle_update(RouteUpdate::Announce(announce)), - Ok(()) - ); + assert!(rib_manager + .handle_update(RouteUpdate::Announce((prefixes, Arc::new(path_data)))) + .is_ok()); let addr = Ipv6Addr::from_str("2001:db8::").unwrap(); let prefixlen: u32 = 32; let lookup_result = rib_manager.lookup_path_exact(addr, prefixlen).unwrap(); assert_eq!(lookup_result.paths.len(), 1); - let path_result = lookup_result.paths.get("Some peer").unwrap(); + let path_result = lookup_result + .paths + .get(&"1.2.3.4".parse().unwrap()) + .unwrap(); assert_eq!(path_result.nexthop, nexthop.octets().to_vec()); } } diff --git a/crates/server/src/route_server.rs b/crates/server/src/route_server.rs index b9178be..5f50442 100644 --- a/crates/server/src/route_server.rs +++ b/crates/server/src/route_server.rs @@ -14,6 +14,7 @@ use crate::peer::PeerCommands; use crate::rib_manager; +use crate::rib_manager::PathSource; use crate::rib_manager::RibSnapshot; use crate::rib_manager::RouteManagerCommands; use crate::route_server::route_server::bgp_server_admin_service_server::BgpServerAdminService; @@ -90,11 +91,14 @@ impl RouteServer { }; for (_, path) in mgr_ps.1.paths { let proto_path = Path { - as_path: path.as_path, + as_path: path.as_path.clone(), local_pref: path.local_pref, med: path.med, - nexthop: path.nexthop, - peer_name: path.peer_name, + nexthop: path.nexthop.clone(), + peer_id: match path.path_source { + PathSource::LocallyConfigured => vec![], + PathSource::BGPPeer(peer) => peer.octets().to_vec(), + }, }; proto_pathset.paths.push(proto_path); } @@ -164,11 +168,14 @@ impl RouteService for RouteServer { }; for (_, path) in pathset.paths { let proto_path = Path { - as_path: path.as_path, + as_path: path.as_path.clone(), local_pref: path.local_pref, med: path.med, - nexthop: path.nexthop, - peer_name: path.peer_name, + nexthop: path.nexthop.clone(), + peer_id: match path.path_source { + PathSource::LocallyConfigured => vec![], + PathSource::BGPPeer(peer) => peer.octets().to_vec(), + }, }; proto_pathset.paths.push(proto_path); } @@ -208,11 +215,14 @@ impl RouteService for RouteServer { }; for (_, path) in pathset.paths { let proto_path = Path { - as_path: path.as_path, + as_path: path.as_path.clone(), local_pref: path.local_pref, med: path.med, - nexthop: path.nexthop, - peer_name: path.peer_name, + nexthop: path.nexthop.clone(), + peer_id: match path.path_source { + PathSource::LocallyConfigured => vec![], + PathSource::BGPPeer(peer) => peer.octets().to_vec(), + }, }; proto_pathset.paths.push(proto_path); }