This commit is contained in:
Rayhaan Jaufeerally
2024-07-16 18:59:57 +00:00
35 changed files with 515 additions and 297 deletions

View File

@ -0,0 +1,524 @@
// Copyright 2021 Rayhaan Jaufeerally.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::config::PeerConfig;
use crate::config::ServerConfig;
use crate::peer::PeerCommands;
use crate::peer::PeerStateMachine;
use crate::rib_manager::RibManager;
use crate::rib_manager::RibSnapshot;
use crate::rib_manager::RouteManagerCommands;
use crate::route_server;
use crate::route_server::route_server::route_service_server::RouteServiceServer;
use bgp_packet::constants::AddressFamilyIdentifier;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use warp::Filter;
use warp::Reply;
// socket_listener starts listening on the given address, and passes clients that have
// made an inbound connection to the provided stream. It also implements logic for
// recreating the listener in the event that it fails.
// Notifier is sent the restult of the first attempt to start the listener.
async fn socket_listener(
c: UnboundedSender<(TcpStream, SocketAddr)>,
listen_addr: String,
notifier: oneshot::Sender<Result<(), String>>,
shutdown: CancellationToken,
) {
info!("Starting to listen on addr: {}", listen_addr);
let listener_result = TcpListener::bind(&listen_addr).await;
if let Err(e) = listener_result {
warn!("Listener for {} failed: {}", listen_addr, e.to_string());
match notifier.send(Err(e.to_string())) {
Ok(_) => {}
Err(e) => warn!(?e, "Failed to send notification of channel error"),
}
return;
}
let listener = listener_result.unwrap();
match notifier.send(Ok(())) {
Ok(_) => {}
Err(e) => warn!("Failed to send notification of channel ready: {:?}", e),
}
info!(listen_addr, "Spawned listner");
loop {
let conn = tokio::select! {
res = listener.accept() => res,
_ = shutdown.cancelled() => {
info!("Shutting down listener");
return;
}
};
info!(?conn, "New inbound connection");
match conn {
Ok((stream, addr)) => {
info!("Accepted socket connection from {}", addr);
match c.send((stream, addr)) {
Ok(_) => {}
Err(e) => {
warn!(
"Dropped connection from {} due to mpsc::channel failure: {}",
addr, e
);
}
}
}
Err(e) => {
warn!("Failed to accept connection: {}, aborting listener", e);
break;
}
}
}
}
/// Builds the debugging HTTP server and spawns it as a tokio task.
///
/// Routes served:
/// * `GET /ipv4/routes` and `GET /ipv6/routes` - JSON dump of the respective RIB.
/// * `GET /peerz` - HTML page with the state and config of every peer in `peers`.
/// * `POST /peerz/<peer name>/restart` - sends `ConnectionClosed` to the named peer.
///
/// The server shuts down gracefully once `shutdown` is cancelled. Returns the
/// `JoinHandle` of the serving task, or the bind error rendered as a string.
async fn start_http_server(
    manager4: UnboundedSender<RouteManagerCommands<Ipv4Addr>>,
    manager6: UnboundedSender<RouteManagerCommands<Ipv6Addr>>,
    peers: HashMap<String, UnboundedSender<PeerCommands>>,
    listen_addr: SocketAddr,
    shutdown: CancellationToken,
) -> Result<tokio::task::JoinHandle<()>, String> {
    /// Asks a RIB manager for a snapshot and renders it as JSON. Any failure to reach
    /// the manager is logged and turned into a warp rejection.
    async fn manager_get_routes_handler<T: serde::ser::Serialize>(
        channel: UnboundedSender<RouteManagerCommands<T>>,
    ) -> Result<impl warp::Reply, warp::Rejection> {
        let (snapshot_tx, snapshot_rx) = tokio::sync::oneshot::channel::<RibSnapshot<T>>();
        channel
            .send(RouteManagerCommands::DumpRib(snapshot_tx))
            .map_err(|e| {
                warn!("Failed to send DumpRib request: {}", e);
                warp::reject()
            })?;
        let snapshot = snapshot_rx.await.map_err(|e| {
            warn!("Failed to get RIB from manager: {}", e);
            warp::reject()
        })?;
        Ok(warp::reply::json(&snapshot))
    }

    // TODO: endpoints for adding / removing large communities on a peer (via
    // PeerCommands::AddLargeCommunity / PeerCommands::RemoveLargeCommunity) were sketched
    // here previously but are not implemented.

    // reset_peer_connection causes the PSM to close the connection, flush state, and
    // reconnect to the peer.
    async fn reset_peer_connection(
        peer_name: String,
        peers: HashMap<String, UnboundedSender<PeerCommands>>,
    ) -> Result<impl warp::Reply, warp::Rejection> {
        let response = match peers.get(&peer_name) {
            None => {
                warp::reply::with_status("No such peer found!", warp::http::StatusCode::NOT_FOUND)
                    .into_response()
            }
            Some(peer_sender) => match peer_sender.send(PeerCommands::ConnectionClosed()) {
                Ok(()) => warp::reply::html(
                    "Sent restart request to PeerStateMachine. Something might happen.",
                )
                .into_response(),
                Err(e) => warp::reply::with_status(
                    format!("Something went wrong: {}", e),
                    warp::http::StatusCode::INTERNAL_SERVER_ERROR,
                )
                .into_response(),
            },
        };
        Ok(response)
    }

    /// peerz is a debugging endpoint for PeerStateMachines on this server.
    async fn get_peerz(
        peers: HashMap<String, UnboundedSender<PeerCommands>>,
    ) -> Result<impl warp::Reply, warp::Rejection> {
        // Shared reply for any failure while talking to a peer state machine.
        let internal_error = || {
            warp::reply::with_status(
                "Something went wrong!",
                warp::http::StatusCode::INTERNAL_SERVER_ERROR,
            )
            .into_response()
        };

        let mut page = String::from("<!DOCTYPE html><body>");
        for (peer_name, sender) in peers {
            page.push_str(&format!("<h2>{}</h2><br/>", peer_name));

            let (status_tx, status_rx) = oneshot::channel();
            if let Err(e) = sender.send(PeerCommands::GetStatus(status_tx)) {
                warn!("Failed to send request to PSM channel: {}", e);
                return Ok(internal_error());
            }
            let status = match status_rx.await {
                Ok(status) => status,
                Err(e) => {
                    warn!("error on rx from peer channel: {}", e);
                    return Ok(internal_error());
                }
            };
            page.push_str(&format!("Peer state: <b>{:?}</b><br/>", status.state));
            page.push_str(&format!("<code>{:?}</code>", status.config));
        }
        page.push_str("</body></html>");
        Ok(warp::http::Response::builder().body(page).into_response())
    }

    // Each route gets its own clone of the channel(s) it needs on every request.
    let rib4_routes = warp::get()
        .and(warp::path("ipv4"))
        .and(warp::path("routes"))
        .and(warp::path::end())
        .and(warp::any().map(move || manager4.clone()))
        .and_then(manager_get_routes_handler);

    let rib6_routes = warp::get()
        .and(warp::path("ipv6"))
        .and(warp::path("routes"))
        .and(warp::path::end())
        .and(warp::any().map(move || manager6.clone()))
        .and_then(manager_get_routes_handler);

    let with_peers = warp::any().map(move || peers.clone());

    let peer_status_route = warp::get()
        .and(warp::path("peerz"))
        .and(warp::path::end())
        .and(with_peers.clone())
        .and_then(get_peerz);

    let peer_restart_route = warp::post()
        .and(warp::path("peerz"))
        .and(warp::path::param())
        .and(warp::path("restart"))
        .and(warp::path::end())
        .and(with_peers)
        .and_then(reset_peer_connection);

    let routes = rib4_routes
        .or(rib6_routes)
        .or(peer_status_route)
        .or(peer_restart_route);

    // The server future completes after the shutdown token is cancelled.
    let shutdown_signal = async move {
        shutdown.cancelled().await;
    };
    let (_, server) = warp::serve(routes)
        .try_bind_with_graceful_shutdown(listen_addr, shutdown_signal)
        .map_err(|e| e.to_string())?;
    Ok(tokio::task::spawn(server))
}
/// Server encapsulates the behavior of the BGP speaker.
pub struct Server {
// config is the configuration this server was constructed with.
config: ServerConfig,
// shutdown is the cancellation token cloned into the tasks spawned by start();
// shutdown() cancels it to ask those tasks to stop.
shutdown: CancellationToken,
// worker_handles contains the JoinHandle of tasks spawned by the server so that
// we can wait on them for shutdown.
worker_handles: Vec<tokio::task::JoinHandle<()>>,
// mgr_v6 is the command channel of the IPv6 RIB manager; None until start() has run.
mgr_v6: Option<UnboundedSender<RouteManagerCommands<Ipv6Addr>>>,
// mgr_v4 is the command channel of the IPv4 RIB manager; None until start() has run.
mgr_v4: Option<UnboundedSender<RouteManagerCommands<Ipv4Addr>>>,
}
impl Server {
pub fn new(config: ServerConfig) -> Server {
let shutdown = CancellationToken::new();
Server {
config,
shutdown,
worker_handles: vec![],
mgr_v4: None,
mgr_v6: None,
}
}
// start kicks off the BGP server
// wait_startup controls whether this function waits for the listeners to come up healthy
// before returning. This is useful in tests and other situations where we want to wait
// and then probe the endpoints.
pub async fn start(&mut self, wait_startup: bool) -> Result<(), String> {
// TODO: the following code spawns a bunch of asynchronous tasks, and it would be
// good to have a handle on the status of these tasks so that we can restart them
// or alert if they crash.
// Channel for passing newly established TCP streams to the dispatcher.
let (tcp_in_tx, mut tcp_in_rx): (UnboundedSender<(TcpStream, SocketAddr)>, _) =
tokio::sync::mpsc::unbounded_channel();
// For every address we are meant to listen on, we spawn a task that will listen on
// that address. This is so that if the listening socket breaks somehow, we can
// periodically retry to listen again.
for listen_addr in self.config.clone().listen_addrs {
info!("Starting listener for {}", listen_addr.to_string());
let sender = tcp_in_tx.clone();
let (ready_tx, ready_rx) = oneshot::channel();
let listen_handle = tokio::spawn({
let shutdown = self.shutdown.clone();
async move {
socket_listener(sender, listen_addr.to_string(), ready_tx, shutdown).await;
}
});
self.worker_handles.push(listen_handle);
if wait_startup {
let statup_result = ready_rx.await;
match statup_result {
Ok(_) => {}
Err(err) => return Err(format!("Failed to startup listener: {}", err)),
}
}
}
// Start the route manager for IPv6 and IPv4.
let (rp6_tx, rp6_rx) = unbounded_channel::<RouteManagerCommands<Ipv6Addr>>();
self.mgr_v6 = Some(rp6_tx.clone());
let mut rib_manager6: RibManager<Ipv6Addr> =
RibManager::<Ipv6Addr>::new(rp6_rx, self.shutdown.clone()).unwrap();
tokio::spawn(async move {
match rib_manager6.run().await {
Ok(_) => {}
Err(e) => {
warn!("RIBManager exited: {}", e);
}
}
});
let (rp4_tx, rp4_rx) = unbounded_channel::<RouteManagerCommands<Ipv4Addr>>();
self.mgr_v4 = Some(rp4_tx.clone());
let mut rib_manager4: RibManager<Ipv4Addr> =
RibManager::<Ipv4Addr>::new(rp4_rx, self.shutdown.clone()).unwrap();
tokio::spawn(async move {
match rib_manager4.run().await {
Ok(_) => {}
Err(e) => {
warn!("RIBManager exited: {}", e);
}
}
});
// Start a PeerStateMachine for every peer that is configured and store its channel so that
// we can communicate with it.
let mut peer_statemachines: HashMap<String, (PeerConfig, UnboundedSender<PeerCommands>)> =
HashMap::new();
for peer_config in &self.config.peers {
let (psm_tx, psm_rx) = unbounded_channel::<PeerCommands>();
match peer_config.afi {
AddressFamilyIdentifier::Ipv6 => {
let mut psm = PeerStateMachine::<Ipv6Addr>::new(
self.config.clone(),
peer_config.clone(),
psm_rx,
psm_tx.clone(),
rp6_tx.clone(),
self.shutdown.clone(),
);
self.worker_handles.push(tokio::spawn(async move {
psm.run().await;
}));
}
AddressFamilyIdentifier::Ipv4 => {
let mut psm = PeerStateMachine::<Ipv4Addr>::new(
self.config.clone(),
peer_config.clone(),
psm_rx,
psm_tx.clone(),
rp4_tx.clone(),
self.shutdown.clone(),
);
self.worker_handles.push(tokio::spawn(async move {
psm.run().await;
}));
}
_ => panic!("Unsupported address family: {}", peer_config.afi),
}
peer_statemachines.insert(peer_config.name.clone(), (peer_config.clone(), psm_tx));
}
let mut peer_chan_map: HashMap<String, UnboundedSender<PeerCommands>> = HashMap::new();
for (k, v) in &peer_statemachines {
peer_chan_map.insert(k.to_string(), v.1.clone());
}
// Start the HTTP server for debugging access.
if let Some(http_addr) = &self.config.http_addr {
let addr = http_addr.parse().unwrap();
start_http_server(
rp4_tx.clone(),
rp6_tx.clone(),
peer_chan_map.clone(),
addr,
self.shutdown.clone(),
)
.await
.unwrap();
}
// Start the gRPC server for streaming the RIB.
if let Some(grpc_addr) = &self.config.grpc_addr {
let addr = grpc_addr.parse().unwrap();
info!("Running gRPC RouteService on {}", addr);
let rs = route_server::RouteServer {
ip4_manager: rp4_tx.clone(),
ip6_manager: rp6_tx.clone(),
peer_state_machines: peer_chan_map,
};
let svc = RouteServiceServer::new(rs);
tokio::spawn(async move {
if let Err(e) = tonic::transport::Server::builder()
.add_service(svc)
.serve(addr)
.await
{
warn!("Failed to run gRPC server: {}", e);
}
});
}
// Event loop for processing inbound connections.
let shutdown = self.shutdown.clone();
self.worker_handles.push(tokio::spawn(async move {
loop {
let next = tokio::select! {
cmd = tcp_in_rx.recv() => cmd,
_ = shutdown.cancelled() => {
warn!("Peer connection dispatcher shutting down due to shutdown signal.");
return;
}
};
match next {
Some((socket, addr)) => {
let mut psm_opt: Option<UnboundedSender<PeerCommands>> = None;
for (name, handle) in &peer_statemachines {
if handle.0.ip == addr.ip() {
info!("Got connection for peer: {}", name);
psm_opt = Some(handle.1.clone());
}
}
if let Some(psm) = psm_opt {
psm.send(PeerCommands::NewConnection(socket)).unwrap();
} else {
info!("Dropping unrecognized connection from {}", addr);
}
}
None => {
warn!("Failed to read incoming connections, exiting");
break;
}
}
}
}));
Ok(())
}
/// Cancels the shutdown token and waits for every tracked worker task to finish.
///
/// The handles are moved out of `worker_handles` before being awaited, so a later call
/// to `shutdown` finds an empty list instead of polling `JoinHandle`s that have already
/// completed. A task that failed to join is logged and does not stop the remaining
/// tasks from being awaited.
pub async fn shutdown(&mut self) {
    self.shutdown.cancel();
    for handle in std::mem::take(&mut self.worker_handles) {
        if let Err(e) = handle.await {
            warn!("Failed to shutdown task: {}", e);
        }
    }
}
}

