This commit is contained in:
@ -20,6 +20,7 @@ use crate::path::path_data::PathData;
|
||||
use crate::path::path_set::PathSource;
|
||||
use crate::rib_manager::RouteManagerCommands;
|
||||
use crate::route_server::route_server::PeerStatus;
|
||||
|
||||
use bgp_packet::capabilities::{
|
||||
BGPCapability, BGPCapabilityTypeValues, BGPCapabilityValue, BGPOpenOptionTypeValues,
|
||||
FourByteASNCapability, MultiprotocolCapability, OpenOption, OpenOptionCapabilities,
|
||||
@ -45,10 +46,11 @@ use bgp_packet::path_attributes::{
|
||||
LargeCommunitiesPathAttribute, LargeCommunitiesPayload, MPReachNLRIPathAttribute,
|
||||
};
|
||||
use bgp_packet::traits::ParserContext;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use chrono::TimeZone;
|
||||
use chrono::{DateTime, Utc};
|
||||
use eyre::{bail, eyre};
|
||||
use eyre::{bail, eyre, Result};
|
||||
use ip_network_table_deps_treebitmap::address::Address;
|
||||
use ip_network_table_deps_treebitmap::IpLookupTable;
|
||||
use std::convert::TryFrom;
|
||||
@ -330,8 +332,9 @@ pub struct PeerStateMachine<A: Address> {
|
||||
prefixes_in: IpLookupTable<A, RouteInfo<A>>,
|
||||
|
||||
/// prefixes_out contains all the prefixes that we know of which can
|
||||
/// be sent to the peer (unfiltered).
|
||||
prefixes_out: IpLookupTable<A, Arc<PathData>>,
|
||||
/// be sent to the peer along with a boolean of whether it was actually
|
||||
/// sent or not (i.e. false if filter rejected it).
|
||||
prefixes_out: IpLookupTable<A, (bool, Arc<PathData>)>,
|
||||
|
||||
// Interface to this state machine
|
||||
pub iface_rx: mpsc::UnboundedReceiver<PeerCommands>,
|
||||
@ -449,7 +452,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_chan_msg(&mut self, c: PeerCommands) -> eyre::Result<()> {
|
||||
async fn handle_chan_msg(&mut self, c: PeerCommands) -> Result<()> {
|
||||
match c {
|
||||
PeerCommands::NewConnection(mut conn) => {
|
||||
let peer_addr = conn.peer_addr()?;
|
||||
@ -526,9 +529,10 @@ where
|
||||
self.send_notification(notification).await?
|
||||
}
|
||||
|
||||
PeerCommands::Announce(_) => {
|
||||
todo!();
|
||||
}
|
||||
PeerCommands::Announce(route_update) => match route_update {
|
||||
RouteUpdate::Announce(announcement) => todo!(),
|
||||
RouteUpdate::Withdraw(withdrawal) => todo!(),
|
||||
},
|
||||
|
||||
PeerCommands::MessageFromPeer(msg) => match self.handle_msg(msg).await {
|
||||
Ok(_) => self
|
||||
@ -606,6 +610,50 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_announcement(&mut self, nlris: Vec<NLRI>, path_data: Arc<PathData>) -> Result<()> {
|
||||
let mut update = UpdateMessage::default();
|
||||
|
||||
// TODO: The filter evaluator should use a Cow<PathData> to only clone if it is necessary.
|
||||
for nlri in &nlris {
|
||||
let addr: A = nlri.clone().try_into().map_err(|e| eyre!(e.to_string()))?;
|
||||
|
||||
let mut path_data_copy = (*path_data).clone();
|
||||
let accept = self.filter_evaluator.evaluate_out(
|
||||
&mut path_data_copy.path_attributes,
|
||||
&path_data_copy.as_path,
|
||||
nlri,
|
||||
);
|
||||
|
||||
// If this announcement is not accepted by the filter then:
|
||||
// * Check if another announcement has been made for this prefix already,
|
||||
// if so, then we need to send a withdrawal (since the peer should not
|
||||
// continue to use a stale route which we are no longer using, as would
|
||||
// be the case if we masked an update to an already announced announcement).
|
||||
// if not, then we do not need to notify the peer at all.
|
||||
// If this announcement is accepted by the filter, in all cases we send the
|
||||
// announcement to the peer.
|
||||
|
||||
if !accept {
|
||||
if self
|
||||
.prefixes_out
|
||||
.exact_match(addr, nlri.prefixlen.into())
|
||||
.is_some()
|
||||
{
|
||||
// Withdraw this NLRI from the peer.
|
||||
update.withdrawn_nlri.push(nlri.clone());
|
||||
// Mark this route as non-announced in the local cache.
|
||||
self.prefixes_out.insert(
|
||||
addr,
|
||||
nlri.prefixlen.into(),
|
||||
(false, Arc::new(path_data_copy)),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_notification(
|
||||
&mut self,
|
||||
notification: NotificationMessage,
|
||||
@ -629,7 +677,7 @@ where
|
||||
// It deallocates the resources in this peer, unsets the TCP connection, removes the
|
||||
// routes from the inner structure as well as the routes that were propagated into the
|
||||
// RIB.
|
||||
async fn connection_closed(&mut self) -> eyre::Result<()> {
|
||||
async fn connection_closed(&mut self) -> Result<()> {
|
||||
info!("Connection closed on peer {}", self.config.name);
|
||||
|
||||
// Cancel keepalive timer.
|
||||
@ -710,7 +758,7 @@ where
|
||||
|
||||
/// process_withdrawals creates a RouteUpdate from withdrawal announcments and sends
|
||||
/// them to the rib_in channel to be consumed by the route processor.
|
||||
fn process_withdrawals(&mut self, withdrawals: Vec<NLRI>) -> eyre::Result<()> {
|
||||
fn process_withdrawals(&mut self, withdrawals: Vec<NLRI>) -> Result<()> {
|
||||
let peer_id = match &self.peer_open_msg {
|
||||
Some(peer_open_msg) => peer_open_msg.identifier,
|
||||
None => bail!("Missing peer open msg"),
|
||||
@ -747,7 +795,7 @@ where
|
||||
nexthop: Vec<u8>,
|
||||
announcements: Vec<NLRI>,
|
||||
path_attributes: Vec<PathAttribute>,
|
||||
) -> eyre::Result<()> {
|
||||
) -> Result<()> {
|
||||
// Extract the as_path and med from the attributes.
|
||||
let mut as_path: Vec<u32> = vec![];
|
||||
let mut med: u32 = 0;
|
||||
@ -834,6 +882,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: There is a bug here. If a peer announces a prefix and this announcement is accepted,
|
||||
// we forward it to the route manager, however when the peer then updates this prefix announcement
|
||||
// with some attributes that cause it to be rejeceted, the route manager will still think the old
|
||||
// PathData is valid, we need to instead perform a withdrawal of that route to the route manager.
|
||||
|
||||
if accepted {
|
||||
accepted_nlris.push(announcement);
|
||||
}
|
||||
@ -851,15 +904,6 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn decide_accept_message(&mut self, _: &[PathAttribute]) -> bool {
|
||||
// TODO: Implement filtering of Update messages.
|
||||
|
||||
// TODO: Section 9.1.2 of RFC 4271:
|
||||
// * Reject the message if the next hop is not resolvable
|
||||
// * Reject the message if there is an AS loop
|
||||
true
|
||||
}
|
||||
|
||||
/// try_connect attempts to connect to a remote TCP endpoint with a given timeout.
|
||||
async fn try_connect(&mut self, timeout: Duration) -> Result<TcpStream, std::io::Error> {
|
||||
let addr = self.config.ip;
|
||||
@ -894,7 +938,7 @@ where
|
||||
}
|
||||
|
||||
/// handle_msg processes incoming messages and updates the state in PeerStateMachine.
|
||||
async fn handle_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> {
|
||||
async fn handle_msg(&mut self, msg: BGPSubmessage) -> Result<()> {
|
||||
match &self.state {
|
||||
BGPState::Idle => self.handle_idle_msg().await,
|
||||
BGPState::Active => self.handle_active_msg(msg).await,
|
||||
@ -905,18 +949,18 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_idle_msg(&mut self) -> eyre::Result<()> {
|
||||
async fn handle_idle_msg(&mut self) -> Result<()> {
|
||||
bail!("Peer cannot process messages when in the Idle state")
|
||||
}
|
||||
|
||||
async fn handle_active_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> {
|
||||
async fn handle_active_msg(&mut self, msg: BGPSubmessage) -> Result<()> {
|
||||
// In the active state a new connection should come in via the NewConnection
|
||||
// message on the PSM channel, or if we establish a connection out, then that
|
||||
// logic should handle the messages until OpenSent.
|
||||
bail!("Discarding message received in ACTIVE state: {:?}", msg)
|
||||
}
|
||||
|
||||
async fn handle_connect_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> {
|
||||
async fn handle_connect_msg(&mut self, msg: BGPSubmessage) -> Result<()> {
|
||||
// In the connect state a new connection should come in via the NewConnection
|
||||
// message on the PSM channel, or if we establish a connection out, then that
|
||||
// logic should handle the messages until OpenSent.
|
||||
@ -924,7 +968,7 @@ where
|
||||
}
|
||||
|
||||
// In the opensent state we still need to get the OPEN message from the peer
|
||||
async fn handle_opensent_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> {
|
||||
async fn handle_opensent_msg(&mut self, msg: BGPSubmessage) -> Result<()> {
|
||||
info!("Handling message in OpenSent state: {:?}", msg);
|
||||
match msg {
|
||||
BGPSubmessage::OpenMessage(o) => {
|
||||
@ -959,7 +1003,7 @@ where
|
||||
error_code: u8,
|
||||
error_subcode: u8,
|
||||
iface_tx: &mut mpsc::UnboundedSender<PeerCommands>,
|
||||
) -> eyre::Result<()> {
|
||||
) -> Result<()> {
|
||||
let notification = NotificationMessage {
|
||||
error_code,
|
||||
error_subcode,
|
||||
@ -1055,7 +1099,7 @@ where
|
||||
}
|
||||
|
||||
// In the openconfirm state we are waiting for a KEEPALIVE from the peer.
|
||||
async fn handle_openconfirm_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> {
|
||||
async fn handle_openconfirm_msg(&mut self, msg: BGPSubmessage) -> Result<()> {
|
||||
// In the openconfirm state we wait for a keepalive message from the peer.
|
||||
// We also compute the timer expiry time for the keepalive timer.
|
||||
// Hold time of 0 means no keepalive and hold timer.
|
||||
@ -1127,7 +1171,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn hold_timer_expired(&mut self) -> eyre::Result<()> {
|
||||
async fn hold_timer_expired(&mut self) -> Result<()> {
|
||||
let notification = NotificationMessage {
|
||||
error_code: 4,
|
||||
error_subcode: 0,
|
||||
@ -1141,7 +1185,7 @@ where
|
||||
}
|
||||
|
||||
/// Generates and sends announcements for all statically defined routes.
|
||||
async fn announce_static(&mut self) -> eyre::Result<()> {
|
||||
async fn announce_static(&mut self) -> Result<()> {
|
||||
for announcement in &self.config.announcements {
|
||||
let mut bgp_update_msg = UpdateMessage {
|
||||
withdrawn_nlri: vec![],
|
||||
@ -1241,7 +1285,7 @@ where
|
||||
}
|
||||
|
||||
// In the established state we accept Update, Keepalive and Notification messages.
|
||||
async fn handle_established_msg(&mut self, msg: BGPSubmessage) -> eyre::Result<()> {
|
||||
async fn handle_established_msg(&mut self, msg: BGPSubmessage) -> Result<()> {
|
||||
match msg {
|
||||
BGPSubmessage::UpdateMessage(u) => {
|
||||
if !self.decide_accept_message(&u.path_attributes) {
|
||||
|
||||
Reference in New Issue
Block a user