Source code for faucet.gauge_influx

"""Library for interacting with InfluxDB."""

# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
import requests  # pytype: disable=pyi-error
from faucet.gauge_pollers import (
    GaugePortStatePoller,
    GaugeFlowTablePoller,
    GaugePortStatsPoller,
)


[docs] class InfluxShipper: """Convenience class for shipping values to InfluxDB. Inheritors must have a WatcherConf object as conf. """ conf = None ship_error_prefix = "error shipping points: " logger = None
[docs] def ship_points(self, points): """Make a connection to InfluxDB and ship points.""" if self.conf is not None: try: client = InfluxDBClient( host=self.conf.influx_host, port=self.conf.influx_port, username=self.conf.influx_user, password=self.conf.influx_pwd, database=self.conf.influx_db, timeout=self.conf.influx_timeout, ) if client: if client.write_points(points=points, time_precision="s"): return True self.logger.warning( "%s failed to update InfluxDB" % self.ship_error_prefix ) else: self.logger.warning( "%s error connecting to InfluxDB" % self.ship_error_prefix ) except ( requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout, InfluxDBClientError, InfluxDBServerError, ) as err: self.logger.warning("%s %s" % (self.ship_error_prefix, err)) return False
[docs] @staticmethod def make_point(tags, rcv_time, stat_name, stat_val): """Make an InfluxDB point.""" # InfluxDB has only one integer type, int64. We are logging OF # stats that are uint64. Use float64 to prevent an overflow. # q.v. https://docs.influxdata.com/influxdb/v1.2/write_protocols/line_protocol_reference/ point = { "measurement": stat_name, "tags": tags, "time": int(rcv_time), # pylint: disable=no-member "fields": {"value": float(stat_val)}, } return point
[docs] def make_port_point( self, dp_name, port_name, rcv_time, stat_name, stat_val ): # pylint: disable=too-many-arguments """Make an InfluxDB point about a port measurement.""" port_tags = { "dp_name": dp_name, "port_name": port_name, } return self.make_point(port_tags, rcv_time, stat_name, stat_val)
[docs] class GaugePortStateInfluxDBLogger(GaugePortStatePoller, InfluxShipper): """ Example: :: > use faucet Using database faucet > precision rfc3339 > select * from port_state_reason where port_name = 'port1.0.1' order by time desc limit 10; name: port_state_reason ----------------------- time dp_name port_name value 2017-02-21T02:12:29Z windscale-faucet-1 port1.0.1 2 2017-02-21T02:12:25Z windscale-faucet-1 port1.0.1 2 2016-07-27T22:05:08Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:33:00Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:32:57Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:31:21Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:31:18Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:27:07Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:27:04Z windscale-faucet-1 port1.0.1 2 2016-05-25T04:24:53Z windscale-faucet-1 port1.0.1 2 """ def _update(self, rcv_time, msg): reason = msg.reason port_no = msg.desc.port_no if port_no in self.dp.ports: port_name = self.dp.ports[port_no].name points = [ self.make_port_point( self.dp.name, port_name, rcv_time, "port_state_reason", reason ) ] self.ship_points(points)
[docs] def send_req(self): """Send a stats request to a datapath.""" raise NotImplementedError # pragma: no cover
[docs] def no_response(self): """Called when a polling cycle passes without receiving a response.""" raise NotImplementedError # pragma: no cover
[docs] class GaugePortStatsInfluxDBLogger(GaugePortStatsPoller, InfluxShipper): """Periodically sends a port stats request to the datapath and parses \ and outputs the response. Example: :: > use faucet Using database faucet > show measurements name: measurements ------------------ bytes_in bytes_out dropped_in dropped_out errors_in packets_in packets_out port_state_reason > precision rfc3339 > select * from packets_out where port_name = 'port1.0.1' order by time desc limit 10; name: packets_out ----------------- time dp_name port_name value 2017-03-06T05:21:42Z windscale-faucet-1 port1.0.1 76083431 2017-03-06T05:21:33Z windscale-faucet-1 port1.0.1 76081172 2017-03-06T05:21:22Z windscale-faucet-1 port1.0.1 76078727 2017-03-06T05:21:12Z windscale-faucet-1 port1.0.1 76076612 2017-03-06T05:21:02Z windscale-faucet-1 port1.0.1 76074546 2017-03-06T05:20:52Z windscale-faucet-1 port1.0.1 76072730 2017-03-06T05:20:42Z windscale-faucet-1 port1.0.1 76070528 2017-03-06T05:20:32Z windscale-faucet-1 port1.0.1 76068211 2017-03-06T05:20:22Z windscale-faucet-1 port1.0.1 76065982 2017-03-06T05:20:12Z windscale-faucet-1 port1.0.1 76063941 """ def _update(self, rcv_time, msg): points = [] for stat in msg.body: port_name = str(stat.port_no) for stat_name, stat_val in self._format_stat_pairs("_", stat): points.append( self.make_port_point( self.dp.name, port_name, rcv_time, stat_name, stat_val ) ) self.ship_points(points)
[docs] class GaugeFlowTableInfluxDBLogger(GaugeFlowTablePoller, InfluxShipper): # pylint: disable=line-too-long """ Example: :: > use faucet Using database faucet > show series where table_id = '0' and in_port = '2' key --- flow_byte_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=17,priority=9099,table_id=0,udp_dst=53 flow_byte_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=6,priority=9098,table_id=0,tcp_dst=53 flow_byte_count,dp_name=windscale-faucet-1,in_port=2,priority=9097,table_id=0 flow_packet_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=17,priority=9099,table_id=0,udp_dst=53 flow_packet_count,dp_name=windscale-faucet-1,eth_type=2048,in_port=2,ip_proto=6,priority=9098,table_id=0,tcp_dst=53 flow_packet_count,dp_name=windscale-faucet-1,in_port=2,priority=9097,table_id=0 > select * from flow_byte_count where table_id = '0' and in_port = '2' and ip_proto = '17' and time > now() - 5m name: flow_byte_count time arp_tpa dp_name eth_dst eth_src eth_type icmpv6_type in_port ip_proto ipv4_dst ipv6_dst priority table_id tcp_dst udp_dst value vlan_vid ---- ------- ------- ------- ------- -------- ----------- ------- -------- -------- -------- -------- -------- ------- ------- ----- -------- 1501154797000000000 windscale-faucet-1 2048 2 17 9099 0 53 9414 1501154857000000000 windscale-faucet-1 2048 2 17 9099 0 53 10554 1501154917000000000 windscale-faucet-1 2048 2 17 9099 0 53 10554 1501154977000000000 windscale-faucet-1 2048 2 17 9099 0 53 12164 1501155037000000000 windscale-faucet-1 2048 2 17 9099 0 53 12239 """ # noqa: E501 def _update(self, rcv_time, msg): points = [] jsondict = msg.to_jsondict() for stats_reply in jsondict["OFPFlowStatsReply"]["body"]: stats = stats_reply["OFPFlowStats"] for var, tags, count in self._parse_flow_stats(stats): points.append(self.make_point(tags, rcv_time, var, count)) self.ship_points(points)