View File

@ -0,0 +1,73 @@
// Copyright 2021 Rayhaan Jaufeerally.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bgp_packet::constants::{AddressFamilyIdentifier, SubsequentAddressFamilyIdentifier};
use serde::{Deserialize, Serialize};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
// ServerConfig is the top-level, serde-(de)serializable configuration of the server.
#[derive(Clone, Serialize, Deserialize)]
pub struct ServerConfig {
// Identifier of this speaker, expressed as an IPv4 address.
// NOTE(review): presumably the BGP Identifier sent to peers; confirm in peer.rs.
pub identifier: Ipv4Addr,
// NOTE(review): presumably the local autonomous system number; confirm in peer.rs.
pub asn: u32,
// NOTE(review): presumably the BGP hold time in seconds; confirm in peer.rs.
pub hold_time: u16,
// The address to listen on for control plane gRPC connections.
// If unset the gRPC server is not started.
pub grpc_addr: Option<String>,
// The address to listen on for the debugging HTTP server.
// If unset the HTTP server is not started.
pub http_addr: Option<String>,
// The addresses to listen on for BGP peers.
pub listen_addrs: Vec<String>,
// The peers to run a state machine for, one entry per peer.
pub peers: Vec<PeerConfig>,
}
/// PeerConfig describes a single configured BGP peer.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerConfig {
/// A unique name for this peer.
pub name: String,
/// IP address of the peer. Inbound connections are matched to a peer by comparing
/// the remote address against this value.
pub ip: IpAddr,
/// Optional port number to communicate with this peer.
pub port: Option<u16>,
/// Autonomous system number of the peer.
pub asn: u32,
/// Address family of the session; the server starts an IPv4 or IPv6 state machine
/// based on this value.
pub afi: AddressFamilyIdentifier,
/// Subsequent address family of the session.
pub safi: SubsequentAddressFamilyIdentifier,
/// NOTE(review): presumably the LOCAL_PREF applied to routes learned from this
/// peer; confirm in peer.rs.
pub local_pref: u32,
// Announcements is a hardcoded list of BGP updates to send
// to the peer.
pub announcements: Vec<PrefixAnnouncement>,
}
/// PrefixAnnouncement is one statically configured announcement for a peer.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PrefixAnnouncement {
/// The prefix to announce, as text.
/// NOTE(review): presumably in `address/length` notation; confirm where it is parsed.
pub prefix: String,
/// Nexthop to be announced for this prefix.
pub nexthop: IpAddr,
/// Linklocal nexthop to be used for IPv6 announcements.
pub llnh: Option<Ipv6Addr>,
// Path attributes
// Each of the following is optional; the textual format expected for the community
// strings is not visible here and should be checked where they are parsed.
pub local_pref: Option<u32>,
pub med: Option<u32>,
pub communities: Option<Vec<String>>,
pub large_communities: Option<Vec<String>>,
}

