From 661946b2f87f24084eedfa308e1a05ed27747499 Mon Sep 17 00:00:00 2001 From: Rayhaan Jaufeerally Date: Sun, 18 Aug 2024 10:42:42 +0000 Subject: [PATCH] Add utility binary for testing --- bin/Cargo.toml | 4 + bin/src/streamer_cli/main.rs | 164 ----------------------------- bin/src/util/main.rs | 85 +++++++++++++++ crates/route_client/src/netlink.rs | 24 ++--- 4 files changed, 95 insertions(+), 182 deletions(-) delete mode 100644 bin/src/streamer_cli/main.rs create mode 100644 bin/src/util/main.rs diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 8ba558b..69686a4 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -29,3 +29,7 @@ path = "src/bgp_server/main.rs" [[bin]] name = "client" path = "src/client/main.rs" + +[[bin]] +name = "util" +path = "src/util/main.rs" diff --git a/bin/src/streamer_cli/main.rs b/bin/src/streamer_cli/main.rs deleted file mode 100644 index 7209c69..0000000 --- a/bin/src/streamer_cli/main.rs +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2021 Rayhaan Jaufeerally. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use bgpd::bgp_packet::constants::AddressFamilyIdentifier; -use bgpd::bgp_packet::nlri::NLRI; -use bgpd::server::route_server::route_server::route_service_client::RouteServiceClient; -use bgpd::server::route_server::route_server::DumpPathsRequest; -use bgpd::server::route_server::route_server::PathSet; -use bgpd::server::route_server::route_server::StreamPathsRequest; -use clap::Parser; -use std::process::exit; -use std::time::Duration; -use tokio::task::JoinHandle; -use tonic::transport::Endpoint; -use tracing::{info, warn}; - -extern crate clap; - -#[derive(clap::Parser)] -#[clap( - author = "Rayhaan Jaufeerally ", - version = "0.1", - about = "A utility for dumping routes from the bgp_server." -)] -struct Cli { - server_address: String, -} - -#[tokio::main] -async fn main() -> Result<(), String> { - let subscriber = tracing_subscriber::fmt(); - - match subscriber.try_init() { - Ok(()) => {} - Err(e) => { - eprintln!("Failed to initialize logger: {:?}", e); - exit(1); - } - } - - let cli = Cli::parse(); - - info!("Starting client"); - let grpc_endpoint = cli.server_address; - let endpoint = Endpoint::from_shared(grpc_endpoint) - .map_err(|e| e.to_string())? - .keep_alive_timeout(Duration::from_secs(10)); - let mut client = RouteServiceClient::connect(endpoint) - .await - .map_err(|e| e.to_string())?; - - info!("Connected"); - - // 1. First subscribe to the route feed and put these into an unbounded channel. - let (stream_tx, mut stream_rx) = tokio::sync::mpsc::unbounded_channel::(); - let request = StreamPathsRequest { - address_family: 2_i32, - }; - - let mut client_copy = client.clone(); - let _recv_handle: JoinHandle> = tokio::spawn(async move { - let mut rpc_stream = client_copy - .stream_paths(request) - .await - .map_err(|e| e.to_string())? - .into_inner(); - while let Some(route) = rpc_stream.message().await.map_err(|e| e.to_string())? { - stream_tx.send(route).map_err(|e| e.to_string())?; - } - Err("Stream closed".to_string()) - }); - - // 2. Dump the whole RIB - let dump_request = DumpPathsRequest { - address_family: 2_i32, - }; - let dump_response = client.dump_paths(dump_request).await.unwrap().into_inner(); - let dump_epoch = dump_response.epoch; - - info!("Dump epoch was: {}", dump_epoch); - - let overrun_slot: Option; - loop { - let item = stream_rx.recv().await; - match &item { - Some(pathset) => { - if pathset.epoch >= dump_epoch { - overrun_slot = Some(pathset.clone()); - break; - } else { - info!("Skipping already-dumped epoch: {}", pathset.epoch); - } - } - None => { - return Err("Stream unexpectedly closed".to_owned()); - } - } - } - - // Replay all the pathsets from dump_response - for pathset in dump_response.path_sets { - info!("Got pathset: {:?}", pathset); - // Parse an NLRI from the pathset - if let Some(prefix) = &pathset.prefix { - let nlri = NLRI::from_bytes( - AddressFamilyIdentifier::Ipv6, - prefix.ip_prefix.clone(), - prefix.prefix_len as u8, - ) - .unwrap(); - info!("Parsed NLRI: {}", nlri.to_string()); - } - } - - // Replay the overrun slot - if let Some(pathset) = overrun_slot { - if let Some(prefix) = &pathset.prefix { - let nlri = NLRI::from_bytes( - AddressFamilyIdentifier::Ipv6, - prefix.ip_prefix.clone(), - prefix.prefix_len as u8, - ) - .unwrap(); - info!("Parsed NLRI: {}", nlri.to_string()); - } - } - - loop { - let item = stream_rx.recv().await; - - match &item { - Some(pathset) => { - info!("Got pathset: {:?}", pathset); - // Parse an NLRI from the pathset - if let Some(prefix) = &pathset.prefix { - let nlri = NLRI::from_bytes( - AddressFamilyIdentifier::Ipv6, - prefix.ip_prefix.clone(), - prefix.prefix_len as u8, - ) - .unwrap(); - info!("Parsed NLRI: {}", nlri.to_string()); - } - } - None => { - warn!("stream_rx closed"); - break; - } - } - } - - Err("Program exited unexpectedly.".to_owned()) -} diff --git a/bin/src/util/main.rs b/bin/src/util/main.rs new file mode 100644 index 0000000..bd9957d --- /dev/null +++ b/bin/src/util/main.rs @@ -0,0 +1,85 @@ +use std::net::IpAddr; + +use bgp_packet::nlri::NLRI; +use clap::{Parser, Subcommand}; +use eyre::{bail, Result}; +use route_client::netlink::NetlinkConnector; +use route_client::southbound_interface::SouthboundInterface; +use tracing::info; + +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::EnvFilter; + +#[derive(Parser)] +#[clap( + author = "Rayhaan Jaufeerally ", + version = "0.1", + about = "Misc routing utilities" +)] +struct Cli { + #[clap(subcommand)] + command: Option, +} + +#[derive(Subcommand)] +enum Commands { + /// AddRoute installs the routes received into the kernel routing table. + AddRoute { + /// The destination IP prefix. + prefix: String, + /// The next hop for sending traffic to the prefix. + nexthop: String, + /// Table to add route into. + #[arg(default_value_t = 201)] + rt_table: u32, + }, + /// DelRoute installs the routes received into the kernel routing table. + DelRoute { + /// The destination IP prefix. + prefix: String, + /// The next hop for sending traffic to the prefix. + nexthop: String, + /// Table to add route into. + #[arg(default_value_t = 201)] + rt_table: u32, + }, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Cli::parse(); + + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("Starting route client"); + + match args.command { + Some(Commands::AddRoute { + prefix, + nexthop, + rt_table, + }) => { + let mut handle = NetlinkConnector::new(Some(rt_table)).await?; + let prefix: NLRI = NLRI::try_from(prefix.as_str())?; + let nexthop: IpAddr = nexthop.parse()?; + handle.route_add(prefix.afi, prefix, nexthop).await?; + } + Some(Commands::DelRoute { + prefix, + nexthop, + rt_table, + }) => { + let mut handle = NetlinkConnector::new(Some(rt_table)).await?; + let prefix: NLRI = NLRI::try_from(prefix.as_str())?; + let nexthop: IpAddr = nexthop.parse()?; + handle.route_del(prefix, nexthop).await?; + } + None => bail!("A subcommand must be specified."), + }; + + Ok(()) +} diff --git a/crates/route_client/src/netlink.rs b/crates/route_client/src/netlink.rs index 2e76444..fe6f000 100644 --- a/crates/route_client/src/netlink.rs +++ b/crates/route_client/src/netlink.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use bgp_packet::{constants::AddressFamilyIdentifier, nlri::NLRI}; -use eyre::Result; +use eyre::{bail, Result}; use futures::TryStreamExt; use netlink_packet_route::route::RouteAddress; use netlink_packet_route::route::RouteAttribute; @@ -10,8 +10,8 @@ use netlink_packet_route::route::RouteProtocol; use netlink_packet_route::route::RouteType; use netlink_packet_route::AddressFamily as NetlinkAddressFamily; use rtnetlink::IpVersion; +use std::convert::TryInto; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use std::{convert::TryInto, io::ErrorKind}; use super::southbound_interface::SouthboundInterface; @@ -50,19 +50,13 @@ impl SouthboundInterface for NetlinkConnector { let addr: Ipv6Addr = match prefix.try_into()? { IpAddr::V6(addr) => addr, _ => { - return Err(eyre::Error::from(std::io::Error::new( - ErrorKind::InvalidInput, - "Got non-IPv6 address from NLRI", - ))) + bail!("Got non IPv6 address from NLRI") } }; let gw_addr: Ipv6Addr = match nexthop.clone().try_into()? { IpAddr::V6(addr) => addr, _ => { - return Err(eyre::Error::from(std::io::Error::new( - ErrorKind::InvalidInput, - "Got non-IPv6 gateway for IPv6 NLRI", - ))) + bail!("Got non IPv6 address from nexthop"); } }; let mut mutation = route @@ -80,19 +74,13 @@ impl SouthboundInterface for NetlinkConnector { let addr: Ipv4Addr = match prefix.clone().try_into()? { IpAddr::V4(addr) => addr, _ => { - return Err(eyre::Error::from(std::io::Error::new( - ErrorKind::InvalidInput, - "Got non-IPv4 address from NLRI", - ))) + bail!("Got non-IPv4 address from NLRI") } }; let gw_addr = match nexthop.clone().try_into()? { IpAddr::V4(addr) => addr, _ => { - return Err(eyre::Error::from(std::io::Error::new( - ErrorKind::InvalidInput, - "Got non-IPv4 gateway for IPv4 NLRI", - ))) + bail!("Got non IPv4 address from nexthop"); } }; let mut mutation = route