Source code for faucet.valve_stack

"""Manage higher level stack functions"""

from collections import defaultdict

from faucet.valve_manager_base import ValveManagerBase


[docs] class ValveStackManager(ValveManagerBase): """Implement stack manager, this handles the more higher-order stack functions. This includes port nominations and flood directionality.""" def __init__( self, logger, dp, stack, tunnel_acls, acl_manager, output_table, **_kwargs ): """ Initialize variables and set up peer distances Args: stack (Stack): Stack object of the DP on the Valve being managed """ # Logger for logging self.logger = logger # DP instance for stack healthyness self.dp = dp # pylint: disable=invalid-name # Stack instance self.stack = stack # Used the manage the tunnel ACLs which requires stack knowledge self.tunnel_acls = tunnel_acls self.acl_manager = acl_manager self.output_table = output_table # Ports that are the shortest distance to the root self.towards_root_ports = None # Ports on an adjacent DP that is the chosen shortest path to the root self.chosen_towards_ports = None # Single port on the adjacent shortest path DP self.chosen_towards_port = None # All ports that are not the shortest distance to the root self.away_ports = None # Ports whose peer DPs have a shorter path to root self.inactive_away_ports = None # Redundant ports for each adjacent DP self.pruned_away_ports = None self.reset_peer_distances()
[docs] @staticmethod def stacked_valves(valves): """Return set of valves that have stacking enabled""" return { valve for valve in valves if valve.dp.stack and valve.dp.stack.root_name }
[docs] def reset_peer_distances(self): """Recalculates the towards and away ports for this node""" self.towards_root_ports = set() self.chosen_towards_ports = set() self.chosen_towards_port = None self.away_ports = set() self.inactive_away_ports = set() self.pruned_away_ports = set() all_peer_ports = set(self.stack.canonical_up_ports()) if self.stack.is_root(): self.away_ports = all_peer_ports else: port_peer_distances = { port: len(port.stack["dp"].stack.shortest_path_to_root()) for port in all_peer_ports } shortest_peer_distance = None for port, port_peer_distance in port_peer_distances.items(): if shortest_peer_distance is None: shortest_peer_distance = port_peer_distance continue shortest_peer_distance = min(shortest_peer_distance, port_peer_distance) self.towards_root_ports = { port for port, port_peer_distance in port_peer_distances.items() if port_peer_distance == shortest_peer_distance } self.away_ports = all_peer_ports - self.towards_root_ports if self.towards_root_ports: # Generate a shortest path to calculate the chosen connection to root shortest_path = self.stack.shortest_path_to_root() # Choose the port that is connected to peer DP if shortest_path and len(shortest_path) > 1: first_peer_dp = shortest_path[1] else: first_peer_port = self.stack.canonical_port_order( self.towards_root_ports )[0] first_peer_dp = first_peer_port.stack["dp"].name # The chosen towards ports are the ports through the chosen peer DP self.chosen_towards_ports = { port for port in self.towards_root_ports if port.stack["dp"].name == first_peer_dp } # pytype: disable=attribute-error if self.chosen_towards_ports: self.chosen_towards_port = self.stack.canonical_up_ports( self.chosen_towards_ports )[0] # Away ports are all the remaining (non-towards) ports self.away_ports = all_peer_ports - self.towards_root_ports if self.away_ports: # Get inactive away ports, ports whose peers have a better path to root self.inactive_away_ports = { port for port in self.away_ports if not self.stack.is_in_path( port.stack["dp"].name, self.stack.root_name ) } # Get pruned away ports, redundant ports for each adjacent DP ports_by_dp = defaultdict(list) for port in self.away_ports: ports_by_dp[port.stack["dp"]].append(port) for ports in ports_by_dp.values(): remote_away_ports = self.stack.canonical_up_ports( [port.stack["port"] for port in ports] ) self.pruned_away_ports.update( [ port.stack["port"] for port in remote_away_ports if port != remote_away_ports[0] ] ) return self.chosen_towards_ports
[docs] def update_stack_topo(self, event, dp, port): """ Update the stack topo according to the event. Args: event (bool): True if the port is UP dp (DP): DP object port (Port): The port being brought UP/DOWN """ self.stack.modify_link(dp, port, event) towards_ports = self.reset_peer_distances() if towards_ports: self.logger.info("shortest path to root is via %s" % towards_ports) else: self.logger.info("no path available to root")
[docs] def default_port_towards(self, dp_name): """ Default shortest path towards the provided destination, via direct shortest path Args: dp_name (str): Destination DP Returns: Port: port from current node that is shortest directly towards destination """ return self.stack.shortest_path_port(dp_name)
[docs] def relative_port_towards(self, dp_name): """ Returns the shortest path towards provided destination, via either the root or away paths Args: dp_name (str): Destination DP Returns: Port: port from current node that is towards/away the destination DP depending on relative position of the current node """ if not self.stack.shortest_path_to_root(): # No known path from current node to root, use default return self.default_port_towards(dp_name) if self.stack.name == dp_name: # Current node is the destination node, use default return self.default_port_towards(dp_name) path_to_root = self.stack.shortest_path_to_root(dp_name) if path_to_root and self.stack.name in path_to_root: # Current node is a transit node between root & destination, direct path to destination away_dp = path_to_root[path_to_root.index(self.stack.name) - 1] for port in self.away_ports: if port.stack["dp"].name == away_dp and not self.is_pruned_port(port): return port return None # Otherwise, head towards the root, path to destination via root return self.chosen_towards_port
[docs] def edge_learn_port_towards(self, pkt_meta, edge_dp): """ Returns the port towards the edge DP Args: pkt_meta (PacketMeta): Packet on the edge DP edge_dp (DP): Edge DP that received the packet Returns: Port: Port towards the edge DP via some stack chosen metric """ if pkt_meta.vlan.edge_learn_stack_root: return self.relative_port_towards(edge_dp.name) return self.default_port_towards(edge_dp.name)
[docs] def tunnel_outport(self, src_dp, dst_dp, dst_port): """ Returns the output port for the current stack node for the tunnel path Args: src_dp (str): Source DP name of the tunnel dst_dp (str): Destination DP name of the tunnel dst_port (int): Destination port of the tunnel Returns: int: Output port number for the current node of the tunnel """ if not self.stack.is_in_path(src_dp, dst_dp): # No known path from the source to destination DP, so no port to output return None out_port = self.default_port_towards(dst_dp) if self.stack.name == dst_dp: # Current stack node is the destination, so output to the tunnel destination port out_port = dst_port elif out_port: out_port = out_port.number return out_port
[docs] def update_health(self, now, last_live_times, update_time): """ Returns whether the current stack node is healthy, a healthy stack node is one that attempted connected recently, or was known to be running recently, has all LAGs UP and any stack port UP Args: now (float): Current time last_live_times (dict): Last live time value for each DP update_time (int): Stack root update interval time Returns: bool: True if current stack node is healthy """ prev_health = self.stack.dyn_healthy_info new_health, reason = self.stack.update_health(now, last_live_times, update_time) if prev_health != self.stack.dyn_healthy_info: health = "HEALTHY" if new_health else "UNHEALTHY" self.logger.info( "Stack node %s %s (%s)" % (self.stack.name, health, reason) ) return new_health
[docs] @staticmethod def nominate_stack_root( root_valve, other_valves, now, last_live_times, update_time ): """ Nominate a new stack root Args: root_valve (Valve): Previous/current root Valve object other_valves (list): List of other valves (not including previous root) now (float): Current time last_live_times (dict): Last live time value for each DP update_time (int): Stack root update interval time Returns: str: Name of the new elected stack root """ stack_valves = {valve for valve in other_valves if valve.dp.stack} if root_valve: stack_valves = {root_valve}.union(stack_valves) # Create lists of healthy and unhealthy root candidates healthy_valves = [] unhealthy_valves = [] for valve in stack_valves: if valve.dp.stack.is_root_candidate(): healthy = valve.stack_manager.update_health( now, last_live_times, update_time ) if healthy: healthy_valves.append(valve) elif valve.dp.stack.dyn_healthy_info[0]: unhealthy_valves.append(valve) if not healthy_valves and not unhealthy_valves: # No root candidates/stack valves, so no nomination return None # Choose a candidate valve to be the root if healthy_valves: # Healthy valves exist, so pick a healthy valve as root new_root_name = None if root_valve: new_root_name = root_valve.dp.name if root_valve not in healthy_valves: # Need to pick a new healthy root if current root not healthy stacks = [valve.dp.stack for valve in healthy_valves] _, new_root_name = stacks[0].nominate_stack_root(stacks) else: # No healthy stack roots, so forced to choose a bad root new_root_name = None if root_valve: # Current root is unhealthy along with all other roots, so keep root the same new_root_name = root_valve.dp.name if root_valve not in unhealthy_valves: # Pick the best unhealthy root stacks = [valve.dp.stack for valve in unhealthy_valves] _, new_root_name = stacks[0].nominate_stack_root(stacks) return new_root_name
[docs] def consistent_roots(self, expected_root_name, valve, other_valves): """Returns true if all the stack nodes have the root configured correctly""" stacked_valves = {valve}.union(self.stacked_valves(other_valves)) for stack_valve in stacked_valves: if stack_valve.dp.stack.root_name != expected_root_name: return False return True
[docs] def stack_ports(self): """Yield the stack ports of this stack node""" for port in self.stack.ports: yield port
[docs] @staticmethod def is_stack_port(port): """Return whether the port is a stack port""" return bool(port.stack)
[docs] def is_away(self, port): """Return whether the port is an away port for the node""" return port in self.away_ports
[docs] def is_towards_root(self, port): """Return whether the port is a port towards the root for the node""" return port in self.towards_root_ports
[docs] def is_selected_towards_root_port(self, port): """Return true if the port is the chosen towards root port""" return port == self.chosen_towards_port
[docs] def is_pruned_port(self, port): """Return true if the port is to be pruned""" if self.is_towards_root(port): return not self.is_selected_towards_root_port(port) if self.is_away(port): if self.pruned_away_ports: return port in self.pruned_away_ports return False return True
[docs] def adjacent_stack_ports(self, peer_dp): """Return list of ports that connect to an adjacent DP""" return [port for port in self.stack.ports if port.stack["dp"] == peer_dp]
[docs] def acl_update_tunnel(self, acl): """Return ofmsgs for all tunnels in an ACL with a tunnel rule""" ofmsgs = [] source_vids = defaultdict(list) for _id, tunnel_dest in acl.tunnel_dests.items(): dst_dp, dst_port = tunnel_dest["dst_dp"], tunnel_dest["dst_port"] # Update the tunnel rules for each tunnel action specified updated_sources = [] updated_reverse_sources = [] for source_id, source in acl.tunnel_sources.items(): # We loop through each tunnel source in a single ACL instance and update the info src_dp, src_port = source["dp"], source["port"] in_port = self.tunnel_outport(dst_dp, src_dp, src_port) out_port = self.tunnel_outport(src_dp, dst_dp, dst_port) updated = False if out_port is None and dst_port is None and dst_dp == self.dp.name: # Will need to update at most once, to ensure the correct rules # get populated in the destination DP for a tunnel that outputs # to just a DP updated = acl.update_source_tunnel_rules( self.stack.name, source_id, _id, out_port, self.output_table ) elif out_port: updated = acl.update_source_tunnel_rules( self.stack.name, source_id, _id, out_port, self.output_table ) if updated: if self.stack.name == src_dp: # We need to re-build and apply the whole ACL source_vids[source_id].append(_id) else: # We only need to re-build and apply the tunnel updated_sources.append(source_id) reverse_updated = False if src_port is None and in_port is None and src_dp == self.dp.name: reverse_updated = acl.update_reverse_tunnel_rules( self.stack.name, source_id, _id, in_port, self.output_table ) elif in_port: reverse_updated = acl.update_reverse_tunnel_rules( self.stack.name, source_id, _id, in_port, self.output_table ) if reverse_updated: if acl.requires_reverse_tunnel(_id): # Update the reverse tunnel rules if the tunnel is configured to have them updated_reverse_sources.append(source_id) # The tunnel in the ACL does not have a source on this stack instance, so # we only need to re-build the special tunnel forwarding rule. for source_id in updated_sources: ofmsgs.extend( self.acl_manager.build_tunnel_rules_ofmsgs(source_id, _id, acl) ) for source_id in updated_reverse_sources: ofmsgs.extend( self.acl_manager.build_reverse_tunnel_rules_ofmsgs( source_id, _id, acl ) ) # If a tunnel is updated, but the source is configured as the current DP # then we will also need to re-build the rest of the ACL rules aswell. for source_id, vids in source_vids.items(): for vid in vids: ofmsgs.extend( self.acl_manager.build_tunnel_acl_rule_ofmsgs(source_id, vid, acl) ) return ofmsgs
[docs] def add_tunnel_acls(self): """Returns ofmsgs installing the tunnel path rules""" ofmsgs = [] if self.tunnel_acls: for acl in self.tunnel_acls: ofmsgs.extend(self.acl_update_tunnel(acl)) return ofmsgs
[docs] def add_port(self, port): """Need to add tunnel if port comes up with tunnel ACLs.""" if not port.stack and port.tunnel_acls(): return self.add_tunnel_acls() return []