View File

@ -0,0 +1,66 @@
// Copyright 2021 Rayhaan Jaufeerally.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use bgp_packet::nlri::NLRI;
use bgp_packet::path_attributes::PathAttribute;
use std::time::SystemTime;
/// RouteInfo encapsulates information received about a particular BGP route.
///
/// `A` is the address type of the prefix.
#[derive(Clone, Debug)]
pub struct RouteInfo<A> {
/// Address part of the prefix.
pub prefix: A,
/// Length of the prefix in bits.
pub prefixlen: u8,
/// The NLRI this route was built from.
pub nlri: NLRI,
/// accepted is true if the route was accepted.
pub accepted: bool,
/// rejection_reason contains the reason why a particular route was dropped.
pub rejection_reason: Option<String>,
/// Time at which this path was learned from the peer.
pub learned: SystemTime,
/// Time at which this path was last updated by the peer.
pub updated: SystemTime,
/// The current path attributes from the UPDATE message where this path
/// was learned.
pub path_attributes: Vec<PathAttribute>,
}
/// RouteUpdate is a type which encapsulates a newly learned, modified, or removed set of prefixes.
#[derive(Debug)]
pub enum RouteUpdate {
/// One or more prefixes announced (or re-announced) by a peer.
Announce(RouteAnnounce),
/// One or more prefixes withdrawn by a peer.
Withdraw(RouteWithdraw),
}
/// RouteAnnounce carries a set of prefixes announced by one peer together with the
/// attributes that apply to all of them.
#[derive(Debug)]
pub struct RouteAnnounce {
/// Name of the announcing peer; the RIB manager uses it as the key of the path.
pub peer: String,
/// The prefixes being announced.
pub prefixes: Vec<NLRI>,
pub local_pref: u32,
pub med: u32,
/// AS path as a list of AS numbers.
pub as_path: Vec<u32>,
/// Nexthop as raw address bytes.
pub nexthop: Vec<u8>,
/// The path attributes stored alongside the path in the RIB.
pub path_attributes: Vec<PathAttribute>,
}
/// RouteWithdraw carries a set of prefixes withdrawn by one peer.
#[derive(Debug)]
pub struct RouteWithdraw {
/// Name of the withdrawing peer; only this peer's path is removed from each prefix.
pub peer: String,
/// The prefixes being withdrawn.
pub prefixes: Vec<NLRI>,
}

