diff --git a/bin/src/client/main.rs b/bin/src/client/main.rs index d660dd3..d8dde2c 100644 --- a/bin/src/client/main.rs +++ b/bin/src/client/main.rs @@ -1,10 +1,6 @@ use clap::Parser; use eyre::Result; -use tracing::instrument::WithSubscriber; -use tracing::log::LevelFilter; use tracing::{info, warn}; -use tracing_subscriber::filter; -use tracing_subscriber::prelude::*; use route_client::netlink::NetlinkConnector; use route_client::{run_connector_v4, run_connector_v6}; diff --git a/crates/bgp_packet/src/capabilities.rs b/crates/bgp_packet/src/capabilities.rs index 11e5249..c2f94ea 100644 --- a/crates/bgp_packet/src/capabilities.rs +++ b/crates/bgp_packet/src/capabilities.rs @@ -48,9 +48,6 @@ pub mod BGPOpenOptionTypeValues { pub const CAPABILITIES: BGPOpenOptionType = BGPOpenOptionType(2); } -/// OpenOptionValue represents something which can be in the payload of OpenOption. -trait OpenOptionValue: ReadablePacket + WritablePacket + fmt::Debug {} - #[derive(Debug, PartialEq)] pub struct OpenOption { pub option_type: BGPOpenOptionType, diff --git a/crates/bgp_packet/src/messages.rs b/crates/bgp_packet/src/messages.rs index 323403a..72959fa 100644 --- a/crates/bgp_packet/src/messages.rs +++ b/crates/bgp_packet/src/messages.rs @@ -21,6 +21,7 @@ use crate::traits::BGPParserError; use crate::traits::ParserContext; use crate::traits::ReadablePacket; use crate::traits::WritablePacket; + use byteorder::{ByteOrder, NetworkEndian}; use bytes::Buf; use bytes::BufMut; diff --git a/crates/route_client/proto/route_service.proto b/crates/route_client/proto/route_service.proto index 5f4606c..43b8e4c 100644 --- a/crates/route_client/proto/route_service.proto +++ b/crates/route_client/proto/route_service.proto @@ -68,10 +68,11 @@ message PeerStatusRequest {} message PeerStatus { string peer_name = 1; string state = 2; - uint64 session_established_time = 3; - uint64 last_messaage_time = 4; - uint64 route_updates_in = 5; - uint64 route_updates_out = 6; + // The following fields are 
only populated when the session is established. + optional uint64 session_established_time = 3; + optional uint64 last_messaage_time = 4; + optional uint64 route_updates_in = 5; + optional uint64 route_updates_out = 6; } message PeerStatusResponse { repeated PeerStatus peer_status = 1; } diff --git a/crates/route_client/src/lib.rs b/crates/route_client/src/lib.rs index 0dd95b4..249f68e 100644 --- a/crates/route_client/src/lib.rs +++ b/crates/route_client/src/lib.rs @@ -32,10 +32,9 @@ use eyre::{anyhow, Result}; use ip_network_table_deps_treebitmap::IpLookupTable; use tonic::transport::Endpoint; use tonic::transport::Uri; -use tracing::{info, warn}; +use tracing::info; use crate::fib_state::FibState; -use crate::netlink::NetlinkConnector; use crate::proto::route_service_client::RouteServiceClient; use crate::southbound_interface::SouthboundInterface; diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 23f95ac..e6c4004 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -15,6 +15,8 @@ workspace = true bgp_packet.workspace = true byteorder = "1.4.3" bytes.workspace = true +chrono = "0.4.38" +eyre.workspace = true ip_network_table-deps-treebitmap.workspace = true log.workspace = true nom = "7.1" diff --git a/crates/server/proto/route_service.proto b/crates/server/proto/route_service.proto index 5f4606c..e972d8e 100644 --- a/crates/server/proto/route_service.proto +++ b/crates/server/proto/route_service.proto @@ -68,10 +68,10 @@ message PeerStatusRequest {} message PeerStatus { string peer_name = 1; string state = 2; - uint64 session_established_time = 3; - uint64 last_messaage_time = 4; - uint64 route_updates_in = 5; - uint64 route_updates_out = 6; + optional uint64 session_established_time = 3; + optional uint64 last_messaage_time = 4; + optional uint64 route_updates_in = 5; + optional uint64 route_updates_out = 6; } message PeerStatusResponse { repeated PeerStatus peer_status = 1; } diff --git a/crates/server/src/bgp_server.rs 
b/crates/server/src/bgp_server.rs index 84e95ee..0a555a5 100644 --- a/crates/server/src/bgp_server.rs +++ b/crates/server/src/bgp_server.rs @@ -20,6 +20,7 @@ use crate::rib_manager::RibManager; use crate::rib_manager::RibSnapshot; use crate::rib_manager::RouteManagerCommands; use crate::route_server; +use crate::route_server::route_server::bgp_server_admin_service_server::BgpServerAdminServiceServer; use crate::route_server::route_server::route_service_server::RouteServiceServer; use bgp_packet::constants::AddressFamilyIdentifier; use std::collections::HashMap; @@ -212,7 +213,6 @@ async fn start_http_server( match rx.await { Ok(resp) => { result += &format!("Peer state: {:?}
", resp.state); - result += &format!("{:?}", resp.config); } Err(e) => { warn!("error on rx from peer channel: {}", e); @@ -461,10 +461,12 @@ impl Server { peer_state_machines: peer_chan_map, }; - let svc = RouteServiceServer::new(rs); + let rs_svc = RouteServiceServer::new(rs.clone()); + let adm_svc = BgpServerAdminServiceServer::new(rs); tokio::spawn(async move { if let Err(e) = tonic::transport::Server::builder() - .add_service(svc) + .add_service(rs_svc) + .add_service(adm_svc) .serve(addr) .await { diff --git a/crates/server/src/config.rs b/crates/server/src/config.rs index f51ca00..f1e379b 100644 --- a/crates/server/src/config.rs +++ b/crates/server/src/config.rs @@ -65,9 +65,26 @@ pub struct PrefixAnnouncement { /// Linklocal nexthop to be used for IPv6 announcements. pub llnh: Option, - // Path attributes + /// Path attributes pub local_pref: Option, + /// Multi exit discriminator pub med: Option, + /// Legacy communities [RFC 1997] pub communities: Option>, + /// Large communities [RFC 8092] pub large_communities: Option>, } + +impl Default for PrefixAnnouncement { + fn default() -> Self { + Self { + prefix: "::/0".to_owned(), + nexthop: IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + llnh: Default::default(), + local_pref: Default::default(), + med: Default::default(), + communities: Default::default(), + large_communities: Default::default(), + } + } +} diff --git a/crates/server/src/peer.rs b/crates/server/src/peer.rs index e8eeac8..3ea7ed2 100644 --- a/crates/server/src/peer.rs +++ b/crates/server/src/peer.rs @@ -18,6 +18,7 @@ use crate::data_structures::RouteAnnounce; use crate::data_structures::RouteWithdraw; use crate::data_structures::{RouteInfo, RouteUpdate}; use crate::rib_manager::RouteManagerCommands; +use crate::route_server::route_server::PeerStatus; use bgp_packet::capabilities::{ BGPCapability, BGPCapabilityTypeValues, BGPCapabilityValue, BGPOpenOptionTypeValues, FourByteASNCapability, MultiprotocolCapability, OpenOption, 
OpenOptionCapabilities, @@ -44,10 +45,13 @@ use bgp_packet::path_attributes::{ }; use bgp_packet::traits::ParserContext; use bytes::BytesMut; +use chrono::{DateTime, NaiveDateTime, Offset, TimeZone, Utc}; +use eyre::{bail, eyre}; use ip_network_table_deps_treebitmap::address::Address; use ip_network_table_deps_treebitmap::IpLookupTable; use std::convert::TryFrom; use std::convert::TryInto; +use std::io::ErrorKind; use std::net::IpAddr; use std::net::SocketAddr; use std::sync::Arc; @@ -73,15 +77,6 @@ type PeerInterface = mpsc::UnboundedSender; // not be expensive, and other tasks such as picking the best route // will be done in a different threading model. -/// PeerStatus contians the current state of the PSM for monitoring -/// and debugging. -#[derive(Clone, Debug)] -pub struct PeerStatus { - pub name: String, - pub config: PeerConfig, - pub state: BGPState, -} - /// BGPState represents which state of the BGP state machine the peer /// is currently in. #[derive(Copy, Clone, Debug, PartialEq)] @@ -175,7 +170,7 @@ async fn run_timer( async fn check_hold_timer( cancel_token: CancellationToken, iface: PeerInterface, - last_msg_time: Arc>, + last_msg_time: Arc>>, hold_time: std::time::Duration, ) { loop { @@ -186,23 +181,16 @@ async fn check_hold_timer( } _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => { let last = last_msg_time.read().unwrap(); - let elapsed_time = std::time::SystemTime::now().duration_since(*last); - match elapsed_time { - Ok(duration) => { - if duration > hold_time { - match iface.send(PeerCommands::TimerEvent(PeerTimerEvent::HoldTimerExpire())) { - Ok(()) => {}, - Err(e) => { - warn!("Failed to send HoldTimerExpire message: {}", e); - } - } - // Exit the hold timer task since it's expired already and is not needed anymore. 
- return; + let elapsed_time = Utc::now() - *last; + if elapsed_time.num_seconds() as u64 > hold_time.as_secs() { + match iface.send(PeerCommands::TimerEvent(PeerTimerEvent::HoldTimerExpire())) { + Ok(()) => {}, + Err(e) => { + warn!("Failed to send HoldTimerExpire message: {}", e); } } - Err(e) => { - warn!("Failed to check duration since last message: {}", e); - } + // Exit the hold timer task since it's expired already and is not needed anymore. + return; } } @@ -342,9 +330,12 @@ pub struct PeerStateMachine { /// updates from the peer go to rib_in. route_manager: mpsc::UnboundedSender>, + // The time at which the session was established. + established_time: Option>, + // Keep track of the time of the last message to efficiently implement // the hold timer. - last_msg_time: Arc>, + last_msg_time: Arc>>, // Timers and cancellation token to spawned tasks connect_timer: Option<(JoinHandle<()>, CancellationToken)>, @@ -386,7 +377,8 @@ where iface_rx, iface_tx, route_manager, - last_msg_time: Arc::new(RwLock::new(std::time::SystemTime::UNIX_EPOCH)), + established_time: None, + last_msg_time: Arc::new(RwLock::new(DateTime::from_timestamp(0, 0).unwrap())), connect_timer: None, hold_timer: None, keepalive_timer: None, @@ -443,7 +435,7 @@ where } } - async fn handle_chan_msg(&mut self, c: PeerCommands) -> Result<(), std::io::Error> { + async fn handle_chan_msg(&mut self, c: PeerCommands) -> eyre::Result<()> { match c { PeerCommands::NewConnection(mut conn) => { let peer_addr = conn.peer_addr()?; @@ -567,16 +559,14 @@ where PeerCommands::MessageFromPeer(msg) => match self.handle_msg(msg).await { Ok(_) => { - // Update the last time counter - // We call unwrap here because it indicates that some other thread which - // was accessing the lock had a panic. - // TODO: This should be handled more gracefully, maybe by shutting down the - // peer and starting it up again. 
- let mut last_time_lock = (*self.last_msg_time).write().unwrap(); - *last_time_lock = std::time::SystemTime::now(); + let mut last_time = self + .last_msg_time + .write() + .map_err(|e| eyre!(e.to_string()))?; + *last_time = Utc::now(); } Err(e) => { - return Err(std::io::Error::new(std::io::ErrorKind::Other, e)); + bail!(e); } }, PeerCommands::TimerEvent(timer_event) => match timer_event { @@ -623,9 +613,12 @@ where }, PeerCommands::GetStatus(sender) => { let state = PeerStatus { - name: self.config.name.clone(), - config: self.config.clone(), - state: self.state, + peer_name: self.config.name.clone(), + state: format!("{:?}", self.state), + session_established_time: self.established_time.map(|t| t.timestamp() as u64), + last_messaage_time: Some(self.last_msg_time.read().unwrap().timestamp() as u64), + route_updates_in: Some(0), /* todo */ + route_updates_out: Some(0), /* todo */ }; match sender.send(state) { Ok(()) => {} @@ -714,6 +707,7 @@ where // Set the state machine back to the expected. self.state = BGPState::Active; + self.established_time = None; // Restart the connect timer to try and connect periodically. { @@ -952,6 +946,7 @@ where self.config.name, o.asn ); self.state = BGPState::Active; + self.established_time = None; if let Some(stream) = self.tcp_stream.as_mut() { stream.shutdown().await.map_err(|e| e.to_string())?; } @@ -1086,6 +1081,7 @@ where BGPSubmessage::KeepaliveMessage(_) => { // Switch the state from OpenConfirm to ESTABLISHED. self.state = BGPState::Established; + self.established_time = Some(Utc::now()); if hold_time > 0 { // Set keepalive timer. diff --git a/crates/server/src/rib_manager.rs b/crates/server/src/rib_manager.rs index 448cf7f..533e0c9 100644 --- a/crates/server/src/rib_manager.rs +++ b/crates/server/src/rib_manager.rs @@ -106,9 +106,10 @@ pub enum RouteManagerCommands { pub struct RibManager { mgr_rx: mpsc::UnboundedReceiver>, + + /// Peers configured on this server instance. 
peers: HashMap, - // We need to use a mutex for PathSet because IpLookupTable does not return a mut ptr. rib: ip_network_table_deps_treebitmap::IpLookupTable>>, epoch: u64, diff --git a/crates/server/src/route_server.rs b/crates/server/src/route_server.rs index 4e4531e..b9178be 100644 --- a/crates/server/src/route_server.rs +++ b/crates/server/src/route_server.rs @@ -16,6 +16,7 @@ use crate::peer::PeerCommands; use crate::rib_manager; use crate::rib_manager::RibSnapshot; use crate::rib_manager::RouteManagerCommands; +use crate::route_server::route_server::bgp_server_admin_service_server::BgpServerAdminService; use crate::route_server::route_server::route_service_server::RouteService; use crate::route_server::route_server::AddressFamily; use crate::route_server::route_server::DumpPathsRequest; @@ -25,7 +26,8 @@ use crate::route_server::route_server::PathSet; use crate::route_server::route_server::Prefix; use crate::route_server::route_server::StreamPathsRequest; use bgp_packet::constants::AddressFamilyIdentifier; -use log::warn; +use route_server::PeerStatusRequest; +use route_server::PeerStatusResponse; use std::collections::HashMap; use std::net::Ipv4Addr; use std::net::Ipv6Addr; @@ -36,11 +38,13 @@ use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; use tonic::Response; use tonic::Status; +use tracing::warn; pub mod route_server { tonic::include_proto!("bgpd.grpc"); } +#[derive(Clone)] pub struct RouteServer { pub ip4_manager: UnboundedSender>, pub ip6_manager: UnboundedSender>, @@ -98,6 +102,31 @@ impl RouteServer { } } +#[tonic::async_trait] +impl BgpServerAdminService for RouteServer { + async fn peer_status( + &self, + request: tonic::Request, + ) -> Result, Status> { + let mut result = PeerStatusResponse::default(); + + for peer in &self.peer_state_machines { + let (tx, rx) = oneshot::channel(); + if let Err(e) = peer.1.send(PeerCommands::GetStatus(tx)) { + warn!( + peer = peer.0, + "Peer channel dead when trying to send state request" + ); 
+ continue; + } + let resp = rx.await.map_err(|e| Status::internal(format!("{}", e)))?; + result.peer_status.push(resp); + } + + Ok(Response::new(result)) + } +} + #[tonic::async_trait] impl RouteService for RouteServer { async fn dump_paths( diff --git a/tests/integration_tests/Cargo.toml b/tests/integration_tests/Cargo.toml index 9a0abf5..df58967 100644 --- a/tests/integration_tests/Cargo.toml +++ b/tests/integration_tests/Cargo.toml @@ -6,14 +6,15 @@ name = "integration_tests" version = "0.1.0" [dependencies] -bgp_packet.workspace = true -bgp_server.workspace = true -bytes = "1.*" -libc = "0.2.126" -tokio = { version = "1.6.1", features = ["full"] } -tokio-util = { version = "0.7.10", features = ["codec"] } -tracing = "0.1" -tracing-subscriber = "0.2" +bgp_packet.workspace = true +bgp_server.workspace = true +bytes = "1.*" +libc = "0.2.126" +route_client.workspace = true +tokio = { version = "1.6.1", features = ["full"] } +tokio-util = { version = "0.7.10", features = ["codec"] } +tracing = "0.1" +tracing-subscriber = "0.2" [dev-dependencies] serial_test = "0.5.1" diff --git a/tests/integration_tests/tests/basic_startup.rs b/tests/integration_tests/tests/basic_startup.rs index 408cad9..343fb3c 100644 --- a/tests/integration_tests/tests/basic_startup.rs +++ b/tests/integration_tests/tests/basic_startup.rs @@ -12,11 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use bgp_packet::constants::{AddressFamilyIdentifier, SubsequentAddressFamilyIdentifier}; -use bgp_packet::messages::BGPSubmessage; -use bgp_packet::traits::ParserContext; -use bgp_server::bgp_server::Server; -use bgp_server::config::{PeerConfig, ServerConfig}; use std::io::{Read, Write}; use std::mem::size_of; use std::net::Ipv4Addr; @@ -26,16 +21,23 @@ use std::net::TcpStream; use std::net::{IpAddr, SocketAddrV6}; use std::os::unix::io::AsRawFd; use std::time::Duration; -use tokio::io::AsyncReadExt; use tokio_util::codec::Decoder; use tracing::info; +use bgp_packet::constants::{AddressFamilyIdentifier, SubsequentAddressFamilyIdentifier}; +use bgp_packet::messages::BGPSubmessage; +use bgp_packet::traits::ParserContext; +use bgp_server::bgp_server::Server; +use bgp_server::config::{PeerConfig, PrefixAnnouncement, ServerConfig}; +use bgp_server::route_server::route_server::bgp_server_admin_service_client::BgpServerAdminServiceClient; +use bgp_server::route_server::route_server::PeerStatusRequest; + #[macro_use] extern crate serial_test; fn init() { match tracing_subscriber::fmt() - .with_env_filter("bgpd=trace,tokio=trace,basic_startup=trace") + // .with_env_filter("server=trace,tokio=trace,basic_startup=trace") .try_init() { Ok(()) => {} @@ -513,7 +515,6 @@ async fn test_bgp_listener_known_peer_inbound_reconnection() { } } - // conn.shutdown(std::net::Shutdown::Both).unwrap(); drop(conn); // Try to connect to localhost:9179 and it should connect and send the OPEN message. @@ -556,3 +557,74 @@ async fn test_bgp_listener_known_peer_inbound_reconnection() { bgp_server.shutdown().await; } + +// Spawn two instances of the BGP server and make them peer with each other and exchange a route. +// We then check that the route is accepted by the receiving peer. 
+/// Starts two BGP server instances that peer with each other over the IPv6
+/// loopback (`::1`) and polls server B's admin gRPC endpoint until its peer
+/// is reported in the `Established` state.
+///
+/// NOTE: only session establishment is asserted here; nothing in this test
+/// inspects the routes received by either side yet.
+#[tokio::test(flavor = "multi_thread")]
+#[serial]
+async fn test_multi_instance_announce() {
+    // Upper bound on status polls. With a one second pause between polls this
+    // also bounds how long the test waits for the session to come up.
+    const MAX_STATUS_POLLS: u32 = 10;
+
+    init();
+
+    let v6_addr: Ipv6Addr = "::1".parse().unwrap();
+    let config_a = ServerConfig {
+        asn: 65535,
+        hold_time: 3,
+        identifier: Ipv4Addr::new(127, 0, 0, 1),
+        grpc_addr: Some("[::]:9181".to_owned()),
+        http_addr: None,
+        listen_addrs: vec!["[::]:9179".to_owned()],
+        peers: vec![PeerConfig {
+            afi: AddressFamilyIdentifier::Ipv6,
+            safi: SubsequentAddressFamilyIdentifier::Unicast,
+            asn: 65536,
+            ip: IpAddr::V6(v6_addr),
+            port: Some(9180),
+            announcements: vec![PrefixAnnouncement {
+                prefix: "2001:db8:babe::/48".to_owned(),
+                nexthop: "2001:db8::1".parse().unwrap(),
+                local_pref: Some(100),
+                med: Some(100),
+                ..Default::default()
+            }],
+            name: "config-b-peer".to_string(),
+            local_pref: 100,
+        }],
+    };
+
+    // Server B is a copy of server A's config with the ASNs and the BGP/gRPC
+    // ports swapped, so that its single peer entry points back at server A.
+    let mut config_b = config_a.clone();
+    config_b.asn = 65536;
+    config_b.listen_addrs = vec!["[::]:9180".to_owned()];
+    config_b.grpc_addr = Some("[::]:9182".to_owned());
+    config_b.peers[0].asn = 65535;
+    config_b.peers[0].port = Some(9179);
+    config_b.peers[0].name = "config-a-peer".to_owned();
+
+    let mut server_a = Server::new(config_a);
+    server_a.start(true).await.unwrap();
+
+    let mut server_b = Server::new(config_b);
+    server_b.start(true).await.unwrap();
+
+    // Connect to the grpc endpoint of server_b and check until the peer is healthy.
+    let mut stub = BgpServerAdminServiceClient::connect("http://[::1]:9182")
+        .await
+        .unwrap();
+
+    let mut established = false;
+
+    for _ in 0..MAX_STATUS_POLLS {
+        let response = stub
+            .peer_status(PeerStatusRequest::default())
+            .await
+            .unwrap()
+            .into_inner();
+        info!(?response);
+
+        // `first()` rather than `[0]`: an empty response should end up failing
+        // the assertion below instead of panicking on an out-of-bounds index.
+        established = response
+            .peer_status
+            .first()
+            .map_or(false, |peer| peer.state == "Established");
+        if established {
+            // Stop polling (and sleeping) as soon as the session is up.
+            break;
+        }
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+
+    // Shut both servers down before asserting so that the shutdown still runs
+    // when the assertion fails.
+    server_a.shutdown().await;
+    server_b.shutdown().await;
+
+    assert!(
+        established,
+        "peer session did not reach Established within {} status polls",
+        MAX_STATUS_POLLS
+    );
+}