Misc cleanups
Some checks are pending
Rust / build (push) Waiting to run

This commit is contained in:
Rayhaan Jaufeerally
2024-08-18 13:59:28 +00:00
parent 82d052ca33
commit 0e45b96440
6 changed files with 132 additions and 164 deletions

View File

@ -28,7 +28,6 @@ use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::convert::TryInto;
use std::fmt;
use std::io::ErrorKind;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;
@ -283,7 +282,7 @@ mod tests {
use std::convert::TryFrom;
use super::NLRI;
use crate::constants::AddressFamilyIdentifier::{Ipv4, Ipv6};
use crate::constants::AddressFamilyIdentifier::{self, Ipv4, Ipv6};
use crate::traits::ParserContext;
use crate::traits::ReadablePacket;
use crate::traits::WritablePacket;
@ -373,17 +372,4 @@ mod tests {
assert_eq!(reparsed, parsed_nlri);
}
}
// #[test]
// fn test_to_string_invalids() {
// let invalid_v4 = NLRI {
// afi: AddressFamilyIdentifier::Ipv4,
// prefix: vec![1, 2, 3, 4, 5],
// prefixlen: 16,
// };
// assert_eq!(
// "a formatting trait implementation returned an error: Error",
// format!("{}", invalid_v4)
// );
// }
}

View File

@ -6,7 +6,6 @@ use netlink_packet_route::route::RouteAddress;
use netlink_packet_route::route::RouteAttribute;
use netlink_packet_route::route::RouteHeader;
use netlink_packet_route::route::RouteMessage;
use netlink_packet_route::route::RouteProtocol;
use netlink_packet_route::route::RouteType;
use netlink_packet_route::AddressFamily as NetlinkAddressFamily;
use rtnetlink::IpVersion;

View File

@ -185,33 +185,6 @@ async fn start_http_server(
Ok(warp::http::Response::builder().body(result).into_response())
}
/*
async fn modify_community_fn(
add: bool,
peers: HashMap<String, UnboundedSender<PeerCommands>>,
name: String,
ld1: u32,
ld2: u32,
) -> Result<impl warp::Reply, warp::Rejection> {
if let Some(chan) = peers.get(&name) {
if let Err(e) = func(chan.clone(), ld1, ld2).await {
warn!("Failed to add large community: {:?}", e);
return Err(warp::reject());
}
} else {
return Err(warp::reject());
}
Ok(warp::reply::with_status("Ok", warp::http::StatusCode::OK))
}
let add_community_filter = warp::post()
.map(move || true)
.and(warp::path::param())
.and(warp::path!(u32 / u32))
.and_then(modify_community_fn);
*/
// Start the web server that has access to the rib managers so that it can expose the state.
let v4_mgr_filter = warp::any().map(move || manager4.clone());