20
crates/server/src/lib.rs Normal file
View File

@ -0,0 +1,20 @@
// Copyright 2021 Rayhaan Jaufeerally.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod bgp_server;
pub mod config;
pub mod data_structures;
pub mod peer;
pub mod rib_manager;
pub mod route_server;

1336
crates/server/src/peer.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,381 @@
// Copyright 2021 Rayhaan Jaufeerally.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::config::PeerConfig;
use crate::data_structures::RouteAnnounce;
use crate::data_structures::RouteUpdate;
use crate::peer::PeerCommands;
use std::cmp::Eq;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Mutex;
use bgp_packet::nlri::NLRI;
use bgp_packet::path_attributes::PathAttribute;
use ip_network_table_deps_treebitmap::address::Address;
use serde::Serialize;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::{info, trace, warn};
use super::data_structures::RouteWithdraw;
/// PeerInterface is the sending half of an unbounded channel of PeerCommands.
type PeerInterface = mpsc::UnboundedSender<PeerCommands>;
/// Path is a structure to contain a specific route via one nexthop.
/// Note that currently there is an assumption that there is only
/// one route per peer per prefix, but when ADD-PATH support is added
/// this will no longer hold true.
#[derive(Debug, Clone, Serialize)]
pub struct Path {
/// Nexthop as raw address bytes.
pub nexthop: Vec<u8>,
/// Name of the peer that announced this path.
pub peer_name: String,
pub local_pref: u32,
pub med: u32,
/// AS path as a list of AS numbers.
pub as_path: Vec<u32>,
/// Path attributes copied from the announcement.
pub path_attributes: Vec<PathAttribute>,
}
/// Two paths are equal when they come from the same peer and carry the same nexthop,
/// local preference, MED and AS path.
///
/// The previous implementation returned whether `self` was *better* than `other`, which
/// made `a == a` false and `a == b` differ from `b == a`. `PartialEq` and `Eq` require an
/// equivalence relation, so the ranking logic now lives in `Path::is_preferred_over`.
///
/// NOTE(review): `path_attributes` is not compared because it is not visible here
/// whether `PathAttribute` implements `PartialEq`; add it if it does.
impl PartialEq for Path {
    fn eq(&self, other: &Path) -> bool {
        self.peer_name == other.peer_name
            && self.local_pref == other.local_pref
            && self.med == other.med
            && self.as_path == other.as_path
            && self.nexthop == other.nexthop
    }
}

impl Eq for Path {}

