Source code for faucet.faucet

"""OSKenApp shim between Ryu and Valve."""

# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=using-constant-test,wrong-import-order,wrong-import-position
import eventlet

if True:  # A trick to satisfy linting for E402
    eventlet.monkey_patch()

import time

from functools import partial

from os_ken.controller.handler import CONFIG_DISPATCHER
from os_ken.controller.handler import MAIN_DISPATCHER
from os_ken.controller.handler import set_ev_cls
from os_ken.controller import dpset
from os_ken.controller import event
from os_ken.controller import ofp_event
from os_ken.lib import hub

from faucet.valve_ryuapp import EventReconfigure, OSKenAppBase
from faucet.valve_util import dpid_log, kill_on_exception
from faucet import faucet_event
from faucet import faucet_bgp
from faucet import faucet_dot1x
from faucet import valves_manager
from faucet import faucet_metrics
from faucet import valve_of


# Names of the controller CONF options that Faucet._export_ryu_config() reads,
# converts to int, and publishes through the ryu_config metric (one label
# value per option name).
EXPORT_RYU_CONFIGS = ["echo_request_interval", "maximum_unreplied_echo_requests"]


class EventFaucetMaintainStackRoot(event.EventBase):  # pylint: disable=too-few-public-methods
    """Event that asks the controller to maintain the stack root."""
class EventFaucetMetricUpdate(event.EventBase):  # pylint: disable=too-few-public-methods
    """Event that triggers an update of the controller's metrics."""
class EventFaucetResolveGateways(event.EventBase):  # pylint: disable=too-few-public-methods
    """Event that triggers resolution (or re-resolution) of gateways."""
class EventFaucetStateExpire(  # pylint: disable=too-few-public-methods
    event.EventBase
):
    """Event that triggers expiry of state held in the controller."""
class EventFaucetFastStateExpire(event.EventBase):  # pylint: disable=too-few-public-methods
    """Event that triggers fast expiry of state held in the controller."""
class EventFaucetAdvertise(  # pylint: disable=too-few-public-methods
    event.EventBase
):
    """Event that triggers periodic network advertisements (eg IPv6 RAs)."""
class EventFaucetFastAdvertise(event.EventBase):  # pylint: disable=too-few-public-methods
    """Event that triggers periodic fast network advertisements (eg LACP)."""
class EventFaucetEventSockHeartbeat(event.EventBase):  # pylint: disable=too-few-public-methods
    """Event that triggers periodic events on the event sock.

    Emitting them causes the event sock to raise an exception if the
    connection is broken.
    """
