"""Port configuration."""
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import netaddr
from faucet.conf import Conf, test_config_condition
from faucet import valve_of
# Forced port DOWN
STACK_STATE_ADMIN_DOWN = 0
# New instance, initializing stack port
STACK_STATE_INIT = 1
# Incorrect stack configuration
STACK_STATE_BAD = 2
# Stack port up and running, no timeouts, no incorrect cabling
STACK_STATE_UP = 3
# Stack timed out, too many packets lost
STACK_STATE_GONE = 4
# Not stack port configured
STACK_STATE_NONE = -1
STACK_DISPLAY_DICT = {
STACK_STATE_ADMIN_DOWN: "ADMIN_DOWN",
STACK_STATE_INIT: "INITIALIZING",
STACK_STATE_BAD: "BAD",
STACK_STATE_UP: "UP",
STACK_STATE_GONE: "GONE",
STACK_STATE_NONE: "NONE",
}
# LACP not configured
LACP_ACTOR_NOTCONFIGURED = -1
# Not receiving packets from the actor & port is down
LACP_ACTOR_NONE = 0
# Not receiving packets from the actor & port is up
LACP_ACTOR_INIT = 1
# Receiving LACP packets with sync bit set
LACP_ACTOR_UP = 3
# LACP actor is not sending LACP with sync bit set
LACP_ACTOR_NOSYNC = 5
LACP_ACTOR_DISPLAY_DICT = {
LACP_ACTOR_NOTCONFIGURED: "NOT_CONFIGURED",
LACP_ACTOR_NONE: "NONE",
LACP_ACTOR_INIT: "INITIALIZING",
LACP_ACTOR_UP: "UP",
LACP_ACTOR_NOSYNC: "NO_SYNC",
}
# LACP is not configured
LACP_PORT_NOTCONFIGURED = -1
# Port is not a LACP port on the nominated DP
LACP_PORT_UNSELECTED = 1
# Port is a LACP port on the nominated DP, will send/receive
LACP_PORT_SELECTED = 2
# Port is a LACP port that is in standby
LACP_PORT_STANDBY = 3
LACP_PORT_DISPLAY_DICT = {
LACP_PORT_NOTCONFIGURED: "NOT_CONFIGURED",
LACP_PORT_UNSELECTED: "UNSELECTED",
LACP_PORT_SELECTED: "SELECTED",
LACP_PORT_STANDBY: "STANDBY",
}
[docs]
class Port(Conf):
"""Stores state for ports, including the configuration."""
defaults = {
"number": None,
"name": None,
"description": None,
"enabled": True,
"permanent_learn": False,
# if True, a host once learned on this port cannot be learned on another port.
"unicast_flood": True,
# if True, do classical unicast flooding on this port (False floods ND/ARP/bcast only).
"mirror": None,
# If set, mirror packets from that port to this one.
"native_vlan": None,
# Set untagged VLAN on this port.
"tagged_vlans": None,
# Set tagged VLANs on this port.
"acl_in": None,
"acls_in": None,
# ACL for input on this port.
"stack": None,
# Configure a stack peer on this port.
"max_hosts": 255,
# maximum number of hosts
"hairpin": False,
# if True, then switch unicast and flood between hosts on this port (eg WiFi radio).
"hairpin_unicast": False,
# if True, then switch unicast between hosts on this port (eg WiFi radio).
"lacp": 0,
# if non 0 (LAG ID), experimental LACP support enabled on this port.
"lacp_active": False,
# experimental active LACP
"lacp_collect_and_distribute": False,
# if true, forces LACP port to collect and distribute when syncing with the peer.
"lacp_unselected": False,
# if true, forces LACP port to be in unselected state
"lacp_selected": False,
# if true, forces LACP port to be in the selected state
"lacp_standby": False,
# if true, forces LACP port to be in the standby state
"lacp_passthrough": None,
# If set, fail the lacp on this port if any of the peer ports are down.
"lacp_resp_interval": 1,
# Min time since last LACP response. Used to control rate of response for LACP
"lacp_port_priority": 255,
# Sets port priority value sent out in lacp packet
"lacp_port_id": -1,
# Sets port id value sent out in lacp packet
"loop_protect": False,
# if True, do simple (host/access port) loop protection on this port.
"loop_protect_external": False,
# if True, do external (other switch) loop protection on this port.
"output_only": False,
# if True, all packets input from this port are dropped.
"lldp_beacon": {},
# LLDP beacon configuration for this port.
"opstatus_reconf": True,
# If True, configure pipeline if operational status of port changes.
"receive_lldp": False,
# If True, receive LLDP on this port.
"lldp_peer_mac": None,
# If set, validates src MAC address of incoming LLDP packets
"max_lldp_lost": 3,
# threshold before marking a stack port as down
"dot1x": False,
# If true, block this port until a successful 802.1x auth
"dot1x_acl": False,
# If true, expects authentication and default ACLs for 802.1x auth
"dot1x_mab": False,
# If true, allows Mac Auth Bypass on port (NOTE: this is less secure as MACs can be spoofed)
"dot1x_dyn_acl": False,
# If true, expects authentication and ACLs with dot1x_assigned flag set
"restricted_bcast_arpnd": False,
# If true, this port cannot send non-ARP/IPv6 ND broadcasts to other restricted_bcast_arpnd ports.
"coprocessor": {},
# If defined, this port is attached to a packet coprocessor.
"count_untag_vlan_miss": False,
# If defined, this port will explicitly count unconfigured native VLAN packets.
}
defaults_types = {
"number": int,
"name": str,
"description": str,
"enabled": bool,
"permanent_learn": bool,
"unicast_flood": bool,
"mirror": (list, str, int),
"native_vlan": (str, int),
"tagged_vlans": list,
"acl_in": (str, int),
"acls_in": list,
"stack": dict,
"max_hosts": int,
"hairpin": bool,
"hairpin_unicast": bool,
"lacp": int,
"lacp_active": bool,
"lacp_collect_and_distribute": bool,
"lacp_unselected": bool,
"lacp_selected": bool,
"lacp_standby": bool,
"lacp_passthrough": list,
"lacp_resp_interval": int,
"lacp_port_priority": int,
"lacp_port_id": int,
"loop_protect": bool,
"loop_protect_external": bool,
"output_only": bool,
"lldp_beacon": dict,
"opstatus_reconf": bool,
"receive_lldp": bool,
"lldp_peer_mac": str,
"dot1x": bool,
"dot1x_acl": bool,
"dot1x_mab": bool,
"dot1x_dyn_acl": bool,
"max_lldp_lost": int,
"restricted_bcast_arpnd": bool,
"coprocessor": dict,
"count_untag_vlan_miss": bool,
}
stack_defaults_types = {
"dp": str,
"port": (str, int),
}
lldp_beacon_defaults_types = {
"enable": bool,
"org_tlvs": list,
"system_name": str,
"port_descr": str,
}
lldp_org_tlv_defaults_types = {
"oui": (int, bytearray),
"subtype": (int, bytearray),
"info": (str, bytearray),
}
coprocessor_defaults_types = {
"strategy": str,
"vlan_vid_base": int,
}
def __init__(self, _id, dp_id, conf=None):
self.acl_in = None
self.acls_in = None
self.description = None
self.dot1x = None
self.dot1x_acl = None
self.dot1x_mab = None
self.dot1x_dyn_acl = None
self.dp_id = None
self.enabled = None
self.hairpin = None
self.hairpin_unicast = None
self.lacp = None
self.lacp_active = None
self.lacp_collect_and_distribute = None
self.lacp_unselected = None
self.lacp_selected = None
self.lacp_standby = None
self.lacp_passthrough = None
self.lacp_resp_interval = None
self.lacp_port_priority = None
self.lacp_port_id = None
self.loop_protect = None
self.loop_protect_external = None
self.max_hosts = None
self.max_lldp_lost = None
self.mirror = None
self.name = None
self.native_vlan = None
self.number = None
self.opstatus_reconf = None
self.output_only = None
self.permanent_learn = None
self.receive_lldp = None
self.lldp_peer_mac = None
self.stack = {}
self.unicast_flood = None
self.restricted_bcast_arpnd = None
self.coprocessor = {}
self.count_untag_vlan_miss = None
self.dyn_dot1x_native_vlan = None
self.dyn_lacp_up = None
self.dyn_lacp_updated_time = None
self.dyn_lacp_last_resp_time = None
self.dyn_last_ban_time = None
self.dyn_last_lacp_pkt = None
self.dyn_last_lldp_beacon_time = None
self.dyn_lldp_beacon_recv_state = None
self.dyn_lldp_beacon_recv_time = None
self.dyn_learn_ban_count = 0
self.dyn_phys_up = False
self.dyn_update_time = None
self.dyn_stack_current_state = STACK_STATE_NONE
self.dyn_lacp_port_selected = LACP_PORT_NOTCONFIGURED
self.dyn_lacp_actor_state = LACP_ACTOR_NOTCONFIGURED
self.dyn_stack_probe_info = {}
self.tagged_vlans = []
self.lldp_beacon = {}
super().__init__(_id, dp_id, conf)
# If the port is mirrored convert single attributes to a array
if self.mirror and not isinstance(self.mirror, list):
self.mirror = [self.mirror]
def __str__(self):
return "Port %u" % self.number
def __repr__(self):
return self.__str__()
[docs]
def clone_dyn_state(self, prev_port):
if prev_port:
self.dyn_lldp_beacon_recv_time = prev_port.dyn_lldp_beacon_recv_time
self.dyn_lldp_beacon_recv_state = prev_port.dyn_lldp_beacon_recv_state
self.dyn_stack_current_state = prev_port.dyn_stack_current_state
self.dyn_last_lldp_beacon_time = prev_port.dyn_last_lldp_beacon_time
self.dyn_phys_up = prev_port.dyn_phys_up
self.dyn_stack_probe_info = prev_port.dyn_stack_probe_info
self.dyn_update_time = prev_port.dyn_update_time
[docs]
def stack_descr(self):
"""Return stacking annotation if this is a stacking port."""
if self.stack:
return "remote DP %s %s" % (self.stack["dp"].name, self.stack["port"])
return ""
[docs]
def set_defaults(self):
super().set_defaults()
self._set_default("number", self._id)
self._set_default("name", str(self._id))
self._set_default("description", self.name)
self._set_default("tagged_vlans", [])
[docs]
def check_config(self):
super().check_config()
test_config_condition(
not (
isinstance(self.number, int)
and self.number > 0
and (not valve_of.ignore_port(self.number))
),
("Port number invalid: %s" % self.number),
)
non_vlan_options = {"stack", "mirror", "coprocessor", "output_only"}
vlan_agnostic_options = {
"enabled",
"number",
"name",
"description",
"max_lldp_lost",
}
vlan_port = self.tagged_vlans or self.native_vlan
non_vlan_port_options = {
option for option in non_vlan_options if getattr(self, option)
}
test_config_condition(
vlan_port and non_vlan_port_options,
"cannot have VLANs configured on non-VLAN ports: %s" % self,
)
if self.output_only:
test_config_condition(
not non_vlan_port_options.issubset({"mirror", "output_only"}),
"output_only can only coexist with mirror option on same port %s"
% self,
)
elif self.mirror:
test_config_condition(
not non_vlan_port_options.issubset({"mirror", "coprocessor"}),
"coprocessor can only coexist with mirror option on same port %s"
% self,
)
else:
test_config_condition(
len(non_vlan_port_options) > 1,
"cannot have multiple non-VLAN port options %s on same port: %s"
% (non_vlan_port_options, self),
)
if non_vlan_port_options:
for key, default_val in self.defaults.items():
if key in vlan_agnostic_options or key in non_vlan_port_options:
continue
if key.startswith("acl") and (self.stack or self.coprocessor):
continue
val = getattr(self, key)
test_config_condition(
val != default_val and val,
"Cannot have VLAN option %s: %s on non-VLAN port %s"
% (key, val, self),
)
test_config_condition(
self.hairpin and self.hairpin_unicast,
"Cannot have both hairpin and hairpin_unicast enabled",
)
dot1x_features = {
dot1x_feature
for dot1x_feature, dot1x_config in self.defaults.items()
if dot1x_feature.startswith("dot1x_") and dot1x_config
}
test_config_condition(
dot1x_features and not self.dot1x,
"802.1x features %s require port to have dot1x enabled" % dot1x_features,
)
if self.dot1x:
test_config_condition(
self.number > 65535, ("802.1x not supported on ports > 65535")
)
if self.dot1x_mab:
test_config_condition(
self.dot1x_dyn_acl,
("802.1x_MAB cannot be used with 802.1x_DYN_ACL"),
)
if self.dot1x_dyn_acl:
test_config_condition(
self.dot1x_acl, ("802.1x_DYN_ACL cannot be used with 802.1x_ACL")
)
if self.coprocessor:
self._check_conf_types(self.coprocessor, self.coprocessor_defaults_types)
test_config_condition(
self.coprocessor.get("strategy", None) != "vlan_vid",
"coprocessor only supports vlan_vid strategy",
)
self.coprocessor["vlan_vid_base"] = self.coprocessor.get(
"vlan_vid_base", 1000
)
if self.stack:
self._check_conf_types(self.stack, self.stack_defaults_types)
for stack_config in list(self.stack_defaults_types.keys()):
test_config_condition(
stack_config not in self.stack,
("stack %s must be defined" % stack_config),
)
# LLDP always enabled for stack ports.
self.receive_lldp = True
if not self.lldp_beacon_enabled():
self.lldp_beacon.update({"enable": True})
if self.lacp_resp_interval is not None:
test_config_condition(
self.lacp_resp_interval > 65535 or self.lacp_resp_interval < 0.3,
("interval must be at least 0.3 and less than 65536"),
)
if self.lacp_port_priority is not None:
test_config_condition(
self.lacp_port_priority > 255 or self.lacp_port_priority < 0,
("lacp port priority must be at least 0 and less than 256"),
)
if self.lldp_peer_mac:
test_config_condition(
not netaddr.valid_mac(self.lldp_peer_mac),
("invalid MAC address %s" % self.lldp_peer_mac),
)
if self.lldp_beacon:
self._check_conf_types(self.lldp_beacon, self.lldp_beacon_defaults_types)
self.lldp_beacon = self._set_unknown_conf(
self.lldp_beacon, self.lldp_beacon_defaults_types
)
if self.lldp_beacon_enabled():
if self.lldp_beacon["port_descr"] is None:
self.lldp_beacon["port_descr"] = self.description
org_tlvs = []
for org_tlv in self.lldp_beacon["org_tlvs"]:
self._check_conf_types(org_tlv, self.lldp_org_tlv_defaults_types)
test_config_condition(
len(org_tlv) != len(self.lldp_org_tlv_defaults_types),
("missing org_tlv config"),
)
if not isinstance(org_tlv["info"], bytearray):
try:
org_tlv["info"] = bytearray.fromhex(
org_tlv["info"]
) # pytype: disable=missing-parameter
except ValueError:
org_tlv["info"] = org_tlv["info"].encode("utf-8")
if not isinstance(org_tlv["oui"], bytearray):
org_tlv["oui"] = bytearray.fromhex(
"%6.6x" % org_tlv["oui"]
) # pytype: disable=missing-parameter
org_tlvs.append(org_tlv)
self.lldp_beacon["org_tlvs"] = org_tlvs
test_config_condition(
self.acl_in and self.acls_in,
"Found both acl_in and acls_in, use only acls_in",
)
if self.acl_in and not isinstance(self.acl_in, list):
self.acls_in = [self.acl_in]
self.acl_in = None
if self.acls_in:
for acl in self.acls_in:
test_config_condition(
not isinstance(acl, (int, str)), "ACL names must be int or str"
)
lacp_options = [self.lacp_selected, self.lacp_unselected, self.lacp_standby]
test_config_condition(
lacp_options.count(True) > 1,
"Cannot force multiple LACP port selection states",
)
[docs]
def finalize(self):
if self.native_vlan:
test_config_condition(
self.native_vlan in self.tagged_vlans,
("cannot have same native and tagged VLAN on same port"),
)
self.tagged_vlans = tuple(self.tagged_vlans)
super().finalize()
[docs]
def running(self):
"""Return True if port enabled and up."""
return self.enabled and self.dyn_phys_up
[docs]
def vlans(self):
"""Return all VLANs this port is in."""
if self.native_vlan is not None and self.dyn_dot1x_native_vlan is not None:
return (
(self.native_vlan,)
+ (self.dyn_dot1x_native_vlan,)
+ tuple(self.tagged_vlans)
)
if self.dyn_dot1x_native_vlan is not None:
return (self.dyn_dot1x_native_vlan,) + tuple(self.tagged_vlans)
if self.native_vlan is not None:
return (self.native_vlan,) + tuple(self.tagged_vlans)
return tuple(self.tagged_vlans)
[docs]
def hosts(self, vlans=None):
"""Return all host cache entries this port has learned (on all or specified VLANs)."""
if vlans is None:
vlans = self.vlans()
hosts = []
for vlan in vlans:
hosts.extend(list(list(vlan.cached_hosts_on_port(self))))
return hosts
[docs]
def hosts_count(self, vlans=None):
"""Return count of all hosts this port has learned (on all or specified VLANs)."""
if vlans is None:
vlans = self.vlans()
hosts_count = 0
for vlan in vlans:
hosts_count += vlan.cached_hosts_count_on_port(self)
return hosts_count
[docs]
def lldp_beacon_enabled(self):
"""Return True if LLDP beacon enabled on this port."""
return self.lldp_beacon and self.lldp_beacon.get("enable", False)
[docs]
def mirror_actions(self):
"""Return OF actions to mirror this port."""
if self.mirror is not None:
return [valve_of.output_port(mirror_port) for mirror_port in self.mirror]
return []
[docs]
def non_stack_forwarding(self):
"""Returns True if port is not-stacking and, and able to forward packets."""
if self.stack:
return False
if not self.dyn_phys_up:
return False
if self.lacp and not self.dyn_lacp_up:
return False
return True
[docs]
def tunnel_acls(self):
"""Return any tunnel ACLs on this port."""
acls = []
if self.acls_in:
acls = [acl for acl in self.acls_in if acl.is_tunnel_acl()]
return acls
[docs]
def contains_tunnel_acl(self, tunnel_id=None):
"""Searches through acls_in for a tunnel ACL with a matching tunnel_id"""
acls = self.tunnel_acls()
if tunnel_id is None:
return bool(acls)
for acl in self.tunnel_acls():
if acl.get_tunnel_rules(tunnel_id):
return True
return False
# LACP functions
[docs]
def lacp_actor_update(self, lacp_up, now=None, lacp_pkt=None, cold_start=False):
"""
Update the LACP actor state
Args:
lacp_up (bool): The intended LACP/port state
now (float): Current time
lacp_pkt (PacketMeta): Received LACP packet
cold_start (bool): Whether the port is being cold started
Returns:
current LACP actor state
"""
self.dyn_lacp_up = 1 if lacp_up else 0
self.dyn_lacp_updated_time = now
self.dyn_last_lacp_pkt = lacp_pkt
if cold_start:
# Cold starting, so revert to unconfigured LACP state
self.actor_notconfigured()
elif not self.running():
# Phys not up so we do not initialize actor states
self.actor_none()
elif not lacp_pkt:
# Intialize states
self.actor_init()
else:
# Packets received from actor
if lacp_up:
# Receiving packets & LACP is UP
self.actor_up()
else:
# Receiving packets but LACP sync bit is not set
self.actor_nosync()
return self.actor_state()
[docs]
def lacp_port_update(self, selected, cold_start=False):
"""
Updates the LACP port selection state
Args:
selected (bool): Whether the port's DPID is the selected one
cold_start (bool): Whether the port is being cold started
Returns
current lacp port state
"""
if cold_start:
# Cold starting, so revert to unconfigured state
self.deconfigure_port()
elif self.lacp_selected:
# Configured LACP port option to be forced into SELECTED state
self.select_port()
elif self.lacp_unselected:
# Configured LACP port option to be forced into UNSELECTED state
self.deselect_port()
else:
if selected:
# Port SELECTED so change state to SELECTED
self.select_port()
elif self.lacp_standby:
# Send port to STANDBY if not SELECTED
self.standby_port()
else:
# Doesn't belong on chosen DP for LAG, DESELECT port
self.deselect_port()
return self.lacp_port_state()
[docs]
def get_lacp_flags(self):
"""
Get the LACP flags for the state the port is in
Return sync, collecting, distributing flag values
"""
if self.lacp_collect_and_distribute:
return 1, 1, 1
if self.is_port_standby():
return 1, 0, 0
if self.is_port_selected():
return 1, 1, 1
return 0, 0, 0
# LACP ACTOR STATES:
[docs]
def is_actor_up(self):
"""Return true if the LACP actor state is UP"""
return self.dyn_lacp_actor_state == LACP_ACTOR_UP
[docs]
def is_actor_nosync(self):
"""Return true if the LACP actor state is NOSYNC"""
return self.dyn_lacp_actor_state == LACP_ACTOR_NOSYNC
[docs]
def is_actor_init(self):
"""Return true if the LACP actor state is INIT"""
return self.dyn_lacp_actor_state == LACP_ACTOR_INIT
[docs]
def is_actor_none(self):
"""Return true if the LACP actor state is NONE"""
return self.dyn_lacp_actor_state == LACP_ACTOR_NONE
[docs]
def actor_state(self):
"""Return the current LACP actor state"""
return self.dyn_lacp_actor_state
[docs]
def actor_none(self):
"""Set the LACP actor state to NONE"""
self.dyn_lacp_actor_state = LACP_ACTOR_NONE
[docs]
def actor_init(self):
"""Set the LACP actor state to INIT"""
self.dyn_lacp_actor_state = LACP_ACTOR_INIT
[docs]
def actor_up(self):
"""Set the LACP actor state to UP"""
self.dyn_lacp_actor_state = LACP_ACTOR_UP
[docs]
def actor_nosync(self):
"""Set the LACP actor state to NOSYNC"""
self.dyn_lacp_actor_state = LACP_ACTOR_NOSYNC
[docs]
@staticmethod
def actor_state_name(state):
"""Return the string of the actor state"""
return LACP_ACTOR_DISPLAY_DICT[state]
# LACP PORT ROLES:
[docs]
def is_port_selected(self):
"""Return true if the lacp is a SELECTED port"""
return self.dyn_lacp_port_selected == LACP_PORT_SELECTED
[docs]
def is_port_unselected(self):
"""Return true if the lacp is an UNSELECTED port"""
return self.dyn_lacp_port_selected == LACP_PORT_UNSELECTED
[docs]
def is_port_standby(self):
"""Return true if the lacp is a port in STANDBY"""
return self.dyn_lacp_port_selected == LACP_PORT_STANDBY
[docs]
def lacp_port_state(self):
"""Return the current LACP port state"""
return self.dyn_lacp_port_selected
[docs]
def select_port(self):
"""SELECT the current LACP port"""
self.dyn_lacp_port_selected = LACP_PORT_SELECTED
[docs]
def deselect_port(self):
"""UNSELECT the current LACP port"""
self.dyn_lacp_port_selected = LACP_PORT_UNSELECTED
[docs]
def standby_port(self):
"""Set LACP port state to STANDBY"""
self.dyn_lacp_port_selected = LACP_PORT_STANDBY
[docs]
@staticmethod
def port_role_name(state):
"""Return the LACP port role state name"""
return LACP_PORT_DISPLAY_DICT[state]
# STACK PORT ROLES:
[docs]
def stack_port_update(self, now):
"""
Progesses through the stack link state machine
Args:
now (float): Current time
Returns:
int: Current (new) stack port state
string: reason for the state change and additional information
"""
reason = ""
if self.is_stack_admin_down():
# Stack port ADMIN_DOWN, so no next state
return self.stack_state()
last_seen_lldp_time = self.dyn_stack_probe_info.get("last_seen_lldp_time", None)
if last_seen_lldp_time is None:
if self.is_stack_none():
# New stack, changing to state INIT
self.stack_init()
reason = "new"
else:
# Not a new stack port, so progess through state machine
peer_dp = self.stack["dp"] # pytype: disable=attribute-error
stack_correct = self.dyn_stack_probe_info.get("stack_correct", None)
send_interval = peer_dp.lldp_beacon.get(
"send_interval", peer_dp.DEFAULT_LLDP_SEND_INTERVAL
)
time_since_lldp_seen = None
num_lost_lldp = None
stack_timed_out = True
time_since_lldp_seen = now - last_seen_lldp_time
num_lost_lldp = time_since_lldp_seen / send_interval
if num_lost_lldp < self.max_lldp_lost:
stack_timed_out = False
if stack_timed_out:
# Stack timed out, too many packets lost
self.stack_gone()
reason = "too many (%u) packets lost, last received %us ago" % (
num_lost_lldp,
time_since_lldp_seen,
)
elif not stack_correct:
# Stack bad due to incorrect cabling
self.stack_bad()
reason = "incorrect cabling"
elif not self.is_stack_up():
# Nothing gone wrong, so Stack UP
self.stack_up()
reason = "up"
return self.stack_state(), reason
[docs]
def is_stack_admin_down(self):
"""Return True if port is in ADMIN_DOWN state."""
return self.dyn_stack_current_state == STACK_STATE_ADMIN_DOWN
[docs]
def is_stack_none(self):
"""Return True if port is in NONE state."""
return self.dyn_stack_current_state == STACK_STATE_NONE
[docs]
def is_stack_init(self):
"""Return True if port is in INIT state."""
return self.dyn_stack_current_state == STACK_STATE_INIT
[docs]
def is_stack_bad(self):
"""Return True if port is in BAD state."""
return self.dyn_stack_current_state == STACK_STATE_BAD
[docs]
def is_stack_up(self):
"""Return True if port is in UP state."""
return self.dyn_stack_current_state == STACK_STATE_UP
[docs]
def is_stack_gone(self):
"""Return True if port is in GONE state."""
return self.dyn_stack_current_state == STACK_STATE_GONE
[docs]
def stack_state(self):
"""Return the current port stack state"""
return self.dyn_stack_current_state
[docs]
def stack_admin_down(self):
"""Change the current stack state to ADMIN_DOWN."""
self.dyn_stack_current_state = STACK_STATE_ADMIN_DOWN
[docs]
def stack_init(self):
"""Change the current stack state to INIT_DOWN."""
self.dyn_stack_current_state = STACK_STATE_INIT
[docs]
def stack_bad(self):
"""Change the current stack state to BAD."""
self.dyn_stack_current_state = STACK_STATE_BAD
[docs]
def stack_up(self):
"""Change the current stack state to UP."""
self.dyn_stack_current_state = STACK_STATE_UP
[docs]
def stack_gone(self):
"""Change the current stack state to GONE."""
self.dyn_stack_current_state = STACK_STATE_GONE
[docs]
@staticmethod
def stack_state_name(state):
"""Return stack state name"""
return STACK_DISPLAY_DICT[state]