impl Path {
    /// Returns true when `self` should be preferred over `other`.
    ///
    /// The criteria are applied in order, and a later one is only consulted when all
    /// earlier ones tie: higher local preference, shorter AS path, lower MED, and
    /// finally the lexicographically smaller peer name as discriminator of last resort.
    // TODO: Origin is not taken into account yet.
    pub fn is_preferred_over(&self, other: &Path) -> bool {
        other
            .local_pref
            .cmp(&self.local_pref)
            .then_with(|| self.as_path.len().cmp(&other.as_path.len()))
            .then_with(|| self.med.cmp(&other.med))
            .then_with(|| self.peer_name.cmp(&other.peer_name))
            == std::cmp::Ordering::Less
    }
}
/// PathSet holds every known path towards one prefix.
#[derive(Debug, Clone, Serialize)]
pub struct PathSet<A> {
/// Address part of the prefix.
pub addr: A,
/// Length of the prefix in bits.
pub prefixlen: u8,
/// The NLRI the prefix was learned as.
pub nlri: NLRI,
// paths is keyed by the name of the announcing peer, so there is at most one path per
// peer. The BTreeMap keeps the entries ordered by that key (peer name); it does not
// order them by path preference, so the first entry is not necessarily the best path.
pub paths: BTreeMap<String, Path>,
}
/// RibSnapshot contains a version number and the dump of all the routes.
#[derive(Debug, Serialize)]
pub struct RibSnapshot<A> {
/// Epoch of the RIB at the time the snapshot was taken.
pub epoch: u64,
/// A copy of every PathSet in the RIB.
pub routes: Vec<PathSet<A>>,
}
/// RouteManagerCommands are the requests a RibManager accepts on its command channel.
pub enum RouteManagerCommands<A> {
/// Update applies an announcement or withdrawal to the RIB.
Update(RouteUpdate),
/// DumpRib returns the view of the RIB at the current epoch.
DumpRib(oneshot::Sender<RibSnapshot<A>>),
/// StreamRib will send all the routes currently in the RIB then stream updates.
/// The first sender receives the current routes; the second receives the
/// broadcast receiver on which subsequent updates are published.
StreamRib(
mpsc::UnboundedSender<(u64, PathSet<A>)>,
oneshot::Sender<broadcast::Receiver<(u64, PathSet<A>)>>,
),
}
/// RibManager owns the RIB for one address family and serves the commands received on
/// its channel.
pub struct RibManager<A: Address> {
// Receiving end of the command channel.
mgr_rx: mpsc::UnboundedReceiver<RouteManagerCommands<A>>,
// Peers keyed by name. NOTE(review): created empty and not touched by any code
// visible in this file; confirm whether it is still needed.
peers: HashMap<String, (PeerConfig, PeerInterface)>,
// We need to use a mutex for PathSet because IpLookupTable does not return a mut ptr.
rib: ip_network_table_deps_treebitmap::IpLookupTable<A, Mutex<PathSet<A>>>,
// Counter incremented for every NLRI processed (announced or withdrawn).
epoch: u64,
// Handle for streaming updates to PathSets in the RIB.
pathset_streaming_handle: broadcast::Sender<(u64, PathSet<A>)>,
// Token whose cancellation makes run() return.
shutdown: CancellationToken,
}
impl<A: Address> RibManager<A>
where
NLRI: TryInto<A>,
<NLRI as TryInto<A>>::Error: ToString,
A: std::fmt::Debug + std::fmt::Display,
{
/// Creates a manager with an empty RIB at epoch 0 that reads commands from `chan` and
/// stops when `shutdown` is cancelled.
pub fn new(
    chan: mpsc::UnboundedReceiver<RouteManagerCommands<A>>,
    shutdown: CancellationToken,
) -> Result<Self, std::io::Error> {
    // TODO: Make this a flag that can be configured.
    const PATHSET_STREAM_CAPACITY: usize = 10_000_000;

    // Only the sending half is kept; receivers are created on demand via subscribe().
    let (pathset_streaming_handle, _) = broadcast::channel(PATHSET_STREAM_CAPACITY);
    let manager = RibManager::<A> {
        mgr_rx: chan,
        peers: HashMap::new(),
        rib: ip_network_table_deps_treebitmap::IpLookupTable::new(),
        epoch: 0,
        pathset_streaming_handle,
        shutdown,
    };
    Ok(manager)
}
/// Processes commands until shutdown is requested or the command channel closes.
///
/// Returns `Ok(())` on shutdown. Returns `Err` when every sender of the command channel
/// has been dropped, or when handling an update fails.
pub async fn run(&mut self) -> Result<(), String> {
    loop {
        let received = tokio::select! {
            cmd = self.mgr_rx.recv() => cmd,
            _ = self.shutdown.cancelled() => {
                warn!("RIB manager shutting down.");
                return Ok(());
            }
        };

        let command = match received {
            Some(command) => command,
            None => {
                warn!("All senders of the manager channel have been dropped, manager exiting!");
                return Err("Manager exited due to channel closure".to_string());
            }
        };

        match command {
            RouteManagerCommands::Update(update) => self.handle_update(update)?,
            RouteManagerCommands::DumpRib(sender) => self.dump_rib(sender),
            RouteManagerCommands::StreamRib(dump_sender, stream_sender) => {
                self.stream_rib(dump_sender, stream_sender)
            }
        }
    }
}
// dump_rib copies every PathSet in the RIB into a snapshot tagged with the current
// epoch and sends it to `sender`. A failed send is logged and otherwise ignored.
fn dump_rib(&mut self, sender: tokio::sync::oneshot::Sender<RibSnapshot<A>>) {
    info!("Starting RIB dump");
    let routes = self
        .rib
        .iter()
        .map(|entry| entry.2.lock().unwrap().clone())
        .collect();
    let snapshot = RibSnapshot::<A> {
        epoch: self.epoch,
        routes,
    };
    if let Err(e) = sender.send(snapshot) {
        warn!("Failed to send snapshot of RIB: {:?}", e);
    }
    info!("Done RIB dump");
}
/// stream_rib sends every route currently in the RIB through `dump_sender` (tagged with
/// the current epoch), drops that sender, and then hands a fresh broadcast receiver for
/// subsequent updates to the caller through `stream_sender`.
///
/// Send failures are logged; a failed dump send does not stop the remaining routes from
/// being sent.
fn stream_rib(
    &mut self,
    dump_sender: mpsc::UnboundedSender<(u64, PathSet<A>)>,
    stream_sender: oneshot::Sender<broadcast::Receiver<(u64, PathSet<A>)>>,
) {
    let epoch = self.epoch;
    for entry in self.rib.iter() {
        let current = entry.2.lock().unwrap().clone();
        if let Err(e) = dump_sender.send((epoch, current)) {
            warn!("Failed to send dump to client: {}", e);
        }
    }
    // Dropping the sender lets the receiving side see the end of the dump.
    drop(dump_sender);

    let updates = self.pathset_streaming_handle.subscribe();
    if let Err(e) = stream_sender.send(updates) {
        warn!("Failed to send subscriber in stream_rib: {:?}", e);
    }
}
/// Dispatches an update to the announce or withdraw handler and returns its result.
fn handle_update(&mut self, update: RouteUpdate) -> Result<(), String> {
    let withdraw = match update {
        RouteUpdate::Announce(announce) => return self.handle_announce(announce),
        RouteUpdate::Withdraw(withdraw) => withdraw,
    };
    self.handle_withdraw(withdraw)
}
/// Applies an announcement to the RIB.
///
/// For every announced NLRI the epoch is incremented and the announcing peer's path is
/// stored in the PathSet of that prefix, creating the PathSet when the prefix has not
/// been seen before. A re-announcement from the same peer replaces the whole previous
/// path (nexthop, local preference, MED, AS path and path attributes), so no attribute
/// of the old announcement lingers. The resulting PathSet is published to stream
/// subscribers.
///
/// Returns `Err` with the conversion error text when an NLRI cannot be converted into
/// the address type of this RIB; NLRIs before the failing one have already been applied.
fn handle_announce(&mut self, update: RouteAnnounce) -> Result<(), String> {
    let peer_name = update.peer.clone();
    let nexthop = update.nexthop;
    for nlri in update.prefixes {
        // Increment the epoch on every NLRI processed.
        self.epoch += 1;
        let addr: A = nlri.clone().try_into().map_err(|e| e.to_string())?;
        let prefixlen = nlri.prefixlen;

        // The path as announced right now; it is stored as-is in both branches below.
        let path = Path {
            nexthop: nexthop.clone(),
            peer_name: peer_name.clone(),
            local_pref: update.local_pref,
            med: update.med,
            as_path: update.as_path.clone(),
            path_attributes: update.path_attributes.clone(),
        };

        if let Some(path_set_wrapped) = self.rib.exact_match(addr, prefixlen.into()) {
            // The prefix is already in the RIB. insert() replaces the peer's previous
            // path if there was one, and adds a new one otherwise.
            let mut path_set = path_set_wrapped.lock().unwrap();
            if path_set.paths.insert(peer_name.clone(), path).is_some() {
                trace!(
                    "Updating existing path attributes for NLRI: {}/{}",
                    addr,
                    prefixlen
                );
            }
            // Ignore errors sending due to no active receivers on the channel.
            let _ = self
                .pathset_streaming_handle
                .send((self.epoch, path_set.clone()));
        } else {
            // This prefix has never been seen before, so add a new PathSet for it.
            let mut path_set = PathSet::<A> {
                addr,
                prefixlen,
                nlri,
                paths: BTreeMap::new(),
            };
            path_set.paths.insert(peer_name.clone(), path);
            self.rib
                .insert(addr, prefixlen.into(), Mutex::new(path_set.clone()));
            // Ignore errors sending due to no active receivers on the channel.
            let _ = self.pathset_streaming_handle.send((self.epoch, path_set));
        }
    }
    Ok(())
}
/// Applies a withdrawal to the RIB.
///
/// For every withdrawn NLRI the epoch is incremented and the withdrawing peer's path is
/// removed from the PathSet of that prefix. The PathSet as it looks after the removal is
/// published to stream subscribers, and a PathSet left without paths is removed from the
/// RIB. Withdrawals for unknown prefixes or paths are logged.
///
/// Returns `Err` with the conversion error text when an NLRI cannot be converted into
/// the address type of this RIB.
fn handle_withdraw(&mut self, update: RouteWithdraw) -> Result<(), String> {
    for nlri in update.prefixes {
        self.epoch += 1;
        let addr: A = nlri.clone().try_into().map_err(|e| e.to_string())?;

        // The lookup borrows the RIB, so the removal of an emptied PathSet has to wait
        // until the match is over.
        let now_empty = match self.rib.exact_match(addr, nlri.prefixlen.into()) {
            Some(entry) => {
                let mut guard = entry.lock().unwrap();
                if guard.paths.remove(&update.peer).is_none() {
                    warn!(
                        "Got a withdrawal for route {} from {}, which was not in RIB",
                        nlri, update.peer
                    );
                }
                // Ignore errors sending due to no active receivers on the channel.
                let _ = self
                    .pathset_streaming_handle
                    .send((self.epoch, guard.clone()));
                let emptied = guard.paths.is_empty();
                emptied
            }
            None => {
                warn!(
                    "Got a withdrawal for route {} from {}, which was not in RIB",
                    nlri, update.peer
                );
                false
            }
        };

        if now_empty {
            self.rib.remove(addr, nlri.prefixlen.into());
        }
    }
    Ok(())
}
/// Returns a copy of the PathSet stored for exactly `addr`/`prefixlen`, or `None` when
/// the RIB has no entry for that prefix.
pub fn lookup_path_exact(&self, addr: A, prefixlen: u32) -> Option<PathSet<A>> {
    let entry = self.rib.exact_match(addr, prefixlen)?;
    let path_set = entry.lock().unwrap().clone();
    Some(path_set)
}
}
#[cfg(test)]
mod tests {
    use crate::rib_manager::RibManager;
    use crate::rib_manager::RouteAnnounce;
    use crate::rib_manager::RouteManagerCommands;
    use crate::rib_manager::RouteUpdate;
    use bgp_packet::constants::AddressFamilyIdentifier;
    use bgp_packet::nlri::NLRI;
    use std::net::Ipv6Addr;
    use std::str::FromStr;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    /// Announces a single IPv6 prefix and checks that it can be looked up in the RIB
    /// with the announced nexthop.
    #[test]
    fn test_manager_process_single() {
        let (_, rp_rx) = mpsc::unbounded_channel::<RouteManagerCommands<Ipv6Addr>>();
        let mut rib_manager: RibManager<Ipv6Addr> =
            RibManager::<Ipv6Addr>::new(rp_rx, CancellationToken::new()).unwrap();

        // 2001:db8::1. Ipv6Addr::new takes eight 16-bit segments, not individual bytes.
        let nexthop = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 0x1);

