// Copyright 2021 Rayhaan Jaufeerally. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use crate::config::PeerConfig; use crate::data_structures::RouteUpdate; use crate::path::path_data::PathData; use crate::path::path_set::PathSet; use crate::path::path_set::PathSource; use crate::peer::PeerCommands; use std::collections::HashMap; use std::convert::TryInto; use std::sync::Arc; use std::sync::Mutex; use bgp_packet::nlri::NLRI; use eyre::{bail, eyre}; use ip_network_table_deps_treebitmap::address::Address; use serde::Serialize; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::{info, trace, warn}; use super::data_structures::RouteWithdraw; type PeerInterface = mpsc::UnboundedSender; /// RibSnapshot contians a version number and the dump of all the routes. #[derive(Debug, Serialize)] pub struct RibSnapshot { pub epoch: u64, pub routes: Vec>, } pub enum RouteManagerCommands { Update(RouteUpdate), /// DumpRib returns the view of the RIB at the current epoch. DumpRib(oneshot::Sender>), /// StreamRib will send all the routes currently in the RIB then stream updates. StreamRib( mpsc::UnboundedSender<(u64, PathSet)>, oneshot::Sender)>>, ), } pub struct RibManager { mgr_rx: mpsc::UnboundedReceiver>, /// Peers configured on this server instance. peers: HashMap, rib: ip_network_table_deps_treebitmap::IpLookupTable>>, epoch: u64, // Handle for streaming updates to PathSets in the RIB. pathset_streaming_handle: broadcast::Sender<(u64, PathSet)>, shutdown: CancellationToken, } impl RibManager where NLRI: TryInto, >::Error: ToString, A: std::fmt::Debug + std::fmt::Display, { pub fn new( chan: mpsc::UnboundedReceiver>, shutdown: CancellationToken, ) -> Result { // TODO: Make this a flag that can be configured. let (pathset_tx, _) = broadcast::channel(10_000_000); Ok(RibManager:: { mgr_rx: chan, peers: HashMap::new(), rib: ip_network_table_deps_treebitmap::IpLookupTable::new(), epoch: 0, pathset_streaming_handle: pathset_tx, shutdown, }) } pub async fn run(&mut self) -> eyre::Result<()> { loop { let next = tokio::select! { cmd = self.mgr_rx.recv() => cmd, _ = self.shutdown.cancelled() => { warn!("RIB manager shutting down."); return Ok(()); } }; match next { Some(mgr_cmd) => match mgr_cmd { RouteManagerCommands::Update(update) => self.handle_update(update)?, RouteManagerCommands::DumpRib(sender) => { self.dump_rib(sender); } RouteManagerCommands::StreamRib(dump_sender, stream_sender) => { self.stream_rib(dump_sender, stream_sender); } }, None => { warn!("All senders of the manager channel have been dropped, manager exiting!"); bail!("Manager exited due to channel closure"); } } } } // dump_rib returns an atomic snapshot of the RIB at the current epoch. fn dump_rib(&mut self, sender: tokio::sync::oneshot::Sender>) { info!("Starting RIB dump"); let mut snapshot = RibSnapshot:: { epoch: self.epoch, routes: vec![], }; for pathset in self.rib.iter() { snapshot.routes.push(pathset.2.lock().unwrap().clone()); } if let Err(e) = sender.send(snapshot) { trace!("Failed to send snapshot of RIB: {:?}", e); } info!("Done RIB dump"); } /// stream_rib sends the current routes in the RIB back via dump_chan then closes it, /// and subsequently returns a broadcast::Receiver for streaming updates. fn stream_rib( &mut self, dump_sender: mpsc::UnboundedSender<(u64, PathSet)>, stream_sender: oneshot::Sender)>>, ) { // Send all the routes currently in the RIB. for pathset in self.rib.iter() { if let Err(e) = dump_sender.send((self.epoch, pathset.2.lock().unwrap().clone())) { warn!("Failed to send dump to client: {}", e); } } drop(dump_sender); // Create a new subscriber and return that to the caller to be notified of updates. let subscriber = self.pathset_streaming_handle.subscribe(); if let Err(e) = stream_sender.send(subscriber) { warn!("Failed to send subscriber in stream_rib: {:?}", e); } } fn handle_update(&mut self, update: RouteUpdate) -> eyre::Result<()> { match update { RouteUpdate::Announce(announce) => self.handle_announce(announce), RouteUpdate::Withdraw(withdraw) => self.handle_withdraw(withdraw), } } fn handle_announce(&mut self, update: (Vec, Arc)) -> eyre::Result<()> { let peer_router_id = match update.1.path_source { PathSource::LocallyConfigured => { bail!("handle_announce should not be called with a LocallyConfigured route") } PathSource::BGPPeer(peer_id) => peer_id, }; for nlri in update.0 { // Increment the epoch on every NLRI processed. self.epoch += 1; let addr: A = nlri.clone().try_into().map_err(|e| eyre!(e.to_string()))?; let prefixlen = nlri.prefixlen; if let Some(path_set_wrapped) = self.rib.exact_match(addr, prefixlen.into()) { let mut path_set = path_set_wrapped.lock().unwrap(); if let Some(new_best) = path_set.insert_pathdata(&peer_router_id, &update.1) { for (_config, peer) in self.peers.values() { peer.send(PeerCommands::Announce(RouteUpdate::Announce(( vec![nlri.clone()], new_best.clone(), ))))?; } } // Ignore errors sending due to no active receivers on the channel. let _ = self .pathset_streaming_handle .send((self.epoch, path_set.clone())); } else { // This prefix has never been seen before, so add a new PathSet for it. let mut path_set = PathSet::::new(addr, nlri.prefixlen, nlri.clone()); if let Some(new_best) = path_set.insert_pathdata(&peer_router_id, &update.1) { for (_config, peer) in self.peers.values() { peer.send(PeerCommands::Announce(RouteUpdate::Announce(( vec![nlri.clone()], new_best.clone(), ))))?; } } else { bail!("Inconsistent state, adding new pathdata but no new best path"); } self.rib .insert(addr, prefixlen.into(), Mutex::new(path_set.clone())); // Ignore errors sending due to no active receivers on the channel. let _ = self.pathset_streaming_handle.send((self.epoch, path_set)); } } Ok(()) } fn handle_withdraw(&mut self, update: RouteWithdraw) -> eyre::Result<()> { for nlri in update.prefixes { self.epoch += 1; let addr: A = nlri.clone().try_into().map_err(|e| eyre!(e.to_string()))?; let mut pathset_empty = false; if let Some(path_set_wrapped) = self.rib.exact_match(addr, nlri.prefixlen.into()) { let mut path_set = path_set_wrapped.lock().unwrap(); let removed = path_set.remove_pathdata(&update.peer_id, &nlri); match removed { Err(_e) => { warn!( "Got a withdrawal for route {} from {}, which was not in RIB", nlri, update.peer_id ); } Ok(Some(new_best)) => { // Communicate new_best to all peers. for (_config, peer) in self.peers.values() { peer.send(PeerCommands::Announce(RouteUpdate::Announce(( vec![nlri.clone()], new_best.clone(), ))))?; } } Ok(None) => { // Do nothing here since we check below if the path is withdrawn. } } // Ignore errors sending due to no active receivers on the channel. let _ = self .pathset_streaming_handle .send((self.epoch, path_set.clone())); if path_set.is_empty() { pathset_empty = true; for (_config, peer) in self.peers.values() { peer.send(PeerCommands::Announce(RouteUpdate::Withdraw( RouteWithdraw { peer_id: update.peer_id, prefixes: vec![nlri.clone()], }, )))?; } } } else { warn!( "Got a withdrawal for route {} from {}, which was not in RIB", nlri, update.peer_id ); } if pathset_empty { self.rib.remove(addr, nlri.prefixlen.into()); } } Ok(()) } pub fn lookup_path_exact(&self, addr: A, prefixlen: u32) -> Option> { self.rib .exact_match(addr, prefixlen) .map(|path| path.lock().unwrap().clone()) } } #[cfg(test)] mod tests { use crate::rib_manager::PathData; use crate::rib_manager::PathSource; use crate::rib_manager::RibManager; use crate::rib_manager::RouteManagerCommands; use crate::rib_manager::RouteUpdate; use bgp_packet::constants::AddressFamilyIdentifier; use bgp_packet::nlri::NLRI; use bgp_packet::path_attributes::OriginPathAttribute; use chrono::Utc; use std::net::Ipv6Addr; use std::str::FromStr; use std::sync::Arc; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; #[test] fn test_manager_process_single() { let (_, rp_rx) = mpsc::unbounded_channel::>(); let mut rib_manager: RibManager = RibManager::::new(rp_rx, CancellationToken::new()).unwrap(); let nexthop = Ipv6Addr::new(0x20, 0x01, 0xd, 0xb8, 0, 0, 0, 0x1); // Send an update to the manager and check that it adds it to the RIB. let path_data = PathData { as_path: vec![65536], local_pref: 0, med: 0, nexthop: nexthop.octets().to_vec(), path_attributes: vec![], origin: OriginPathAttribute::EGP, path_source: PathSource::BGPPeer("1.2.3.4".parse().unwrap()), learn_time: Utc::now(), }; let prefixes = vec![NLRI { afi: AddressFamilyIdentifier::Ipv6, prefixlen: 32, prefix: vec![0x20, 0x01, 0xd, 0xb8], }]; // Manually drive the manager instead of calling run to not deal with async in tests. assert!(rib_manager .handle_update(RouteUpdate::Announce((prefixes, Arc::new(path_data)))) .is_ok()); let addr = Ipv6Addr::from_str("2001:db8::").unwrap(); let prefixlen: u32 = 32; let lookup_result = rib_manager.lookup_path_exact(addr, prefixlen).unwrap(); assert_eq!(lookup_result.len(), 1); let path_result = lookup_result .get_by_announcer(&"1.2.3.4".parse().unwrap()) .unwrap(); assert_eq!(path_result.nexthop, nexthop.octets().to_vec()); } }