class Faucet(OSKenAppBase):
    """A OSKenApp that implements an L2/L3 learning VLAN switch.

    Valve provides the switch implementation; this is a shim for the Ryu
    event handling framework to interface with Valve.
    """

    _CONTEXTS = {
        "dpset": dpset.DPSet,
    }
    # Default periodic services: event class -> (service name, interval).
    # The service name is passed to ValvesManager.valve_flow_services() for the
    # events handled by _valve_flow_services(); None means the event has its
    # own dedicated handler. The interval is handed to _thread_reschedule()
    # (presumably seconds -- confirm against OSKenAppBase).
    _VALVE_SERVICES = {
        EventFaucetMetricUpdate: (None, 5),
        EventFaucetResolveGateways: ("resolve_gateways", 2),
        EventFaucetStateExpire: ("state_expire", 5),
        EventFaucetFastStateExpire: ("fast_state_expire", 2),
        EventFaucetAdvertise: ("advertise", 15),
        EventFaucetFastAdvertise: ("fast_advertise", 5),
    }
    logname = "faucet"
    exc_logname = logname + ".exception"
    bgp = None
    notifier = None
    valves_manager = None
    # NOTE(review): not referenced inside this class; __init__ stores the
    # heartbeat setting as event_sock_hrtbeat_time instead. Kept for
    # backward compatibility with any external reader.
    event_socket_heartbeat_time = 0

    def __init__(self, *args, **kwargs):
        """Build the metrics, BGP, 802.1X, notifier and ValvesManager helpers.

        Also registers the optional periodic services (event sock heartbeat,
        stack root maintenance) when their settings are non-zero.
        """
        super().__init__(*args, **kwargs)
        self.prom_client = faucet_metrics.FaucetMetrics(reg=self._reg)
        self.bgp = faucet_bgp.FaucetBgp(
            self.logger, self.exc_logname, self.prom_client, self._send_flow_msgs
        )
        self.dot1x = faucet_dot1x.FaucetDot1x(
            self.logger, self.exc_logname, self.prom_client, self._send_flow_msgs
        )
        self.notifier = faucet_event.FaucetEventNotifier(
            self.get_setting("EVENT_SOCK"), self.prom_client, self.logger
        )
        self.valves_manager = valves_manager.ValvesManager(
            self.logname,
            self.logger,
            self.prom_client,
            self.notifier,
            self.bgp,
            self.dot1x,
            self.get_setting("CONFIG_AUTO_REVERT"),
            self._send_flow_msgs,
        )
        self.thread_managers = (self.bgp, self.dot1x, self.prom_client, self.notifier)

        # Work on a per-instance copy: the optional services added below must
        # not be written into the class-level dict, which is shared by every
        # Faucet instance in the process.
        self._VALVE_SERVICES = dict(  # pylint: disable=invalid-name
            type(self)._VALVE_SERVICES
        )

        self.event_sock_hrtbeat_time = int(
            self.get_setting("EVENT_SOCK_HEARTBEAT") or 0
        )
        if self.event_sock_hrtbeat_time > 0:
            self._VALVE_SERVICES[EventFaucetEventSockHeartbeat] = (
                "event_sock_heartbeat",
                self.event_sock_hrtbeat_time,
            )

        self.stack_root_state_update_time = int(
            self.get_setting("STACK_ROOT_STATE_UPDATE_TIME") or 0
        )
        if self.stack_root_state_update_time:
            self._VALVE_SERVICES[EventFaucetMaintainStackRoot] = (
                None,
                self.stack_root_state_update_time,
            )

    @kill_on_exception(exc_logname)
    def _check_thread_exception(self):
        """Run the base class thread check under kill_on_exception."""
        super()._check_thread_exception()

    def _export_ryu_config(self):
        """Publish each option in EXPORT_RYU_CONFIGS via the ryu_config metric."""
        for opt_name in EXPORT_RYU_CONFIGS:
            value = int(getattr(self.CONF, opt_name))
            config_labels = dict(param=opt_name)
            self.prom_client.ryu_config.labels(**config_labels).set(value)

    @kill_on_exception(exc_logname)
    def start(self):
        """Start Prometheus, the event notifier and all periodic services."""
        super().start()

        # Start Prometheus
        prom_port = int(self.get_setting("PROMETHEUS_PORT"))
        prom_addr = self.get_setting("PROMETHEUS_ADDR")
        self.prom_client.start(prom_port, prom_addr)
        self._export_ryu_config()

        # Start event notifier
        notifier_thread = self.notifier.start()
        if notifier_thread is not None:
            self.threads.append(notifier_thread)

        # One rescheduling thread per registered periodic service.
        for service_event, service_pair in self._VALVE_SERVICES.items():
            name, interval = service_pair
            thread = hub.spawn(
                partial(self._thread_reschedule, service_event(), interval)
            )
            thread.name = name
            self.threads.append(thread)

    def _delete_deconfigured_dp(self, deleted_dpid):
        """Close the connection to a datapath that was removed from config.

        Args:
            deleted_dpid: datapath ID of the de-configured datapath.
        """
        self.logger.info("Deleting de-configured %s", dpid_log(deleted_dpid))
        ryu_dp = self.dpset.get(deleted_dpid)
        if ryu_dp is not None:
            ryu_dp.close()

    @set_ev_cls(EventReconfigure, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def reload_config(self, ryu_event):
        """Handle a request to reload configuration."""
        super().reload_config(ryu_event)
        self.valves_manager.request_reload_configs(
            time.time(), self.config_file, delete_dp=self._delete_deconfigured_dp
        )

    @kill_on_exception(exc_logname)
    def _send_flow_msgs(self, valve, flow_msgs, ryu_dp=None):
        """Send OpenFlow messages to a connected datapath.

        Args:
            valve: Valve instance the messages are for. It is dereferenced
                unconditionally, so it must not be None.
            flow_msgs (list): OpenFlow messages to send.
            ryu_dp: Override datapath from DPSet.
        """
        if ryu_dp is None:
            ryu_dp = self.dpset.get(valve.dp.dp_id)
        if not ryu_dp:
            valve.logger.error("send_flow_msgs: DP not up")
            return
        valve.send_flows(ryu_dp, flow_msgs, time.time())

    def _get_valve(self, ryu_event, require_running=False):
        """Get Valve instance to respond to an event.

        Args:
            ryu_event (ryu.controller.event.Event): event
            require_running (bool): require DP to be running.
        Returns:
            valve, ryu_dp, msg: tuple of Nones, or datapath object, Ryu datapath, and msg (if any)
        """
        valve, ryu_dp, msg = self._get_datapath_obj(
            self.valves_manager.valves, ryu_event
        )
        if valve:
            if msg:
                valve.ofchannel_log([msg])
            # Only the valve is blanked out; ryu_dp and msg are still returned.
            if require_running and not valve.dp.dyn_running:
                valve = None
        return (valve, ryu_dp, msg)

    def _config_files_changed(self):
        """Return the config watcher's files_changed() result."""
        return self.valves_manager.config_watcher.files_changed()

    @set_ev_cls(EventFaucetMetricUpdate, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def metric_update(self, _):
        """Handle a request to update metrics in the controller."""
        self.valves_manager.update_metrics(time.time())

    @set_ev_cls(EventFaucetMaintainStackRoot, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def _maintain_stack_root(self, _):
        """Handle a periodic request to maintain the stack root."""
        self.valves_manager.maintain_stack_root(
            time.time(), self.stack_root_state_update_time
        )

    @set_ev_cls(EventFaucetEventSockHeartbeat, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def _event_socket_heartbeat(self, _):
        """Handle a periodic request to emit an event sock heartbeat."""
        self.valves_manager.event_socket_heartbeat()

    @set_ev_cls(EventFaucetResolveGateways, MAIN_DISPATCHER)
    @set_ev_cls(EventFaucetStateExpire, MAIN_DISPATCHER)
    @set_ev_cls(EventFaucetFastStateExpire, MAIN_DISPATCHER)
    @set_ev_cls(EventFaucetAdvertise, MAIN_DISPATCHER)
    @set_ev_cls(EventFaucetFastAdvertise, MAIN_DISPATCHER)
    @kill_on_exception(exc_logname)
    def _valve_flow_services(self, ryu_event):
        """Call a method on all Valves and send any resulting flows."""
        # The service name is looked up from the event's class.
        self.valves_manager.valve_flow_services(
            time.time(), self._VALVE_SERVICES[type(ryu_event)][0]
        )

    @set_ev_cls(
        ofp_event.EventOFPPacketIn, MAIN_DISPATCHER  # pylint: disable=no-member
    )
    @kill_on_exception(exc_logname)
    def packet_in_handler(self, ryu_event):
        """Handle a packet in event from the dataplane.

        Args:
            ryu_event (ryu.controller.event.EventReplyBase): packet in message.
        """
        valve, _, msg = self._get_valve(ryu_event, require_running=True)
        if valve is None:
            return
        self.valves_manager.valve_packet_in(ryu_event.timestamp, valve, msg)

    @set_ev_cls(
        ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER  # pylint: disable=no-member
    )
    @kill_on_exception(exc_logname)
    def error_handler(self, ryu_event):
        """Handle an OFPError from a datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPErrorMsg): trigger
        """
        valve, _, msg = self._get_valve(ryu_event)
        if valve is None:
            return
        valve.oferror(msg)

    @set_ev_cls(
        ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER  # pylint: disable=no-member
    )
    @kill_on_exception(exc_logname)
    def features_handler(self, ryu_event):
        """Handle receiving a switch features message from a datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPSwitchFeatures): trigger.
        """
        valve, ryu_dp, msg = self._get_valve(ryu_event)
        if valve is None:
            return
        self._send_flow_msgs(valve, valve.switch_features(msg), ryu_dp=ryu_dp)

    @kill_on_exception(exc_logname)
    def _datapath_connect(self, ryu_event):
        """Handle any/all re/connection of a datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.Event)
        """
        now = time.time()
        valve, ryu_dp, _ = self._get_valve(ryu_event)
        if valve is None:
            return
        # Ports whose reported state counts as up and that are not ignored.
        discovered_up_ports = {
            port.port_no
            for port in list(ryu_dp.ports.values())
            if (
                valve_of.port_status_from_state(port.state)
                and not valve_of.ignore_port(port.port_no)
            )
        }
        self._send_flow_msgs(
            valve, self.valves_manager.datapath_connect(now, valve, discovered_up_ports)
        )
        self.valves_manager.update_config_applied({valve.dp.dp_id: True})

    @kill_on_exception(exc_logname)
    def _datapath_disconnect(self, ryu_event):
        """Handle any/all disconnection of a datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.Event)
        """
        valve, _, _ = self._get_valve(ryu_event)
        if valve is None:
            return
        valve.datapath_disconnect(time.time())

    @set_ev_cls(
        ofp_event.EventOFPDescStatsReply, MAIN_DISPATCHER  # pylint: disable=no-member
    )
    @kill_on_exception(exc_logname)
    def desc_stats_reply_handler(self, ryu_event):
        """Handle OFPDescStatsReply from datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPDescStatsReply): trigger.
        """
        valve, _, msg = self._get_valve(ryu_event)
        if valve is None:
            return
        valve.ofdescstats_handler(msg.body)

    @set_ev_cls(
        ofp_event.EventOFPPortDescStatsReply,  # pylint: disable=no-member
        CONFIG_DISPATCHER,
    )
    @kill_on_exception(exc_logname)
    def port_desc_stats_reply_handler(self, ryu_event):
        """Handle OFPPortDescStatsReply from datapath.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPPortDescStatsReply): trigger.
        """
        valve, _, msg = self._get_valve(ryu_event)
        if valve is None:
            return
        self.valves_manager.port_desc_stats_reply_handler(valve, msg, time.time())

    @set_ev_cls(
        ofp_event.EventOFPPortStatus, MAIN_DISPATCHER  # pylint: disable=no-member
    )
    @kill_on_exception(exc_logname)
    def port_status_handler(self, ryu_event):
        """Handle a port status change event.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPPortStatus): trigger.
        """
        valve, _, msg = self._get_valve(ryu_event, require_running=True)
        if valve is None:
            return
        self.valves_manager.port_status_handler(valve, msg, time.time())

    @set_ev_cls(
        ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER  # pylint: disable=no-member
    )
    @kill_on_exception(exc_logname)
    def flowremoved_handler(self, ryu_event):
        """Handle a flow removed event.

        Args:
            ryu_event (ryu.controller.ofp_event.EventOFPFlowRemoved): trigger.
        """
        valve, ryu_dp, msg = self._get_valve(ryu_event, require_running=True)
        if valve is None:
            return
        # Only idle-timeout removals are acted on; other reasons are ignored.
        if msg.reason == ryu_dp.ofproto.OFPRR_IDLE_TIMEOUT:
            self._send_flow_msgs(
                valve, valve.flow_timeout(time.time(), msg.table_id, msg.match)
            )