        // Send an update to the manager and check that it adds it to the RIB.
        let announce = RouteAnnounce {
            peer: "Some peer".to_string(),
            prefixes: vec![NLRI {
                afi: AddressFamilyIdentifier::Ipv6,
                prefixlen: 32,
                prefix: vec![0x20, 0x01, 0xd, 0xb8],
            }],
            as_path: vec![65536],
            local_pref: 0,
            med: 0,
            nexthop: nexthop.octets().to_vec(),
            path_attributes: vec![],
        };

        // Manually drive the manager instead of calling run to not deal with async in tests.
        assert_eq!(
            rib_manager.handle_update(RouteUpdate::Announce(announce)),
            Ok(())
        );

        let addr = Ipv6Addr::from_str("2001:db8::").unwrap();
        let prefixlen: u32 = 32;
        let lookup_result = rib_manager.lookup_path_exact(addr, prefixlen).unwrap();
        assert_eq!(lookup_result.paths.len(), 1);
        let path_result = lookup_result.paths.get("Some peer").unwrap();
        assert_eq!(path_result.nexthop, nexthop.octets().to_vec());
    }
}

View File

@ -0,0 +1,309 @@
// Copyright 2021 Rayhaan Jaufeerally.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::peer::PeerCommands;
use crate::rib_manager;
use crate::rib_manager::RibSnapshot;
use crate::rib_manager::RouteManagerCommands;
use crate::route_server::route_server::route_service_server::RouteService;
use crate::route_server::route_server::AddressFamily;
use crate::route_server::route_server::DumpPathsRequest;
use crate::route_server::route_server::DumpPathsResponse;
use crate::route_server::route_server::Path;
use crate::route_server::route_server::PathSet;
use crate::route_server::route_server::Prefix;
use crate::route_server::route_server::StreamPathsRequest;
use bgp_packet::constants::AddressFamilyIdentifier;
use log::warn;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Response;
use tonic::Status;
/// Generated gRPC bindings for the `bgpd.grpc` protobuf package, pulled into
/// this module by `tonic::include_proto!`.
pub mod route_server {
tonic::include_proto!("bgpd.grpc");
}
/// State backing the gRPC route service: command channels to the IPv4 and IPv6
/// route managers plus a map of command channels to peer state machines.
///
/// `RouteService` is implemented for this type further down in this file.
pub struct RouteServer {
/// Command channel to the route manager handling `Ipv4Addr` routes.
pub ip4_manager: UnboundedSender<RouteManagerCommands<Ipv4Addr>>,
/// Command channel to the route manager handling `Ipv6Addr` routes.
pub ip6_manager: UnboundedSender<RouteManagerCommands<Ipv6Addr>>,
/// Command channels to peer state machines, keyed by a string.
// NOTE(review): the key is presumably the peer name — confirm against the
// code that populates this map; it is not used in this part of the file.
pub peer_state_machines: HashMap<String, UnboundedSender<PeerCommands>>,
}
impl RouteServer {
    /// Sends a `StreamRib` command to `manager` and waits for the broadcast
    /// receiver the manager hands back over a oneshot channel.
    ///
    /// Returns an internal `Status` if the command cannot be sent or if the
    /// oneshot reply channel fails before a receiver is delivered.
    async fn get_streaming_receiver<A>(
        &self,
        manager: UnboundedSender<RouteManagerCommands<A>>,
        // dump_tx is passed to the manager so the current state can be
        // delivered before streaming starts.
        dump_tx: UnboundedSender<(u64, rib_manager::PathSet<A>)>,
    ) -> Result<broadcast::Receiver<(u64, rib_manager::PathSet<A>)>, Status> {
        let (reply_tx, reply_rx) = oneshot::channel();
        manager
            .send(RouteManagerCommands::StreamRib(dump_tx, reply_tx))
            .map_err(|e| {
                warn!("Failed to send StreamRib command to route manager: {}", e);
                tonic::Status::internal("failed to communicate with route manager")
            })?;
        match reply_rx.await {
            Ok(updates) => Ok(updates),
            Err(e) => Err(tonic::Status::internal(e.to_string())),
        }
    }

