Source code for faucet.valve_route

"""Valve IPv4/IPv6 routing implementation."""

# pylint: disable=too-many-lines

# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict, deque
import random
import time

import ipaddress

from os_ken.lib.packet import arp, icmp, icmpv6, ipv4, ipv6

from faucet import valve_of
from faucet import valve_packet
from faucet.valve_manager_base import ValveManagerBase


[docs] class AnonVLAN: """The anonymous VLAN for global routing""" def __init__(self, vid): self.vid = vid
[docs] class NextHop: """Describes a directly connected (at layer 2) nexthop.""" __slots__ = [ "cache_time", "eth_src", "last_retry_time", "next_retry_time", "resolve_retries", "port", ] def __init__(self, eth_src, port, now): self.eth_src = eth_src self.port = port self.cache_time = now self.resolve_retries = 0 self.last_retry_time = None self.next_retry_time = None if not self.eth_src: self.next_retry_time = now
[docs] def age(self, now): """Return age of this nexthop.""" return now - self.cache_time
[docs] def dead(self, max_fib_retries): """Return True if this nexthop is considered dead.""" return self.resolve_retries >= max_fib_retries
[docs] def next_retry(self, now, max_resolve_backoff_time): """Increment state for next retry.""" self.resolve_retries += 1 self.last_retry_time = now self.next_retry_time = now + min( (2**self.resolve_retries + random.randint(0, self.resolve_retries)), max_resolve_backoff_time, )
[docs] def resolution_due(self, now, max_age): """Return True if this nexthop is due to be re resolved/retried.""" if self.eth_src is not None and self.age(now) < max_age: return False if self.next_retry_time is None or self.next_retry_time < now: return True return False
def __str__(self): return "%s %s" % (self.eth_src, self.port) def __repr__(self): return self.__str__()
[docs] class ValveRouteManager(ValveManagerBase): """Base class to implement RIB/FIB.""" __slots__ = [ "active", "neighbor_timeout", "dec_ttl", "fib_table", "pipeline", "multi_out", "notify", "global_vlan", "global_routing", "logger", "max_host_fib_retry_count", "max_hosts_per_resolve_cycle", "max_resolve_backoff_time", "proactive_learn", "route_priority", "routers", "vip_table", "switch_manager", ] IPV = 0 ETH_TYPE = None ICMP_TYPE = None ICMP_SIZE = None MAX_PACKET_IN_SIZE = valve_of.MAX_PACKET_IN_BYTES CONTROL_ETH_TYPES = () # type: ignore IP_PKT = None def __init__( self, logger, notify, global_vlan, neighbor_timeout, max_hosts_per_resolve_cycle, max_host_fib_retry_count, max_resolve_backoff_time, proactive_learn, dec_ttl, multi_out, fib_table, vip_table, pipeline, routers, stack_manager, ): self.notify = notify self.logger = logger self.global_vlan = AnonVLAN(global_vlan) self.neighbor_timeout = neighbor_timeout self.max_hosts_per_resolve_cycle = max_hosts_per_resolve_cycle self.max_host_fib_retry_count = max_host_fib_retry_count self.max_resolve_backoff_time = max_resolve_backoff_time self.proactive_learn = proactive_learn self.dec_ttl = dec_ttl self.multi_out = multi_out self.fib_table = fib_table self.vip_table = vip_table self.pipeline = pipeline self.route_priority = self._LPM_PRIORITY self.routers = routers self.active = False self.global_routing = self._global_routing() self.stack_manager = stack_manager if self.global_routing: self.logger.info("global routing enabled")
[docs] def notify_learn(self, pkt_meta): self.notify( { "L3_LEARN": { "eth_src": pkt_meta.eth_src, "l3_src_ip": str(pkt_meta.l3_src), "port_no": pkt_meta.port.number, "vid": pkt_meta.vlan.vid, } } )
[docs] def nexthop_dead(self, nexthop_cache_entry): """Returns true if the nexthop_cache_entry is considered dead""" return nexthop_cache_entry.dead(self.max_host_fib_retry_count)
@staticmethod def _unicast_to_vip(pkt_meta): """Return true if packet is from a src in the connected network and dst ip is a faucet vip. I.e: Packet is traffic bound for a VIP""" return ( pkt_meta.eth_dst == pkt_meta.vlan.faucet_mac and pkt_meta.vlan.from_connected_to_vip(pkt_meta.l3_src, pkt_meta.l3_dst) ) @staticmethod def _gw_resolve_pkt(): return None @staticmethod def _gw_respond_pkt(): return None def _flood_stack_links(self, pkt_builder, vlan, multi_out, *args): """Return flood packet-out actions to stack ports for gw resolving""" ofmsgs = [] if self.stack_manager: ports = [] if self.stack_manager.stack.is_root(): ports = list( self.stack_manager.away_ports - self.stack_manager.inactive_away_ports - self.stack_manager.pruned_away_ports ) else: if self.stack_manager.chosen_towards_port is not None: ports = [self.stack_manager.chosen_towards_port] if ports: running_port_nos = [port.number for port in ports if port.running()] pkt = pkt_builder(vlan.vid, *args) if running_port_nos: random.shuffle(running_port_nos) if multi_out: ofmsgs.append( valve_of.packetouts(running_port_nos, bytes(pkt.data)) ) else: ofmsgs.extend( [ valve_of.packetout(port_no, bytes(pkt.data)) for port_no in running_port_nos ] ) return ofmsgs def _resolve_gw_on_vlan(self, vlan, faucet_vip, ip_gw): """Return flood packet-out actions for gw resolving""" ofmsgs = [] stack_ofmsgs = self._flood_stack_links( self._gw_resolve_pkt(), vlan, self.multi_out, vlan.faucet_mac, valve_of.mac.BROADCAST_STR, faucet_vip.ip, ip_gw, ) if stack_ofmsgs: ofmsgs.extend(stack_ofmsgs) vlan_ofmsgs = vlan.flood_pkt( self._gw_resolve_pkt(), self.multi_out, vlan.faucet_mac, valve_of.mac.BROADCAST_STR, faucet_vip.ip, ip_gw, ) if vlan_ofmsgs: ofmsgs.extend(vlan_ofmsgs) return ofmsgs def _resolve_gw_on_port(self, vlan, port, faucet_vip, ip_gw, eth_dst): """Return packet-out actions for outputting to a specific port""" return vlan.pkt_out_port( self._gw_resolve_pkt(), port, vlan.faucet_mac, eth_dst, faucet_vip.ip, ip_gw ) def _controller_and_flood(self): """Return instructions to forward packet to l2-forwarding""" return self.pipeline.accept_to_l2_forwarding( actions=(valve_of.output_controller(max_len=self.MAX_PACKET_IN_SIZE),) ) def _resolve_vip_response(self, pkt_meta, solicited_ip, now): """Learn host requesting for router, and return packet-out ofmsgs router response""" ofmsgs = [] vlan = pkt_meta.vlan if pkt_meta.vlan.is_faucet_vip(solicited_ip) and pkt_meta.vlan.ip_in_vip_subnet( pkt_meta.l3_src ): src_ip = pkt_meta.l3_src eth_src = pkt_meta.eth_src port = pkt_meta.port if self._stateful_gw(vlan, src_ip): ofmsgs.extend(self._add_host_fib_route(vlan, src_ip, blackhole=False)) ofmsgs.extend(self._update_nexthop(now, vlan, port, eth_src, src_ip)) if ofmsgs: self.logger.info( "Resolve response to %s from %s" % (solicited_ip, pkt_meta.log()) ) ofmsgs.append( vlan.pkt_out_port( self._gw_respond_pkt(), port, vlan.faucet_mac, eth_src, solicited_ip, src_ip, ) ) return ofmsgs def _gw_advert(self, pkt_meta, target_ip, now): """Receive an advert, so update nexthop information""" ofmsgs = [] vlan = pkt_meta.vlan if vlan.ip_in_vip_subnet(target_ip): if self._stateful_gw(vlan, target_ip): ofmsgs.extend( self._update_nexthop( now, vlan, pkt_meta.port, pkt_meta.eth_src, target_ip ) ) if ofmsgs: self.logger.info( "Received advert for %s from %s" % (target_ip, pkt_meta.log()) ) return ofmsgs def _vlan_routes(self, vlan): """Return vlan routes""" return vlan.routes_by_ipv(self.IPV) def _vlan_nexthop_cache(self, vlan): """Return vlan neighbour cache""" return vlan.neigh_cache_by_ipv(self.IPV)
[docs] def expire_port_nexthops(self, port): """Expire all hosts on a port""" ofmsgs = [] now = time.time() for vlan in port.vlans(): nexthop_cache = self._vlan_nexthop_cache(vlan) dead_nexthops = [ (ip_gw, nexthop_cache_entry) for ip_gw, nexthop_cache_entry in nexthop_cache.items() if nexthop_cache_entry and nexthop_cache_entry.port and port.number == nexthop_cache_entry.port.number ] for ip_gw, nexthop_cache_entry in dead_nexthops: self.logger.info( "marking %s as a dead nexthop" % nexthop_cache_entry.eth_src ) ofmsgs.extend( self._expire_gateway_flows(ip_gw, nexthop_cache_entry, vlan, now) ) return ofmsgs
def _vlan_nexthop_cache_entry(self, vlan, ip_gw): """Return nexthop cache entry""" nexthop_cache = self._vlan_nexthop_cache(vlan) return nexthop_cache.get(ip_gw, None) def _del_vlan_nexthop_cache_entry(self, vlan, ip_gw): nexthop_cache = self._vlan_nexthop_cache(vlan) del nexthop_cache[ip_gw] def _nexthop_actions(self, eth_dst, vlan): """Return flowrule actions for fib entry""" actions = [] if self.dec_ttl: actions.append(valve_of.dec_ip_ttl()) if self.routers: actions.append(self.fib_table.set_vlan_vid(vlan.vid)) actions.extend( [ self.fib_table.set_field(eth_src=vlan.faucet_mac), self.fib_table.set_field(eth_dst=eth_dst), ] ) return tuple(actions) def _route_match(self, vlan, ip_dst): """Return vid, dst, eth_type flowrule match for fib entry""" return self.fib_table.match(vlan=vlan, eth_type=self.ETH_TYPE, nw_dst=ip_dst) def _route_priority(self, ip_dst): """Return ip dst priority""" prefixlen = ipaddress.ip_network(ip_dst).prefixlen return self.route_priority + prefixlen def _router_for_vlan(self, vlan): """Return vlan router if any""" if self.routers: for router in self.routers.values(): if vlan in router.vlans: return router return None def _routed_vlans(self, vlan): """Return vlans that have routers""" if self.global_routing: return set([self.global_vlan]) vlans = set([vlan]) if self.routers: for router in self.routers.values(): if vlan in router.vlans: vlans = vlans.union(router.vlans) return vlans @staticmethod def _stateful_gw(vlan, dst_ip): return not dst_ip.is_link_local or vlan.ip_dsts_for_ip_gw(dst_ip) def _global_routing(self): """Return true if global routing is enabled""" return self.global_vlan.vid and self.routers and len(self.routers) == 1 def _add_faucet_fib_to_vip(self, vlan, priority, faucet_vip, faucet_vip_host): """Router flowmods""" ofmsgs = [] learn_connected_priority = self.route_priority + faucet_vip.network.prefixlen faucet_mac = vlan.faucet_mac actions = None if self.global_routing: vlan_mac = valve_packet.int_in_mac(faucet_mac, vlan.vid) actions = [ self.fib_table.set_field(eth_dst=vlan_mac), self.fib_table.set_vlan_vid(self.global_vlan.vid), ] ofmsgs.extend( self.pipeline.select_packets( self.fib_table, {"eth_type": self.ETH_TYPE, "eth_dst": faucet_mac, "vlan": vlan}, actions, ) ) if self.global_routing: vlan = self.global_vlan ofmsgs.append( self.fib_table.flowmod( self._route_match(vlan, faucet_vip_host), priority=priority, inst=(self.fib_table.goto(self.vip_table),), ) ) if self.proactive_learn and not faucet_vip.ip.is_link_local: routed_vlans = self._routed_vlans(vlan) for routed_vlan in routed_vlans: ofmsgs.append( self.fib_table.flowmod( self._route_match(routed_vlan, faucet_vip), priority=learn_connected_priority, inst=(self.fib_table.goto(self.vip_table),), ) ) # Unicast ICMP to us. priority -= 1 ofmsgs.append( self.vip_table.flowcontroller( self.vip_table.match( eth_type=self.ETH_TYPE, eth_dst=faucet_mac, nw_proto=self.ICMP_TYPE, ), priority=priority, max_len=self.ICMP_SIZE, ) ) # Learn + flood other ICMP not unicast to us. priority -= 1 ofmsgs.append( self.vip_table.flowmod( self.vip_table.match( eth_type=self.ETH_TYPE, nw_proto=self.ICMP_TYPE ), priority=priority, inst=self._controller_and_flood(), ) ) # Learn from other IP traffic unicast to us. priority -= 1 ofmsgs.append( self.vip_table.flowcontroller( self.vip_table.match(eth_type=self.ETH_TYPE, eth_dst=faucet_mac), priority=priority, max_len=self.MAX_PACKET_IN_SIZE, ) ) # Learn + flood IP traffic not unicast to us. priority -= 1 ofmsgs.append( self.vip_table.flowmod( self.vip_table.match(eth_type=self.ETH_TYPE), priority=priority, inst=self._controller_and_flood(), ) ) return ofmsgs def _add_faucet_vip_nd(self, vlan, priority, faucet_vip, faucet_vip_host): raise NotImplementedError # pragma: no cover
[docs] def add_vlan(self, vlan, cold_start): """Add a VLAN.""" ofmsgs = [] # add controller IPs if configured. for faucet_vip in vlan.faucet_vips_by_ipv(self.IPV): max_prefixlen = faucet_vip.ip.max_prefixlen faucet_vip_host = self._host_from_faucet_vip(faucet_vip) priority = self.route_priority + max_prefixlen ofmsgs.extend( self._add_faucet_vip_nd(vlan, priority, faucet_vip, faucet_vip_host) ) ofmsgs.extend( self._add_faucet_fib_to_vip(vlan, priority, faucet_vip, faucet_vip_host) ) return ofmsgs
[docs] def del_vlan(self, vlan): """Delete a VLAN.""" ofmsgs = [] if vlan.faucet_vips_by_ipv: ofmsgs.append(self.fib_table.flowdel(match=self.fib_table.match(vlan=vlan))) return ofmsgs
def _add_resolved_route(self, vlan, ip_gw, ip_dst, eth_dst, is_updated): """Return flowmods for enabling routing of a resolved nexthop""" ofmsgs = [] if is_updated: self.logger.info( "Updating next hop for route %s via %s (%s) on VLAN %u" % (ip_dst, ip_gw, eth_dst, vlan.vid) ) ofmsgs.extend(self._del_route_flows(vlan, ip_dst)) else: self.logger.info( "Adding new route %s via %s (%s) on VLAN %u" % (ip_dst, ip_gw, eth_dst, vlan.vid) ) inst = self.pipeline.accept_to_l2_forwarding( actions=self._nexthop_actions(eth_dst, vlan) ) routed_vlans = self._routed_vlans(vlan) for routed_vlan in routed_vlans: in_match = self._route_match(routed_vlan, ip_dst) ofmsgs.append( self.fib_table.flowmod( in_match, priority=self._route_priority(ip_dst), inst=inst ) ) return ofmsgs def _update_nexthop_cache(self, now, vlan, eth_src, port, ip_gw): """Add information to the nexthop cache and return the new object""" nexthop = NextHop(eth_src, port, now) nexthop_cache = self._vlan_nexthop_cache(vlan) nexthop_cache[ip_gw] = nexthop return nexthop def _update_nexthop(self, now, vlan, port, eth_src, resolved_ip_gw): """Update routes where nexthop is newly resolved or changed. Args: now (float): seconds since epoch. vlan (vlan): VLAN containing this RIB/FIB. port (port): port for nexthop. eth_src (str): MAC address for nexthop. resolved_ip_gw (IPAddress): IP address for nexthop Returns: list: OpenFlow messages, if routes need to be updated. """ ofmsgs = [] cached_eth_dst = self._cached_nexthop_eth_dst(vlan, resolved_ip_gw) if cached_eth_dst != eth_src: is_updated = cached_eth_dst is not None for ip_dst in vlan.ip_dsts_for_ip_gw(resolved_ip_gw): ofmsgs.extend( self._add_resolved_route( vlan, resolved_ip_gw, ip_dst, eth_src, is_updated ) ) self._update_nexthop_cache(now, vlan, eth_src, port, resolved_ip_gw) return ofmsgs def _vlan_unresolved_nexthops(self, vlan, ip_gws, now): """Return unresolved or expired IP gateways, never tried/oldest first. Args: vlan (vlan): VLAN containing this RIB/FIB. ip_gws (list): tuple, IP gateway and controller IP in same subnet. now (float): seconds since epoch. Returns: list: prioritized list of gateways. """ vlan_nexthop_cache = self._vlan_nexthop_cache(vlan) nexthop_entries = [ (ip_gw, vlan_nexthop_cache.get(ip_gw, None)) for ip_gw in ip_gws ] not_fresh_nexthops = [ (ip_gw, entry) for ip_gw, entry in nexthop_entries if entry is None or entry.resolution_due(now, self.neighbor_timeout) ] unresolved_nexthops_by_retries = defaultdict(list) for ip_gw, entry in not_fresh_nexthops: if entry is None: entry = self._update_nexthop_cache(now, vlan, None, None, ip_gw) unresolved_nexthops_by_retries[entry.resolve_retries].append(ip_gw) unresolved_nexthops = deque() for _retries, nexthops in sorted(unresolved_nexthops_by_retries.items()): random.shuffle(nexthops) unresolved_nexthops.extend(nexthops) return unresolved_nexthops
[docs] def advertise(self, vlan): raise NotImplementedError # pragma: no cover
def _resolve_gateway_flows(self, ip_gw, nexthop_cache_entry, vlan, now): """Return packet-out ofmsgs using ARP/ND to resolve for nexthop""" faucet_vip = vlan.vip_map(ip_gw) if not faucet_vip: self.logger.info("Not resolving %s (not in connected network)" % ip_gw) return [] resolve_flows = [] last_retry_time = nexthop_cache_entry.last_retry_time nexthop_cache_entry.next_retry(now, self.max_resolve_backoff_time) if ( vlan.targeted_gw_resolution and last_retry_time is None and nexthop_cache_entry.port is not None ): port = nexthop_cache_entry.port eth_dst = nexthop_cache_entry.eth_src resolve_flows = [ self._resolve_gw_on_port(vlan, port, faucet_vip, ip_gw, eth_dst) ] else: resolve_flows = self._resolve_gw_on_vlan(vlan, faucet_vip, ip_gw) if resolve_flows: if last_retry_time is None: self.logger.info( "resolving %s (%u flows) on VLAN %u" % (ip_gw, len(resolve_flows), vlan.vid) ) else: self.logger.info( "resolving %s retry %u (last attempt was %us ago; %u flows) on VLAN %u" % ( ip_gw, nexthop_cache_entry.resolve_retries, now - last_retry_time, len(resolve_flows), vlan.vid, ) ) return resolve_flows def _expire_gateway_flows(self, ip_gw, nexthop_cache_entry, vlan, now): """Return ofmsgs deleting the expired nexthop information""" expire_flows = [] self.logger.info( "expiring dead route %s (age %us) on %s" % (ip_gw, nexthop_cache_entry.age(now), vlan) ) port = nexthop_cache_entry.port self._del_vlan_nexthop_cache_entry(vlan, ip_gw) expire_flows = self._del_host_fib_route( vlan, ipaddress.ip_network(ip_gw.exploded) ) if port is None: expire_flows = [] return expire_flows def _resolve_expire_gateway_flows(self, ip_gw, nexthop_cache_entry, vlan, now): """If cache entry is dead then delete related flows otherwise return packet-out ofmsgs to resolve nexthops""" if self.nexthop_dead(nexthop_cache_entry): return self._expire_gateway_flows(ip_gw, nexthop_cache_entry, vlan, now) return self._resolve_gateway_flows(ip_gw, nexthop_cache_entry, vlan, now) def _resolve_gateways_flows( self, resolve_handler, vlan, now, unresolved_nexthops, remaining_attempts ): """Resolve for nexthops using the resolve_handler Return packet-out ofmsgs using V4 ARP/V6 ND to resolve nexthops """ ofmsgs = [] for ip_gw in unresolved_nexthops: if remaining_attempts == 0: break entry = self._vlan_nexthop_cache_entry(vlan, ip_gw) if entry is None: continue if not entry.resolution_due(now, self.neighbor_timeout): continue resolve_flows = resolve_handler(ip_gw, entry, vlan, now) if resolve_flows: ofmsgs.extend(resolve_flows) remaining_attempts -= 1 return ofmsgs
[docs] def resolve_gateways(self, vlan, now, resolve_all=True): """Re/resolve gateways. Args: vlan (vlan): VLAN containing this RIB/FIB. now (float): seconds since epoch. resolve_all (bool): attempt to resolve all unresolved gateways. Returns: list: OpenFlow messages. """ unresolved_gateways = [] if resolve_all: unresolved_gateways = self._vlan_unresolved_nexthops( vlan, vlan.dyn_route_gws_by_ipv[self.IPV], now ) vlan.dyn_unresolved_route_ip_gws[self.IPV] = unresolved_gateways else: if vlan.dyn_unresolved_route_ip_gws[self.IPV]: unresolved_gateways = [ vlan.dyn_unresolved_route_ip_gws[self.IPV].popleft() ] return self._resolve_gateways_flows( self._resolve_gateway_flows, vlan, now, unresolved_gateways, self.max_hosts_per_resolve_cycle, )
[docs] def resolve_expire_hosts(self, vlan, now, resolve_all=True): """Re/resolve hosts. Args: vlan (vlan): VLAN containing this RIB/FIB. now (float): seconds since epoch. resolve_all (bool): attempt to resolve all unresolved gateways. Returns: list: OpenFlow messages. """ unresolved_gateways = [] if resolve_all: unresolved_gateways = self._vlan_unresolved_nexthops( vlan, vlan.dyn_host_gws_by_ipv[self.IPV], now ) vlan.dyn_unresolved_host_ip_gws[self.IPV] = unresolved_gateways else: if vlan.dyn_unresolved_host_ip_gws[self.IPV]: unresolved_gateways = [ vlan.dyn_unresolved_host_ip_gws[self.IPV].popleft() ] return self._resolve_gateways_flows( self._resolve_expire_gateway_flows, vlan, now, unresolved_gateways, self.max_hosts_per_resolve_cycle, )
def _cached_nexthop_eth_dst(self, vlan, ip_gw): """Return nexthop cache entry eth_dst for the ip_gw""" entry = self._vlan_nexthop_cache_entry(vlan, ip_gw) if entry is not None and entry.eth_src is not None: return entry.eth_src return None @staticmethod def _host_ip_to_host_int(host_ip): return ipaddress.ip_interface(ipaddress.ip_network(host_ip)) def _host_from_faucet_vip(self, faucet_vip): return self._host_ip_to_host_int(faucet_vip.ip) def _vlan_nexthop_cache_limit(self, vlan): raise NotImplementedError # pragma: no cover def _proactive_resolve_neighbor(self, now, pkt_meta): """Packet not directly destined for router but we can learn from the packet anyway""" vlan = pkt_meta.vlan dst_ip = pkt_meta.l3_dst ofmsgs = [] if self.proactive_learn: router = self._router_for_vlan(vlan) if router is None: faucet_vip = vlan.vip_map(dst_ip) else: vlan, faucet_vip = router.vip_map(dst_ip) if ( vlan and vlan.ip_in_vip_subnet(dst_ip, faucet_vip) and faucet_vip.ip != dst_ip and self._stateful_gw(vlan, dst_ip) ): limit = self._vlan_nexthop_cache_limit(vlan) if limit is None or len(self._vlan_nexthop_cache(vlan)) < limit: # TODO: avoid relearning L3 source if same L3 source tries # multiple L3 destinations quickly. ofmsgs.extend(self.add_host_fib_route_from_pkt(now, pkt_meta)) resolution_in_progress = ( dst_ip in vlan.dyn_host_gws_by_ipv[self.IPV] ) ofmsgs.extend( self._add_host_fib_route(vlan, dst_ip, blackhole=True) ) nexthop_cache_entry = self._update_nexthop_cache( now, vlan, None, None, dst_ip ) if not resolution_in_progress: resolve_flows = self._resolve_gateway_flows( dst_ip, nexthop_cache_entry, vlan, nexthop_cache_entry.cache_time, ) ofmsgs.extend(resolve_flows) return ofmsgs
[docs] def router_vlan_for_ip_gw(self, vlan, ip_gw): """Return router VLAN for IP gateway (or None). Args: vlan (vlan): VLAN containing this RIB. ip_gw (ipaddress.ip_address): IP address of nexthop. Returns: VLAN for this gateway or None. """ router = self._router_for_vlan(vlan) if router is not None: vlan, _ = router.vip_map(ip_gw) return vlan if vlan.vip_map(ip_gw): return vlan return None
[docs] def add_route(self, vlan, ip_gw, ip_dst): """Add a route to the RIB. Args: vlan (vlan): VLAN containing this RIB. ip_gw (ipaddress.ip_address): IP address of nexthop. ip_dst (ipaddress.ip_network): destination IP network. Returns: list: OpenFlow messages. """ ofmsgs = [] vlan = self.router_vlan_for_ip_gw(vlan, ip_gw) if vlan is None: self.logger.error( ( "Cannot resolve destination VLAN for gateway %s " "(not in global router?)" % ip_gw ) ) return ofmsgs if vlan.is_faucet_vip(ip_dst): return ofmsgs routes = self._vlan_routes(vlan) if routes.get(ip_dst, None) == ip_gw: return ofmsgs vlan.add_route(ip_dst, ip_gw) cached_eth_dst = self._cached_nexthop_eth_dst(vlan, ip_gw) if cached_eth_dst is not None: ofmsgs.extend( self._add_resolved_route( vlan=vlan, ip_gw=ip_gw, ip_dst=ip_dst, eth_dst=cached_eth_dst, is_updated=False, ) ) return ofmsgs
def _add_host_fib_route(self, vlan, host_ip, blackhole=False): """Add a host FIB route. Args: vlan (vlan): VLAN containing this RIB. host_ip (ipaddress.ip_address): IP address of host. Returns: list: OpenFlow messages. """ ofmsgs = [] if blackhole: priority = self._route_priority(host_ip) host_int = self._host_ip_to_host_int(host_ip) timeout = ( self.max_resolve_backoff_time * self.max_host_fib_retry_count + random.randint(0, self.max_resolve_backoff_time * 2) ) routed_vlans = self._routed_vlans(vlan) for routed_vlan in routed_vlans: in_match = self._route_match(routed_vlan, host_int) ofmsgs.append( self.fib_table.flowmod( in_match, priority=priority, hard_timeout=timeout ) ) host_route = ipaddress.ip_network(host_ip.exploded) ofmsgs.extend(self.add_route(vlan, host_ip, host_route)) return ofmsgs def _del_host_fib_route(self, vlan, host_ip): """Delete a host FIB route. Args: vlan (vlan): VLAN containing this RIB. host_ip (ipaddress.ip_address): IP address of host. Returns: list: OpenFlow messages. """ host_route = ipaddress.ip_network(host_ip.exploded) return self.del_route(vlan, host_route) def _ip_pkt(self, pkt): """Return an IP packet from an Ethernet packet. Args: pkt: ryu.lib.packet from host. Returns: IP ryu.lib.packet parsed from pkt. """ return pkt.get_protocol(self.IP_PKT)
[docs] def add_host_fib_route_from_pkt(self, now, pkt_meta): """Add a host FIB route given packet from host. Args: now (float): seconds since epoch. pkt_meta (PacketMeta): received packet. Returns: list: OpenFlow messages. """ src_ip = pkt_meta.l3_src ofmsgs = [] if ( src_ip and pkt_meta.vlan.ip_in_vip_subnet(src_ip) and self._stateful_gw(pkt_meta.vlan, src_ip) ): ip_pkt = self._ip_pkt(pkt_meta.pkt) if ip_pkt: ofmsgs.extend( self._add_host_fib_route(pkt_meta.vlan, src_ip, blackhole=False) ) ofmsgs.extend( self._update_nexthop( now, pkt_meta.vlan, pkt_meta.port, pkt_meta.eth_src, src_ip ) ) return ofmsgs
def _del_route_flows(self, vlan, ip_dst): """Delete all flows matching the vlan and ip_dst""" ofmsgs = [] routed_vlans = self._routed_vlans(vlan) for routed_vlan in routed_vlans: route_match = self._route_match(routed_vlan, ip_dst) ofmsgs.append( self.fib_table.flowdel( route_match, priority=self._route_priority(ip_dst), strict=True ) ) return ofmsgs
[docs] def del_route(self, vlan, ip_dst): """Delete a route from the RIB. Only one route with this exact destination is supported. Args: vlan (vlan): VLAN containing this RIB. ip_dst (ipaddress.ip_network): destination IP network. Returns: list: OpenFlow messages. """ ofmsgs = [] if vlan.is_faucet_vip(ip_dst): return ofmsgs routes = self._vlan_routes(vlan) if ip_dst in routes: vlan.del_route(ip_dst) ofmsgs.extend(self._del_route_flows(vlan, ip_dst)) return ofmsgs
[docs] def control_plane_handler(self, now, pkt_meta): return self._proactive_resolve_neighbor(now, pkt_meta)
[docs] class ValveIPv4RouteManager(ValveRouteManager): """Implement IPv4 RIB/FIB.""" IPV = 4 ETH_TYPE = valve_of.ether.ETH_TYPE_IP ICMP_TYPE = valve_of.inet.IPPROTO_ICMP ICMP_SIZE = valve_packet.VLAN_ICMP_ECHO_REQ_SIZE CONTROL_ETH_TYPES = (valve_of.ether.ETH_TYPE_IP, valve_of.ether.ETH_TYPE_ARP) # type: ignore IP_PKT = ipv4.ipv4
[docs] def advertise(self, _vlan): return []
@staticmethod def _gw_resolve_pkt(): return valve_packet.arp_request @staticmethod def _gw_respond_pkt(): return valve_packet.arp_reply def _vlan_nexthop_cache_limit(self, vlan): return vlan.proactive_arp_limit def _add_faucet_vip_nd(self, vlan, priority, faucet_vip, faucet_vip_host): ofmsgs = [] # ARP ofmsgs.extend( self.pipeline.select_packets( self.vip_table, {"eth_type": valve_of.ether.ETH_TYPE_ARP, "vlan": vlan} ) ) # ARP for FAUCET VIP ofmsgs.append( self.vip_table.flowcontroller( self.vip_table.match( eth_type=valve_of.ether.ETH_TYPE_ARP, eth_dst=valve_of.mac.BROADCAST_STR, nw_dst=faucet_vip_host, ), priority=priority, max_len=valve_packet.VLAN_ARP_PKT_SIZE, ) ) # ARP reply to FAUCET VIP ofmsgs.append( self.vip_table.flowcontroller( self.vip_table.match( eth_type=valve_of.ether.ETH_TYPE_ARP, eth_dst=vlan.faucet_mac ), priority=priority, max_len=valve_packet.VLAN_ARP_PKT_SIZE, ) ) priority -= 1 # Other ARP ofmsgs.append( self.vip_table.flowmod( self.vip_table.match(eth_type=valve_of.ether.ETH_TYPE_ARP), priority=priority, inst=self.pipeline.accept_to_l2_forwarding(), ) ) return ofmsgs def _control_plane_arp_handler(self, now, pkt_meta): """Handle ARP packets destined for the router""" ofmsgs = [] if not pkt_meta.eth_type == valve_of.ether.ETH_TYPE_ARP: return ofmsgs arp_pkt = pkt_meta.pkt.get_protocol(arp.arp) if arp_pkt is None: return ofmsgs opcode = arp_pkt.opcode if opcode == arp.ARP_REQUEST: if pkt_meta.eth_dst in ( valve_of.mac.BROADCAST_STR, pkt_meta.vlan.faucet_mac, ): ofmsgs.extend( self._resolve_vip_response(pkt_meta, pkt_meta.l3_dst, now) ) elif opcode == arp.ARP_REPLY: if pkt_meta.eth_dst == pkt_meta.vlan.faucet_mac: ofmsgs.extend(self._gw_advert(pkt_meta, pkt_meta.l3_src, now)) self.notify_learn(pkt_meta) return ofmsgs def _control_plane_icmp_handler(self, now, pkt_meta, ipv4_pkt): """Handle ICMP packets destined for the router""" ofmsgs = [] if ipv4_pkt.proto != valve_of.inet.IPPROTO_ICMP: return ofmsgs if self._unicast_to_vip(pkt_meta): pkt_meta.reparse_all() icmp_pkt = pkt_meta.pkt.get_protocol(icmp.icmp) if icmp_pkt is None: return ofmsgs if icmp_pkt.type == icmp.ICMP_ECHO_REQUEST: ofmsgs.append( pkt_meta.vlan.pkt_out_port( valve_packet.echo_reply, pkt_meta.port, pkt_meta.vlan.faucet_mac, pkt_meta.eth_src, pkt_meta.l3_dst, pkt_meta.l3_src, icmp_pkt.data, ) ) # ping but no previous ARP request for FAUCET VIP # from this host. Missed ARP request or host has # static ARP entry for us? if self._cached_nexthop_eth_dst(pkt_meta.vlan, pkt_meta.l3_src) is None: ofmsgs.extend(self.add_host_fib_route_from_pkt(now, pkt_meta)) return ofmsgs def _control_plane_time_exceeded_handler(self, pkt_meta, ipv4_pkt): """Handle ICMP TTL expired packets""" ofmsgs = [] src_ip = pkt_meta.l3_src vlan = pkt_meta.vlan faucet_vip = vlan.ip_in_vip_subnet(src_ip) if faucet_vip is None: return ofmsgs pkt_meta.reparse_all() # get offset to L3 header l3_offset = pkt_meta.l3_offset() if l3_offset is None: return ofmsgs l4_offset = l3_offset + valve_packet.IPV4_HEADER_SIZE # need to embed layer 3 header + 8 bytes of layer 4 header in time exceeded response data = pkt_meta.pkt.data[l3_offset : l4_offset + 8] ofmsgs.append( pkt_meta.vlan.pkt_out_port( valve_packet.time_exceeded, pkt_meta.port, pkt_meta.vlan.faucet_mac, pkt_meta.eth_src, faucet_vip.ip, pkt_meta.l3_src, data, ) ) return ofmsgs
[docs] def control_plane_handler(self, now, pkt_meta): """Handle packets destined for router otherwise proactively learn host information""" if pkt_meta.packet_complete(): arp_replies = self._control_plane_arp_handler(now, pkt_meta) if arp_replies: return arp_replies ipv4_pkt = self._ip_pkt(pkt_meta.pkt) if ipv4_pkt is None: return [] if pkt_meta.reason == valve_of.ofp.OFPR_INVALID_TTL: time_exceeded_replies = self._control_plane_time_exceeded_handler( pkt_meta, ipv4_pkt ) if time_exceeded_replies: return time_exceeded_replies return [] icmp_replies = self._control_plane_icmp_handler(now, pkt_meta, ipv4_pkt) if icmp_replies: return icmp_replies return super().control_plane_handler(now, pkt_meta)
[docs] class ValveIPv6RouteManager(ValveRouteManager): """Implement IPv6 FIB.""" IPV = 6 ETH_TYPE = valve_of.ether.ETH_TYPE_IPV6 ICMP_TYPE = valve_of.inet.IPPROTO_ICMPV6 ICMP_SIZE = valve_packet.VLAN_ICMP6_ECHO_REQ_SIZE CONTROL_ETH_TYPES = (valve_of.ether.ETH_TYPE_IPV6,) # type: ignore IP_PKT = ipv6.ipv6 @staticmethod def _gw_resolve_pkt(): return valve_packet.nd_request @staticmethod def _gw_respond_pkt(): return valve_packet.nd_advert def _vlan_nexthop_cache_limit(self, vlan): return vlan.proactive_nd_limit def _add_faucet_vip_nd(self, vlan, priority, faucet_vip, faucet_vip_host): faucet_vip_host_nd_mcast = valve_packet.ipv6_link_eth_mcast( valve_packet.ipv6_solicited_node_from_ucast(faucet_vip.ip) ) ofmsgs = [] # RA if this is a link local FAUCET VIP if faucet_vip.ip.is_link_local: match = { "eth_type": self.ETH_TYPE, "eth_dst": valve_packet.IPV6_ALL_ROUTERS_MCAST, "vlan": vlan, } ofmsgs.extend(self.pipeline.select_packets(self.vip_table, match)) ofmsgs.append( self.vip_table.flowmod( self.vip_table.match( eth_type=self.ETH_TYPE, eth_dst=valve_packet.IPV6_ALL_ROUTERS_MCAST, nw_proto=valve_of.inet.IPPROTO_ICMPV6, icmpv6_type=icmpv6.ND_ROUTER_SOLICIT, ), priority=priority, inst=self._controller_and_flood(), ) ) # IPv6 ping unicast to FAUCET ofmsgs.append( self.vip_table.flowcontroller( self.vip_table.match( eth_type=self.ETH_TYPE, eth_dst=vlan.faucet_mac, nw_proto=valve_of.inet.IPPROTO_ICMPV6, icmpv6_type=icmpv6.ICMPV6_ECHO_REQUEST, ), priority=priority, max_len=self.ICMP_SIZE, ) ) # IPv6 NA unicast to FAUCET. ofmsgs.append( self.vip_table.flowcontroller( self.vip_table.match( eth_type=self.ETH_TYPE, eth_dst=vlan.faucet_mac, nw_proto=valve_of.inet.IPPROTO_ICMPV6, icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT, ), priority=priority, max_len=self.ICMP_SIZE, ) ) # IPv6 NS for FAUCET VIP match = { "eth_type": self.ETH_TYPE, "eth_dst": faucet_vip_host_nd_mcast, "vlan": vlan, } ofmsgs.extend(self.pipeline.select_packets(self.vip_table, match)) ofmsgs.append( self.vip_table.flowmod( self.vip_table.match( eth_type=self.ETH_TYPE, eth_dst=faucet_vip_host_nd_mcast, nw_proto=valve_of.inet.IPPROTO_ICMPV6, icmpv6_type=icmpv6.ND_NEIGHBOR_SOLICIT, ), priority=priority, inst=self._controller_and_flood(), ) ) return ofmsgs def _add_faucet_fib_to_vip(self, vlan, priority, faucet_vip, faucet_vip_host): ofmsgs = super()._add_faucet_fib_to_vip( vlan, priority, faucet_vip, faucet_vip_host ) faucet_vip_broadcast = ipaddress.IPv6Interface( faucet_vip.network.broadcast_address ) if self.global_routing: vlan = self.global_vlan ofmsgs.append( self.fib_table.flowmod( self._route_match(vlan, faucet_vip_broadcast), priority=priority, inst=(self.fib_table.goto(self.vip_table),), ) ) return ofmsgs def _nd_solicit_handler(self, now, pkt_meta, _ipv6_pkt, icmpv6_pkt): ofmsgs = [] solicited_ip = ipaddress.ip_address(icmpv6_pkt.data.dst) ofmsgs.extend(self._resolve_vip_response(pkt_meta, solicited_ip, now)) self.notify_learn(pkt_meta) return ofmsgs def _nd_advert_handler(self, now, pkt_meta, _ipv6_pkt, icmpv6_pkt): ofmsgs = [] target_ip = ipaddress.ip_address(icmpv6_pkt.data.dst) ofmsgs.extend(self._gw_advert(pkt_meta, target_ip, now)) self.notify_learn(pkt_meta) return ofmsgs def _router_solicit_handler(self, _now, pkt_meta, _ipv6_pkt, _icmpv6_pkt): ofmsgs = [] link_local_vips, other_vips = pkt_meta.vlan.link_and_other_vips(self.IPV) for vip in link_local_vips: if pkt_meta.l3_src in vip.network: ofmsgs.append( pkt_meta.vlan.pkt_out_port( valve_packet.router_advert, pkt_meta.port, pkt_meta.vlan.faucet_mac, pkt_meta.eth_src, vip.ip, pkt_meta.l3_src, other_vips, ) ) self.logger.info( "Responded to RS solicit from %s (%s)" % (pkt_meta.l3_src, pkt_meta.log()) ) break return ofmsgs def _echo_request_handler(self, now, pkt_meta, ipv6_pkt, icmpv6_pkt): ofmsgs = [] if self._unicast_to_vip(pkt_meta): ofmsgs.append( pkt_meta.vlan.pkt_out_port( valve_packet.icmpv6_echo_reply, pkt_meta.port, pkt_meta.vlan.faucet_mac, pkt_meta.eth_src, pkt_meta.l3_dst, pkt_meta.l3_src, ipv6_pkt.hop_limit, icmpv6_pkt.data.id, icmpv6_pkt.data.seq, icmpv6_pkt.data.data, ) ) # ping but no previous ND request for FAUCET VIP # from this host. Missed ND request or host has # static ND entry for us? if self._cached_nexthop_eth_dst(pkt_meta.vlan, pkt_meta.l3_src) is None: ofmsgs.extend(self.add_host_fib_route_from_pkt(now, pkt_meta)) return ofmsgs _icmpv6_handlers = { icmpv6.ND_NEIGHBOR_SOLICIT: (_nd_solicit_handler, icmpv6.nd_neighbor, 32), icmpv6.ND_NEIGHBOR_ADVERT: (_nd_advert_handler, icmpv6.nd_neighbor, 32), icmpv6.ND_ROUTER_SOLICIT: (_router_solicit_handler, None, 32), icmpv6.ICMPV6_ECHO_REQUEST: (_echo_request_handler, icmpv6.echo, 96), } def _control_plane_icmpv6_handler(self, now, pkt_meta, ipv6_pkt): """Handle ICMPv6 packets destined for router""" ofmsgs = [] # Must be ICMPv6 and have no extended headers. if ipv6_pkt.nxt != valve_of.inet.IPPROTO_ICMPV6: return ofmsgs if ipv6_pkt.ext_hdrs: return ofmsgs src_ip = pkt_meta.l3_src vlan = pkt_meta.vlan if not vlan.ip_in_vip_subnet(src_ip): return ofmsgs reparse_size = 32 pkt_meta.reparse_ip(payload=reparse_size) icmpv6_pkt = pkt_meta.pkt.get_protocol(icmpv6.icmpv6) if icmpv6_pkt is None: return ofmsgs icmpv6_type = icmpv6_pkt.type_ if ( ipv6_pkt.hop_limit != valve_packet.IPV6_MAX_HOP_LIM and icmpv6_type != icmpv6.ICMPV6_ECHO_REQUEST ): return ofmsgs handler, payload_type, type_reparse_size = self._icmpv6_handlers.get( icmpv6_type, (None, None, None) ) if handler is not None and ( payload_type is None or isinstance(icmpv6_pkt.data, payload_type) ): if type_reparse_size != reparse_size: pkt_meta.reparse_ip(payload=type_reparse_size) icmpv6_pkt = pkt_meta.pkt.get_protocol(icmpv6.icmpv6) ofmsgs = handler(self, now, pkt_meta, ipv6_pkt, icmpv6_pkt) return ofmsgs def _control_plane_time_exceeded_handler(self, pkt_meta, ipv6_pkt): """Handle ICMPv6 TTL expired packets""" ofmsgs = [] # Must have no extended headers. if ipv6_pkt.ext_hdrs: return ofmsgs src_ip = pkt_meta.l3_src vlan = pkt_meta.vlan faucet_vip = vlan.ip_in_vip_subnet(src_ip) if faucet_vip is None: return ofmsgs reparse_size = 32 pkt_meta.reparse_ip(payload=reparse_size) # get offset to L3 header l3_offset = pkt_meta.l3_offset() if l3_offset is None: return ofmsgs l4_offset = l3_offset + valve_packet.IPV6_HEADER_SIZE # pad out 4 reserved bytes data = b"\00" * 4 # need to embed layer 3 header + layer 4 packet # (without exceeding ipv6 minimum MTU) # in time exceeded response data += pkt_meta.pkt.data[l3_offset : l4_offset + 512] ofmsgs.append( pkt_meta.vlan.pkt_out_port( valve_packet.icmpv6_time_exceeded, pkt_meta.port, pkt_meta.vlan.faucet_mac, pkt_meta.eth_src, faucet_vip.ip, pkt_meta.l3_src, data, ) ) return ofmsgs
[docs] def control_plane_handler(self, now, pkt_meta): """Resolve packets destined for router or proactively learn host information""" if pkt_meta.packet_complete(): ipv6_pkt = self._ip_pkt(pkt_meta.pkt) if ipv6_pkt is None: return [] if pkt_meta.reason == valve_of.ofp.OFPR_INVALID_TTL: time_exceeded_replies = self._control_plane_time_exceeded_handler( pkt_meta, ipv6_pkt ) if time_exceeded_replies: return time_exceeded_replies return [] icmp_replies = self._control_plane_icmpv6_handler(now, pkt_meta, ipv6_pkt) if icmp_replies: return icmp_replies return super().control_plane_handler(now, pkt_meta)
[docs] def advertise(self, vlan): ofmsgs = [] link_local_vips, other_vips = vlan.link_and_other_vips(self.IPV) for link_local_vip in link_local_vips: # https://tools.ietf.org/html/rfc4861#section-6.1.2 ofmsgs.extend( vlan.flood_pkt( valve_packet.router_advert, self.multi_out, vlan.faucet_mac, valve_packet.IPV6_ALL_NODES_MCAST, link_local_vip.ip, valve_packet.IPV6_ALL_NODES, other_vips, ) ) return ofmsgs