Source code for faucet.gauge_pollers

"""Library for polling dataplanes for statistics."""

# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
import time

from os_ken.lib import hub
from os_ken.ofproto import ofproto_v1_3 as ofp
from os_ken.ofproto import ofproto_v1_3_parser as parser

from faucet.valve_of import devid_present
from faucet.valve_of_old import OLD_MATCH_FIELDS


[docs] class GaugePoller: """Abstraction for a poller for statistics.""" def __init__(self, conf, logname, prom_client): self.dp = conf.dp # pylint: disable=invalid-name self.conf = conf self.prom_client = prom_client self.reply_pending = False self.ryudp = None self.logger = logging.getLogger(logname + ".{0}".format(self.conf.type)) # _running indicates that the watcher is receiving data self._running = False self.req = None
[docs] def report_dp_status(self, dp_status): """Report DP status.""" self.prom_client.dp_status.labels( **dict(dp_id=hex(self.dp.dp_id), dp_name=self.dp.name) ).set( dp_status ) # pylint: disable=no-member
[docs] def start(self, ryudp, active): """Start the poller.""" self.ryudp = ryudp self._running = True if active: self.logger.info("starting")
[docs] def stop(self): """Stop the poller.""" self.logger.info("stopping") self._running = False
[docs] def running(self): """Return True if the poller is running.""" return self._running
[docs] @staticmethod def is_active(): """Return True if the poller is controlling the request loop for its stat""" return False
[docs] def send_req(self): """Send a stats request to a datapath.""" raise NotImplementedError # pragma: no cover
[docs] def no_response(self): """Called when a polling cycle passes without receiving a response.""" dpid_str = "" if self.req and hasattr(self.req, "datapath"): dpid_str = "DPID %s (%s)" % ( self.req.datapath.id, hex(self.req.datapath.id), ) self.logger.info("%s no response to %s", dpid_str, self.req)
[docs] def update(self, rcv_time, msg): """Handle the responses to requests. Called when a reply to a stats request sent by this object is received by the controller. It should acknowledge the receipt by setting self.reply_pending to false. Args: rcv_time: the time the response was received msg: the stats reply message """ # TODO: it may be worth while verifying this is the correct stats # response before doing this if not self.running(): self.logger.debug("update received when not running") return self.reply_pending = False self._update(rcv_time, msg)
def _format_stat_pairs(self, _delim, _stat): return () def _dp_stat_name(self, _stat, _stat_name): return "" def _rcv_time(self, rcv_time): return time.strftime("%b %d %H:%M:%S", time.localtime(rcv_time)) def _update(self, rcv_time, msg): if not self.conf.file: return rcv_time_str = self._rcv_time(rcv_time) log_lines = [] for stat in msg.body: for stat_name, stat_val in self._format_stat_pairs("-", stat): dp_stat_name = self._dp_stat_name(stat, stat_name) log_lines.append( self._update_line(rcv_time_str, dp_stat_name, stat_val) ) with open(self.conf.file, "a", encoding="utf-8") as logfile: logfile.writelines(log_lines) @staticmethod def _format_stats(delim, stat_pairs): formatted_stats = [] for stat_name_list, stat_val in stat_pairs: stat_name = delim.join(stat_name_list) # OVS reports unsupported statistics as all-1-bits (UINT64_MAX) if stat_val == 2**64 - 1: stat_val = 0 formatted_stats.append((stat_name, stat_val)) return formatted_stats @staticmethod def _update_line(rcv_time_str, stat_name, stat_val): return "\t".join((rcv_time_str, stat_name, str(stat_val))) + "\n"
[docs] class GaugeThreadPoller(GaugePoller): """A ryu thread object for sending and receiving OpenFlow stats requests. The thread runs in a loop sending a request, sleeping then checking a response was received before sending another request. The methods send_req, update and no_response should be implemented by subclasses. """ def __init__(self, conf, logname, prom_client): super().__init__(conf, logname, prom_client) self.thread = None self.interval = self.conf.interval self.ryudp = None
[docs] def start(self, ryudp, active): self.stop() super().start(ryudp, active) if active: self.thread = hub.spawn(self) self.thread.name = "GaugeThreadPoller"
[docs] def stop(self): super().stop() if self.is_active(): hub.kill(self.thread) hub.joinall([self.thread]) self.thread = None
[docs] def is_active(self): return self.thread is not None
def __call__(self): """Send request loop. Delays the initial request for a random interval to reduce load. Then sends a request to the datapath, waits the specified interval and checks that a response has been received in a loop.""" # TODO: this should use a deterministic method instead of random hub.sleep(random.randint(1, self.conf.interval)) while True: self.send_req() self.reply_pending = True hub.sleep(self.conf.interval) if self.reply_pending: self.no_response()
[docs] def send_req(self): """Send a stats request to a datapath.""" raise NotImplementedError # pragma: no cover
[docs] class GaugeMeterStatsPoller(GaugeThreadPoller): """Poll for all meter stats."""
[docs] def send_req(self): if self.ryudp: self.req = parser.OFPMeterStatsRequest(self.ryudp, 0, ofp.OFPM_ALL) self.ryudp.send_msg(self.req)
[docs] class GaugePortStatsPoller(GaugeThreadPoller): """Periodically sends a port stats request to the datapath and parses and outputs the response. """
[docs] def send_req(self): if self.ryudp: self.req = parser.OFPPortStatsRequest(self.ryudp, 0, ofp.OFPP_ANY) self.ryudp.send_msg(self.req)
def _format_stat_pairs(self, delim, stat): stat_pairs = ( (("packets", "out"), stat.tx_packets), (("packets", "in"), stat.rx_packets), (("bytes", "out"), stat.tx_bytes), (("bytes", "in"), stat.rx_bytes), (("dropped", "out"), stat.tx_dropped), (("dropped", "in"), stat.rx_dropped), (("errors", "out"), stat.tx_errors), (("errors", "in"), stat.rx_errors), ) return self._format_stats(delim, stat_pairs)
[docs] class GaugeFlowTablePoller(GaugeThreadPoller): """Periodically dumps the current datapath flow table as a yaml object. Includes a timestamp and a reference ($DATAPATHNAME-flowtables). The flow table is dumped as an OFFlowStatsReply message (in yaml format) that matches all flows. """
[docs] def send_req(self): if self.ryudp: match = parser.OFPMatch() self.req = parser.OFPFlowStatsRequest( self.ryudp, 0, ofp.OFPTT_ALL, ofp.OFPP_ANY, ofp.OFPG_ANY, 0, 0, match ) self.ryudp.send_msg(self.req)
def _parse_flow_stats(self, stats): """Parse flow stats reply message into tags/labels and byte/packet counts.""" packet_count = int(stats["packet_count"]) byte_count = int(stats["byte_count"]) instructions = stats["instructions"] tags = { "dp_name": self.dp.name, "dp_id": hex(self.dp.dp_id), "table_id": int(stats["table_id"]), "priority": int(stats["priority"]), "inst_count": len(instructions), "cookie": int(stats["cookie"]), } oxm_matches = stats["match"]["OFPMatch"]["oxm_fields"] for oxm_match in oxm_matches: oxm_tlv = oxm_match["OXMTlv"] mask = oxm_tlv["mask"] val = oxm_tlv["value"] orig_field = oxm_tlv["field"] if mask is not None: val = "/".join((str(val), str(mask))) field = OLD_MATCH_FIELDS.get(orig_field, orig_field) tags[field] = val if field == "vlan_vid" and mask is None: tags["vlan"] = devid_present(int(val)) return ( ("flow_packet_count", tags, packet_count), ("flow_byte_count", tags, byte_count), )
[docs] class GaugePortStatePoller(GaugePoller): """Abstraction for port state poller."""
[docs] def send_req(self): """Send a stats request to a datapath.""" raise NotImplementedError # pragma: no cover
[docs] def no_response(self): """Called when a polling cycle passes without receiving a response.""" raise NotImplementedError # pragma: no cover