    /// Builds the protobuf `PathSet` for one `(epoch, rib_manager::PathSet)`
    /// pair, tagging its prefix with the given `address_family` value.
    fn transform_pathset<A>(
        mgr_ps: (u64, rib_manager::PathSet<A>),
        address_family: i32,
    ) -> PathSet {
        let (epoch, source) = mgr_ps;
        let prefix = Prefix {
            ip_prefix: source.nlri.prefix,
            prefix_len: source.nlri.prefixlen.into(),
            address_family,
        };
        // Only the path values are copied across; the keys are discarded.
        let paths = source
            .paths
            .into_iter()
            .map(|(_, entry)| Path {
                as_path: entry.as_path,
                local_pref: entry.local_pref,
                med: entry.med,
                nexthop: entry.nexthop,
                peer_name: entry.peer_name,
            })
            .collect();
        PathSet {
            epoch,
            prefix: Some(prefix),
            paths,
        }
    }
}
#[tonic::async_trait]
impl RouteService for RouteServer {
    /// Returns a single snapshot of the path sets held by the route manager
    /// for the address family named in the request.
    ///
    /// The response epoch, and the epoch of every returned path set, is the
    /// epoch of the snapshot supplied by the manager.
    ///
    /// # Errors
    ///
    /// Returns an internal `Status` when the requested address family does not
    /// fit in a `u16` or is not a known `AddressFamilyIdentifier`, when the
    /// `DumpRib` command cannot be sent, or when the manager never answers on
    /// the reply channel.
    async fn dump_paths(
        &self,
        request: tonic::Request<DumpPathsRequest>,
    ) -> Result<Response<DumpPathsResponse>, Status> {
        let mut response = DumpPathsResponse {
            epoch: 0,
            path_sets: vec![],
        };
        // Range-check the wire value instead of casting with `as u16`, which
        // would silently truncate (e.g. 65537 would be accepted as 1).
        let raw_afi = u16::try_from(request.get_ref().address_family)
            .map_err(|e| tonic::Status::internal(e.to_string()))?;
        let afi = AddressFamilyIdentifier::try_from(raw_afi)
            .map_err(|e| tonic::Status::internal(e.to_string()))?;
        match afi {
            AddressFamilyIdentifier::Ipv4 => {
                let (tx, rx) = tokio::sync::oneshot::channel::<RibSnapshot<Ipv4Addr>>();
                if let Err(e) = self.ip4_manager.send(RouteManagerCommands::DumpRib(tx)) {
                    warn!("Failed to send DumpRib command to route manager: {}", e);
                    return Err(tonic::Status::internal(
                        "failed to communicate with route manager",
                    ));
                }
                match rx.await {
                    Ok(result) => {
                        response.epoch = result.epoch;
                        for pathset in result.routes {
                            let mut proto_pathset = PathSet {
                                epoch: result.epoch,
                                prefix: Some(Prefix {
                                    ip_prefix: pathset.nlri.prefix,
                                    prefix_len: pathset.nlri.prefixlen.into(),
                                    address_family: AddressFamily::IPv4.into(),
                                }),
                                paths: vec![],
                            };
                            for (_, path) in pathset.paths {
                                let proto_path = Path {
                                    as_path: path.as_path,
                                    local_pref: path.local_pref,
                                    med: path.med,
                                    nexthop: path.nexthop,
                                    peer_name: path.peer_name,
                                };
                                proto_pathset.paths.push(proto_path);
                            }
                            response.path_sets.push(proto_pathset);
                        }
                        Ok(tonic::Response::new(response))
                    }
                    Err(e) => {
                        warn!("Failed to get response from route manager: {}", e);
                        Err(tonic::Status::internal(
                            "failed to get response from route manager",
                        ))
                    }
                }
            }
            AddressFamilyIdentifier::Ipv6 => {
                let (tx, rx) = tokio::sync::oneshot::channel::<RibSnapshot<Ipv6Addr>>();
                if let Err(e) = self.ip6_manager.send(RouteManagerCommands::DumpRib(tx)) {
                    warn!("Failed to send DumpRib command to route manager: {}", e);
                    return Err(tonic::Status::internal(
                        "failed to communicate with route manager",
                    ));
                }
                match rx.await {
                    Ok(result) => {
                        response.epoch = result.epoch;
                        for pathset in result.routes {
                            let mut proto_pathset = PathSet {
                                epoch: result.epoch,
                                prefix: Some(Prefix {
                                    ip_prefix: pathset.nlri.prefix,
                                    prefix_len: pathset.nlri.prefixlen.into(),
                                    address_family: AddressFamily::IPv6.into(),
                                }),
                                paths: vec![],
                            };
                            for (_, path) in pathset.paths {
                                let proto_path = Path {
                                    as_path: path.as_path,
                                    local_pref: path.local_pref,
                                    med: path.med,
                                    nexthop: path.nexthop,
                                    peer_name: path.peer_name,
                                };
                                proto_pathset.paths.push(proto_path);
                            }
                            response.path_sets.push(proto_pathset);
                        }
                        Ok(tonic::Response::new(response))
                    }
                    Err(e) => {
                        warn!("Failed to get response from route manager: {}", e);
                        Err(tonic::Status::internal(
                            "failed to get response from route manager",
                        ))
                    }
                }
            }
        }
    }