View File

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::config::PrefixAnnouncement;
use crate::config::{PeerConfig, ServerConfig};
use crate::data_structures::RouteWithdraw;
use crate::data_structures::{RouteInfo, RouteUpdate};
@ -45,6 +44,7 @@ use bgp_packet::path_attributes::{
};
use bgp_packet::traits::ParserContext;
use bytes::BytesMut;
use chrono::TimeZone;
use chrono::{DateTime, Utc};
use eyre::{bail, eyre};
use ip_network_table_deps_treebitmap::address::Address;
@ -53,8 +53,8 @@ use std::convert::TryFrom;
use std::convert::TryInto;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@ -165,8 +165,8 @@ async fn run_timer(
async fn check_hold_timer(
cancel_token: CancellationToken,
iface: PeerInterface,
last_msg_time: Arc<RwLock<DateTime<Utc>>>,
hold_time: std::time::Duration,
last_msg_time: Arc<AtomicI64>,
hold_time: chrono::Duration,
) {
loop {
tokio::select! {
@ -175,9 +175,17 @@ async fn check_hold_timer(
return;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
let last = last_msg_time.read().unwrap();
let elapsed_time = Utc::now() - *last;
if elapsed_time.num_seconds() as u64 > hold_time.as_secs() {
let now = Utc::now();
let last_msg = match Utc.timestamp_millis_opt(last_msg_time.load(Ordering::Relaxed)) {
chrono::offset::LocalResult::Single(t) => t,
chrono::offset::LocalResult::Ambiguous(earliest, _) => earliest,
chrono::offset::LocalResult::None => {
warn!("Failed to parse last msg time");
continue;
},
};
if (now - last_msg) > hold_time {
match iface.send(PeerCommands::TimerEvent(PeerTimerEvent::HoldTimerExpire())) {
Ok(()) => {},
Err(e) => {
@ -187,6 +195,7 @@ async fn check_hold_timer(
// Exit the hold timer task since it's expired already and is not needed anymore.
return;
}
}
}
@ -336,7 +345,7 @@ pub struct PeerStateMachine<A: Address> {
// Keep track of the time of the last message to efficiently implement
// the hold timer.
last_msg_time: Arc<RwLock<DateTime<Utc>>>,
last_msg_time: Arc<AtomicI64>,
// Timers and cancellation token to spawned tasks
connect_timer: Option<(JoinHandle<()>, CancellationToken)>,
@ -380,7 +389,7 @@ where
iface_tx,
route_manager,
established_time: None,
last_msg_time: Arc::new(RwLock::new(DateTime::from_timestamp(0, 0).unwrap())),
last_msg_time: Arc::new(AtomicI64::default()),
connect_timer: None,
hold_timer: None,
keepalive_timer: None,
@ -519,13 +528,9 @@ where
}
PeerCommands::MessageFromPeer(msg) => match self.handle_msg(msg).await {
Ok(_) => {
let mut last_time = self
.last_msg_time
.write()
.map_err(|e| eyre!(e.to_string()))?;
*last_time = Utc::now();
}
Ok(_) => self
.last_msg_time
.store(Utc::now().timestamp_millis(), Ordering::Relaxed),
Err(e) => {
bail!(e);
}
@ -580,7 +585,7 @@ where
},
state: format!("{:?}", self.state),
session_established_time: self.established_time.map(|t| t.timestamp() as u64),
last_messaage_time: Some(self.last_msg_time.read().unwrap().timestamp() as u64),
last_messaage_time: Some(self.last_msg_time.load(Ordering::Relaxed) as u64),
route_updates_in: Some(0), /* todo */
route_updates_out: Some(0), /* todo */
};
@ -1066,9 +1071,10 @@ where
if hold_time > 0 {
// Set keepalive timer.
let keepalive_duration = hold_time / 3;
info!(
trace!(
"Using keepalive duration of {} for peer {}",
keepalive_duration, self.config.name
keepalive_duration,
self.config.name
);
{
let token = CancellationToken::new();
@ -1098,7 +1104,7 @@ where
token_copy,
chan,
last_msg_time,
std::time::Duration::from_secs(hold_time.into()),
chrono::Duration::seconds(hold_time.into()),
)
.await
});
@ -1107,11 +1113,7 @@ where
}
};
// TODO: Should not have to clone here?
let announcements: Vec<PrefixAnnouncement> = self.config.announcements.clone();
for announcement in announcements {
self.announce_static(&announcement).await?;
}
self.announce_static().await?;
Ok(())
}
@ -1135,103 +1137,102 @@ where
Ok(())
}
async fn announce_static(&mut self, announcement: &PrefixAnnouncement) -> eyre::Result<()> {
let mut bgp_update_msg = UpdateMessage {
withdrawn_nlri: vec![],
announced_nlri: vec![],
path_attributes: vec![],
};
/// Generates and sends announcements for all statically defined routes.
async fn announce_static(&mut self) -> eyre::Result<()> {
for announcement in &self.config.announcements {
let mut bgp_update_msg = UpdateMessage {
withdrawn_nlri: vec![],
announced_nlri: vec![],
path_attributes: vec![],
};
// Origin, TODO: configure this based on i/eBGP
bgp_update_msg
.path_attributes
.push(PathAttribute::OriginPathAttribute(OriginPathAttribute::EGP));
bgp_update_msg
.path_attributes
.push(ASPathAttribute::from_asns(vec![self.server_config.asn]));
match self.config.afi {
AddressFamilyIdentifier::Ipv4 => {
match announcement.nexthop {
IpAddr::V4(nh) => {
bgp_update_msg
.path_attributes
.push(PathAttribute::NextHopPathAttribute(NextHopPathAttribute(
nh,
)))
}
_ => bail!("Found non IPv4 nexthop in announcement"),
}
let nlri = NLRI::try_from(announcement.prefix.as_str())
.map_err(|e| eyre!(e.to_string()))?;
bgp_update_msg.announced_nlri.push(nlri);
}
AddressFamilyIdentifier::Ipv6 => {
let nexthop_octets = match announcement.nexthop {
IpAddr::V6(nh) => nh.octets().to_vec(),
_ => {
bail!("Found non IPv6 nexthop in announcement");
}
};
let nlri = NLRI::try_from(announcement.prefix.as_str())
.map_err(|e| eyre!(e.to_string()))?;
let mp_reach = MPReachNLRIPathAttribute {
afi: AddressFamilyIdentifier::Ipv6,
safi: SubsequentAddressFamilyIdentifier::Unicast,
nexthop: nexthop_octets,
nlris: vec![nlri],
};
bgp_update_msg
.path_attributes
.push(PathAttribute::MPReachNLRIPathAttribute(mp_reach));
}
}
if let Some(large_communities) = &announcement.large_communities {
let mut large_communities_attr = LargeCommunitiesPathAttribute { values: vec![] };
for large_community in large_communities {
let parts: Vec<u32> = large_community
.split(':')
.flat_map(|x| x.parse::<u32>())
.collect();
if parts.len() != 3 {
warn!("Failed to parse large community: {}", large_community);
}
let payload = LargeCommunitiesPayload {
global_admin: parts[0],
ld1: parts[1],
ld2: parts[2],
};
large_communities_attr.values.push(payload);
}
// Origin, TODO: configure this based on i/eBGP
bgp_update_msg
.path_attributes
.push(PathAttribute::LargeCommunitiesPathAttribute(
large_communities_attr,
));
}
.push(PathAttribute::OriginPathAttribute(OriginPathAttribute::EGP));
let bgp_message = BGPMessage {
msg_type: UPDATE_MESSAGE,
payload: BGPSubmessage::UpdateMessage(bgp_update_msg),
};
bgp_update_msg
.path_attributes
.push(ASPathAttribute::from_asns(vec![self.server_config.asn]));
info!("Sending static announcement to peer: {:?}", bgp_message);
match self.config.afi {
AddressFamilyIdentifier::Ipv4 => {
match announcement.nexthop {
IpAddr::V4(nh) => bgp_update_msg.path_attributes.push(
PathAttribute::NextHopPathAttribute(NextHopPathAttribute(nh)),
),
_ => bail!("Found non IPv4 nexthop in announcement"),
}
let mut buf = BytesMut::new();
self.codec
.lock()
.await
.encode(bgp_message, &mut buf)
.map_err(|e| eyre!("failed to encode BGP message: {}", e))?;
let nlri = NLRI::try_from(announcement.prefix.as_str())
.map_err(|e| eyre!(e.to_string()))?;
bgp_update_msg.announced_nlri.push(nlri);
}
AddressFamilyIdentifier::Ipv6 => {
let nexthop_octets = match announcement.nexthop {
IpAddr::V6(nh) => nh.octets().to_vec(),
_ => {
bail!("Found non IPv6 nexthop in announcement");
}
};
let nlri = NLRI::try_from(announcement.prefix.as_str())
.map_err(|e| eyre!(e.to_string()))?;
let mp_reach = MPReachNLRIPathAttribute {
afi: AddressFamilyIdentifier::Ipv6,
safi: SubsequentAddressFamilyIdentifier::Unicast,
nexthop: nexthop_octets,
nlris: vec![nlri],
};
bgp_update_msg
.path_attributes
.push(PathAttribute::MPReachNLRIPathAttribute(mp_reach));
}
}
if let Some(stream) = self.tcp_stream.as_mut() {
stream
.write(&buf)
if let Some(large_communities) = &announcement.large_communities {
let mut large_communities_attr = LargeCommunitiesPathAttribute { values: vec![] };
for large_community in large_communities {
let parts: Vec<u32> = large_community
.split(':')
.flat_map(|x| x.parse::<u32>())
.collect();
if parts.len() != 3 {
warn!("Failed to parse large community: {}", large_community);
}
let payload = LargeCommunitiesPayload {
global_admin: parts[0],
ld1: parts[1],
ld2: parts[2],
};
large_communities_attr.values.push(payload);
}
bgp_update_msg
.path_attributes
.push(PathAttribute::LargeCommunitiesPathAttribute(
large_communities_attr,
));
}
let bgp_message = BGPMessage {
msg_type: UPDATE_MESSAGE,
payload: BGPSubmessage::UpdateMessage(bgp_update_msg),
};
info!("Sending static announcement to peer: {:?}", bgp_message);
let mut buf = BytesMut::new();
self.codec
.lock()
.await
.map_err(|e| eyre!("Failed to write msg to peer: {}", e))?;
.encode(bgp_message, &mut buf)
.map_err(|e| eyre!("failed to encode BGP message: {}", e))?;
if let Some(stream) = self.tcp_stream.as_mut() {
stream
.write(&buf)
.await
.map_err(|e| eyre!("Failed to write msg to peer: {}", e))?;
}
}
Ok(())
}

View File

@ -213,9 +213,8 @@ where
for pathset in self.rib.iter() {
snapshot.routes.push(pathset.2.lock().unwrap().clone());
}
// TODO: handle an error here.
if let Err(e) = sender.send(snapshot) {
warn!("Failed to send snapshot of RIB: {:?}", e);
trace!("Failed to send snapshot of RIB: {:?}", e);
}
info!("Done RIB dump");
}

View File

@ -110,21 +110,31 @@ impl RouteServer {
impl BgpServerAdminService for RouteServer {
async fn peer_status(
&self,
request: tonic::Request<PeerStatusRequest>,
_request: tonic::Request<PeerStatusRequest>,
) -> Result<Response<PeerStatusResponse>, Status> {
let mut result = PeerStatusResponse::default();
// Store the pending futures in a JoinSet to parallelize fetching.
let mut join_set = tokio::task::JoinSet::new();
for peer in &self.peer_state_machines {
let (tx, rx) = oneshot::channel();
if let Err(e) = peer.1.send(PeerCommands::GetStatus(tx)) {
if let Err(_e) = peer.1.send(PeerCommands::GetStatus(tx)) {
warn!(
peer = peer.0,
"Peer channel dead when trying to send state request"
"Failed to send GetStatus request to PeerStateMachine"
);
continue;
}
let resp = rx.await.map_err(|e| Status::internal(format!("{}", e)))?;
result.peer_status.push(resp);
join_set.spawn(rx);
}
while let Some(next) = join_set.join_next().await {
match next {
Ok(Ok(peer_status)) => result.peer_status.push(peer_status),
Ok(Err(e)) => return Err(Status::internal(format!("{}", e))),
Err(e) => return Err(Status::internal(format!("{}", e))),
}
}
Ok(Response::new(result))