"""Prometheus for Gauge."""
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from functools import partial
from prometheus_client import Gauge
from faucet.gauge_pollers import (
GaugePortStatsPoller,
GaugePortStatePoller,
GaugeFlowTablePoller,
)
from faucet.prom_client import PromClient
# Separator placed between the pieces of every exported metric name.
PROM_PREFIX_DELIM = "_"

# Port metrics are exported as PROM_PORT_PREFIX + delimiter + variable name.
PROM_PORT_PREFIX = "of_port"

# Values taken from port status messages.
PROM_PORT_STATE_VARS = ("reason", "state", "curr_speed", "max_speed")

# Counters read off each port stats entry (tx/rx pairs).
PROM_PORT_VARS = (
    "tx_packets", "rx_packets",
    "tx_bytes", "rx_bytes",
    "tx_dropped", "rx_dropped",
    "tx_errors", "rx_errors",
)

# Flow metrics; the flow table name is appended to each of these.
PROM_FLOW_VARS = ("flow_byte_count", "flow_packet_count")

# Meter metrics are exported as PROM_METER_PREFIX + delimiter + variable name.
PROM_METER_PREFIX = "of_meter"
PROM_METER_VARS = (
    "flow_count",
    "byte_in_count", "packet_in_count",
    "byte_band_count", "packet_band_count",
)
class GaugePrometheusClient(PromClient):
    """Prometheus client wrapper shared by all Gauge pollers.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, reg=None):
        """Create the shared gauges.

        Args:
            reg: registry handed to the ``PromClient`` base class.
        """
        super().__init__(reg=reg)
        # Label names seen so far for each flow table, keyed by table name.
        self.table_tags = collections.defaultdict(set)
        # Exported metric name -> Gauge currently registered under it.
        self.metrics = {}
        self.dp_status = Gauge(
            "dp_status",
            "status of datapaths",
            self.REQUIRED_LABELS,
            registry=self._reg,
        )
        self.reregister_nonflow_vars()

    def _reregister_var(self, var_key, var_func):
        """Replace the metric stored under ``var_key`` with a new one.

        Args:
            var_key: key into ``self.metrics``.
            var_func: zero-argument callable that builds the new metric.
        """
        try:
            previous = self.metrics[var_key]
            self._reg.unregister(previous)
        except KeyError:
            # Nothing to remove (e.g. no metric stored under this key yet).
            pass
        self.metrics[var_key] = var_func()

    def reregister_nonflow_vars(self):
        """Recreate every port, port state and meter gauge from scratch."""
        # (name prefix, variable names, labels added to REQUIRED_LABELS)
        groups = (
            (
                PROM_PORT_PREFIX,
                PROM_PORT_VARS + PROM_PORT_STATE_VARS,
                ["port", "port_description"],
            ),
            (PROM_METER_PREFIX, PROM_METER_VARS, ["meter_id"]),
        )
        for prefix, prom_vars, extra_labels in groups:
            for prom_var in prom_vars:
                name = PROM_PREFIX_DELIM.join((prefix, prom_var))
                make_gauge = partial(
                    Gauge,
                    name,
                    "",
                    self.REQUIRED_LABELS + extra_labels,
                    registry=self._reg,
                )
                self._reregister_var(name, make_gauge)

    def reregister_flow_vars(self, table_name, table_tags):
        """Recreate the flow gauges of one table with the given label names.

        Args:
            table_name: name appended to each flow variable name.
            table_tags: iterable of label names for the new gauges.
        """
        for prom_var in PROM_FLOW_VARS:
            name = PROM_PREFIX_DELIM.join((prom_var, table_name))
            make_gauge = partial(
                Gauge, name, "", list(table_tags), registry=self._reg
            )
            self._reregister_var(name, make_gauge)
class GaugePortStatsPrometheusPoller(GaugePortStatsPoller):
    """Poller that publishes port counters as Prometheus gauges."""

    def __init__(self, conf, logger, prom_client):
        """Initialise the poller and start the Prometheus client.

        The client is started with the port, address and test-thread
        settings read from ``self.conf``.
        """
        super().__init__(conf, logger, prom_client)
        self.prom_client.start(
            self.conf.prometheus_port,
            self.conf.prometheus_addr,
            self.conf.prometheus_test_thread,
        )

    def _format_stat_pairs(self, delim, stat):
        """Return ``self._format_stats`` applied to the port counters of ``stat``.

        Each pair is a one-element name tuple (prefix joined to the
        variable name with ``delim``) and the matching attribute of ``stat``.
        """

        def named_counters():
            # Lazily yield one (name tuple, value) pair per port variable.
            for prom_var in PROM_PORT_VARS:
                metric_name = delim.join((PROM_PORT_PREFIX, prom_var))
                yield (metric_name,), getattr(stat, prom_var)

        return self._format_stats(delim, named_counters())

    def _update(self, rcv_time, msg):
        """Set the port gauges from every entry in ``msg.body``.

        ``rcv_time`` is not used here.
        """
        for stat in msg.body:
            port_labels = self.dp.port_labels(stat.port_no)
            formatted = self._format_stat_pairs(PROM_PREFIX_DELIM, stat)
            for stat_name, stat_val in formatted:
                gauge = self.prom_client.metrics[stat_name]
                gauge.labels(**port_labels).set(stat_val)
# NOTE(review): this class handles meter stats but subclasses the port stats
# poller - confirm that base class is intended.
class GaugeMeterStatsPrometheusPoller(GaugePortStatsPoller):
    """Poller that publishes meter counters as Prometheus gauges."""

    def __init__(self, conf, logger, prom_client):
        """Initialise the poller and start the Prometheus client.

        The client is started with the port, address and test-thread
        settings read from ``self.conf``.
        """
        super().__init__(conf, logger, prom_client)
        self.prom_client.start(
            self.conf.prometheus_port,
            self.conf.prometheus_addr,
            self.conf.prometheus_test_thread,
        )

    def _format_stat_pairs(self, delim, stat):
        """Return ``self._format_stats`` applied to the counters of ``stat``.

        Band counters are read from the first entry of ``stat.band_stats``
        only.
        """
        first_band = stat.band_stats[0]
        meter_pairs = (
            (("flow", "count"), stat.flow_count),
            (("byte", "in", "count"), stat.byte_in_count),
            (("packet", "in", "count"), stat.packet_in_count),
            (("byte", "band", "count"), first_band.byte_band_count),
            (("packet", "band", "count"), first_band.packet_band_count),
        )
        return self._format_stats(delim, meter_pairs)

    def _update(self, rcv_time, msg):
        """Set the meter gauges from every entry in ``msg.body``.

        ``rcv_time`` is not used here.
        """
        for stat in msg.body:
            meter_labels = self.dp.base_prom_labels()
            meter_labels["meter_id"] = stat.meter_id
            formatted = self._format_stat_pairs(PROM_PREFIX_DELIM, stat)
            for stat_name, stat_val in formatted:
                # Formatted names lack the meter prefix; add it for the lookup.
                metric_name = PROM_PREFIX_DELIM.join(
                    (PROM_METER_PREFIX, stat_name)
                )
                gauge = self.prom_client.metrics[metric_name]
                gauge.labels(**meter_labels).set(stat_val)
class GaugePortStatePrometheusPoller(GaugePortStatePoller):
    """Publish port state changes as Prometheus gauges."""

    def _update(self, rcv_time, msg):
        """Set the port state gauges from a port status message.

        Messages for ports missing from ``self.dp.ports`` are ignored.
        ``rcv_time`` is not used here.
        """
        port_no = msg.desc.port_no
        if self.dp.ports.get(port_no, None) is None:
            return
        port_labels = self.dp.port_labels(port_no)
        for prom_var in PROM_PORT_STATE_VARS:
            # "reason" is on the message itself; the rest are on msg.desc.
            if prom_var == "reason":
                msg_value = msg.reason
            else:
                msg_value = getattr(msg.desc, prom_var)
            metric_name = PROM_PREFIX_DELIM.join((PROM_PORT_PREFIX, prom_var))
            gauge = self.prom_client.metrics[metric_name]
            gauge.labels(**port_labels).set(msg_value)

    def send_req(self):
        """Send a stats request to a datapath (not implemented here)."""
        raise NotImplementedError  # pragma: no cover

    def no_response(self):
        """Handle a polling cycle with no response (not implemented here)."""
        raise NotImplementedError  # pragma: no cover
class GaugeFlowTablePrometheusPoller(GaugeFlowTablePoller):
    """Export flow table entries to Prometheus."""

    def _update(self, rcv_time, msg):
        """Set the per-table flow gauges from a flow stats reply.

        Args:
            rcv_time: not used by this poller.
            msg: reply object; ``msg.to_jsondict()`` must provide
                ``["OFPFlowStatsReply"]["body"]``, a sequence of entries
                each holding an ``"OFPFlowStats"`` item.

        A ``ValueError`` raised while setting a gauge is logged, not
        propagated.
        """
        jsondict = msg.to_jsondict()
        for stats_reply in jsondict["OFPFlowStatsReply"]["body"]:
            stats = stats_reply["OFPFlowStats"]
            # TODO: labels based on matches will be dynamic
            # Work around this by unregistering/registering the entire variable.
            for var, tags, count in self._parse_flow_stats(stats):
                table_id = int(tags["table_id"])
                table_name = self.dp.table_by_id(table_id).name
                # Shared, mutable set of label names known for this table.
                table_tags = self.prom_client.table_tags[table_name]
                tags_keys = set(tags.keys())
                if tags_keys != table_tags:
                    unreg_tags = tags_keys - table_tags
                    if unreg_tags:
                        # New label names: grow the set and rebuild the
                        # table's gauges with the enlarged label list.
                        table_tags.update(unreg_tags)
                        self.prom_client.reregister_flow_vars(
                            table_name, table_tags
                        )
                        self.logger.info(
                            "Adding tags %s to %s for table %s",
                            unreg_tags,
                            table_tags,
                            table_name,
                        )
                    # Add blank tags for any tags not present.
                    missing_tags = table_tags - tags_keys
                    for tag in missing_tags:
                        tags[tag] = ""
                table_prom_var = PROM_PREFIX_DELIM.join((var, table_name))
                try:
                    self.prom_client.metrics[table_prom_var].labels(**tags).set(
                        count
                    )
                except ValueError:
                    self.logger.error(
                        "labels %s versus %s incorrect on %s",
                        tags,
                        table_tags,
                        table_prom_var,
                    )