    type StreamPathsStream = ReceiverStream<Result<PathSet, Status>>;

    /// Streams path sets for the requested address family to the client.
    ///
    /// A forwarding task is spawned which first relays everything arriving on
    /// the dump channel given to the route manager, and once that channel is
    /// closed relays every item received from the manager's broadcast channel.
    /// The task stops when the client side of the stream goes away or when
    /// receiving from the broadcast channel fails; in the latter case a final
    /// internal `Status` is pushed onto the stream.
    ///
    /// # Errors
    ///
    /// Returns an internal `Status` when the address family is neither 1 nor 2
    /// or when the streaming receiver cannot be obtained from the manager.
    async fn stream_paths(
        &self,
        request: tonic::Request<StreamPathsRequest>,
    ) -> Result<Response<Self::StreamPathsStream>, Status> {
        match request.get_ref().address_family {
            // Raw wire value handled as IPv4 by this service.
            1 => {
                let (dump_tx, mut dump_rx) = mpsc::unbounded_channel();
                let mut receiver = self
                    .get_streaming_receiver::<Ipv4Addr>(self.ip4_manager.clone(), dump_tx)
                    .await?;
                let (tx, rx) = mpsc::channel(10_000);
                // Spawn a task for receiving values from the manager and sending them to the peer.
                tokio::spawn(async move {
                    // Consume the dump before moving to the streamed paths.
                    while let Some(next) = dump_rx.recv().await {
                        let pathset =
                            RouteServer::transform_pathset(next, AddressFamily::IPv4.into());
                        if let Err(e) = tx.send(Ok(pathset)).await {
                            warn!("Failed to send path to peer: {}", e);
                            return;
                        }
                    }
                    loop {
                        let route = match receiver.recv().await {
                            Ok(route) => route,
                            Err(e) => {
                                // Any receive error ends the stream; tell the
                                // client why on a best-effort basis.
                                warn!("Failed to get next streaming route from manager: {}", e);
                                let _ = tx
                                    .send(Err(tonic::Status::internal(format!(
                                        "Failed to get next route from manager: {}",
                                        e
                                    ))))
                                    .await;
                                return;
                            }
                        };
                        if let Err(e) = tx
                            .send(Ok(RouteServer::transform_pathset(
                                route,
                                AddressFamily::IPv4.into(),
                            )))
                            .await
                        {
                            warn!("Failed to send streaming route to peer: {}", e);
                            return;
                        }
                    }
                });
                Ok(Response::new(ReceiverStream::new(rx)))
            }
            // Raw wire value handled as IPv6 by this service.
            2 => {
                let (dump_tx, mut dump_rx) = mpsc::unbounded_channel();
                let mut receiver = self
                    .get_streaming_receiver::<Ipv6Addr>(self.ip6_manager.clone(), dump_tx)
                    .await?;
                let (tx, rx) = mpsc::channel(10_000);
                // Spawn a task for receiving values from the manager and sending them to the peer.
                tokio::spawn(async move {
                    // Consume the dump before moving to the streamed paths.
                    // The dump must carry the IPv6 tag just like the streamed
                    // updates below; it was previously mislabelled as IPv4.
                    while let Some(next) = dump_rx.recv().await {
                        let pathset =
                            RouteServer::transform_pathset(next, AddressFamily::IPv6.into());
                        if let Err(e) = tx.send(Ok(pathset)).await {
                            warn!("Failed to send path to peer: {}", e);
                            return;
                        }
                    }
                    loop {
                        let route = match receiver.recv().await {
                            Ok(route) => route,
                            Err(e) => {
                                // Any receive error ends the stream; tell the
                                // client why on a best-effort basis.
                                warn!("Failed to get next streaming route from manager: {}", e);
                                let _ = tx
                                    .send(Err(tonic::Status::internal(format!(
                                        "Failed to get next route from manager: {}",
                                        e
                                    ))))
                                    .await;
                                return;
                            }
                        };
                        if let Err(e) = tx
                            .send(Ok(RouteServer::transform_pathset(
                                route,
                                AddressFamily::IPv6.into(),
                            )))
                            .await
                        {
                            warn!("Failed to send streaming route to peer: {}", e);
                            return;
                        }
                    }
                });
                Ok(Response::new(ReceiverStream::new(rx)))
            }
            _ => Err(tonic::Status::internal("Unknown address family")),
        }
    }
}