"""Base configuration implementation."""
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import difflib
import ipaddress
import json
from collections import OrderedDict
[docs]
class InvalidConfigError(Exception):
"""This error is thrown when the config file is not valid."""
[docs]
def test_config_condition(cond, msg):
"""
Evaluate condition and raise InvalidConfigError if condition True.
Args:
cond (bool): Condition on which to raise an error if it is true
msg (str): Message for the error if the condition is true
"""
if cond:
raise InvalidConfigError(msg)
[docs]
class Conf:
"""Base class for FAUCET configuration."""
mutable_attrs = frozenset() # type: frozenset
defaults = {} # type: dict
defaults_types = {} # type: dict
dyn_finalized = False
dyn_hash = None
def __init__(self, _id, dp_id, conf=None):
self._id = _id
self.dp_id = dp_id
if conf is None:
conf = {}
if self.defaults is not None and self.defaults_types is not None:
diff = set(self.defaults.keys()).symmetric_difference(
set(self.defaults_types.keys())
)
assert not diff, diff
if isinstance(conf, dict):
self.update(conf)
self.set_defaults()
self.check_config()
self.orig_conf = {k: self.__dict__[k] for k in self.defaults}
for k, conf_v in self.orig_conf.items():
if isinstance(conf_v, Conf):
self.orig_conf[k] = conf_v.orig_conf
def __setattr__(self, name, value):
if (
not self.dyn_finalized
or name.startswith("dyn")
or name in self.mutable_attrs
):
super().__setattr__(name, value)
else:
raise ValueError("cannot update %s on finalized Conf object" % name)
def _set_default(self, key, value, conf=None):
if conf is None:
conf = self.__dict__
assert key in conf, key
if conf[key] is None:
conf[key] = value
def _set_conf_defaults(self, defaults, conf):
for key, value in defaults.items():
self._set_default(key, value, conf=conf)
[docs]
def set_defaults(self):
"""Set default values and run any basic sanity checks."""
self._set_conf_defaults(self.defaults, self.__dict__)
def _check_unknown_conf(self, conf):
"""Check that supplied conf dict doesn't specify keys not defined."""
sub_conf_names = set(conf.keys())
unknown_conf_names = sub_conf_names - set(self.defaults.keys())
test_config_condition(
unknown_conf_names,
"%s fields unknown in %s" % (unknown_conf_names, self._id),
)
def _check_conf_types(self, conf, conf_types):
"""Check that conf value is of the correct type."""
test_config_condition(
not isinstance(conf, dict),
(
"Conf object %s contents %s must be type %s not %s"
% (self._id, conf, dict, type(conf))
),
)
for conf_key, conf_value in conf.items():
test_config_condition(
conf_key not in conf_types,
"%s field unknown in %s (known types %s)"
% (conf_key, self._id, conf_types),
)
if conf_value is not None:
conf_type = conf_types[conf_key]
test_config_condition(
not isinstance(conf_value, conf_type),
"%s value %s must be %s not %s"
% (conf_key, conf_value, conf_type, type(conf_value)),
) # pytype: disable=invalid-typevar
@staticmethod
def _set_unknown_conf(conf, conf_types):
for conf_key, conf_type in conf_types.items():
if conf_key not in conf:
if conf_type == list:
conf[conf_key] = []
else:
conf[conf_key] = None
return conf
[docs]
def update(self, conf):
"""Parse supplied YAML config and sanity check."""
self.__dict__.update(conf)
self._check_unknown_conf(conf)
self._check_conf_types(conf, self.defaults_types)
[docs]
def check_config(self):
"""Check config at instantiation time for errors, typically via assert."""
return
def _conf_keys(self, conf, subconf=True, ignore_keys=None):
"""Return a list of key/values of attributes with dyn/Conf attributes/filtered."""
conf_keys = []
for key, value in sorted(
(
(key, value)
for key, value in conf.orig_conf.items()
if key in self.defaults
)
):
if ignore_keys and key in ignore_keys:
continue
if not subconf and value:
if isinstance(value, Conf):
continue
if isinstance(value, (tuple, list, set)) and isinstance(value[0], Conf):
continue
conf_keys.append((key, self._str_conf(value)))
return conf_keys
@staticmethod
def _conf_dyn_keys(conf):
return [
(key, value)
for key, value in conf.__dict__.items()
if key.startswith("dyn")
]
[docs]
def merge_dyn(self, other_conf):
"""Merge dynamic state from other conf object."""
self.__dict__.update(self._conf_dyn_keys(other_conf))
def _str_conf(self, conf_v):
if isinstance(conf_v, (bool, str, int)):
return conf_v
if isinstance(
conf_v,
(
ipaddress.IPv4Address,
ipaddress.IPv4Interface,
ipaddress.IPv4Network,
ipaddress.IPv6Address,
ipaddress.IPv6Interface,
ipaddress.IPv6Network,
),
):
return str(conf_v)
if isinstance(conf_v, (dict, OrderedDict)):
return {
str(i): self._str_conf(j) for i, j in conf_v.items() if j is not None
}
if isinstance(conf_v, (list, tuple, frozenset)):
return tuple(self._str_conf(i) for i in conf_v if i is not None)
if isinstance(conf_v, Conf):
for i in ("name", "_id"):
if hasattr(conf_v, i):
return getattr(conf_v, i)
return None
[docs]
def to_conf(self):
"""Return configuration as a dict."""
conf = {k: self.orig_conf[str(k)] for k in self.defaults if k != "name"}
return json.dumps(
self._str_conf(conf), sort_keys=True, indent=4, separators=(",", ": ")
)
[docs]
def conf_diff(self, other):
"""Return text diff between two Confs."""
differ = difflib.Differ()
return "\n".join(
differ.compare(self.to_conf().splitlines(), other.to_conf().splitlines())
)
[docs]
def conf_hash(self, subconf=True, ignore_keys=None):
"""Return hash of keys configurably filtering attributes."""
return hash(
frozenset(
list(
map(
str,
self._conf_keys(self, subconf=subconf, ignore_keys=ignore_keys),
)
)
)
)
def __hash__(self):
if self.dyn_hash is not None:
return self.dyn_hash
dyn_hash = self.conf_hash(subconf=True)
if self.dyn_finalized:
self.dyn_hash = dyn_hash
return dyn_hash
def _finalize_val(self, val):
if isinstance(val, list):
return tuple(self._finalize_val(v) for v in val)
if isinstance(val, set):
return frozenset([self._finalize_val(v) for v in val])
if isinstance(val, dict):
return OrderedDict(
[(k, self._finalize_val(v)) for k, v in sorted(val.items(), key=str)]
)
return val
[docs]
def finalize(self):
"""Configuration parsing marked complete."""
self.__dict__.update(
{
k: self._finalize_val(v)
for k, v in self.__dict__.items()
if not k.startswith("dyn")
}
)
self.dyn_finalized = True
[docs]
def ignore_subconf(self, other, ignore_keys=None):
"""Return True if this config same as other, ignoring sub config."""
return self.conf_hash(
subconf=False, ignore_keys=ignore_keys
) == other.conf_hash(subconf=False, ignore_keys=ignore_keys)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
def __ne__(self, other):
return not self.__eq__(other)
@staticmethod
def _check_ip_str(ip_str, ip_method=ipaddress.ip_address):
try:
# bool type is deprecated by the library ipaddress
if not isinstance(ip_str, bool):
return ip_method(ip_str)
raise InvalidConfigError(
"Invalid IP address %s: IP address of type bool" % (ip_str)
)
except (ValueError, AttributeError, TypeError) as err:
raise InvalidConfigError(
"Invalid IP address %s: %s" % (ip_str, err)
) from err
@staticmethod
def _ipvs(ipas):
return frozenset([ipa.version for ipa in ipas])
@staticmethod
def _by_ipv(ipas, ipv):
return frozenset([ipa for ipa in ipas if ipa.version == ipv])