Source code for faucet.faucet_event

"""FAUCET event notification."""

# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import socket
import time
from contextlib import contextmanager

import eventlet

from os_ken.lib import hub
from os_ken.lib.hub import StreamServer


class NonBlockLock:
    """Non blocking lock that can be used as a context manager."""

    def __init__(self):
        self._lock = eventlet.semaphore.Semaphore()

    @contextmanager
    def acquire_nonblock(self):
        """Attempt to acquire the lock without blocking.

        Yields:
            The result of the non-blocking acquire: truthy if the lock was
            obtained, falsy if it is already held elsewhere.

        The lock is released on exit from the ``with`` block only if it was
        acquired here. Release happens in a ``finally`` clause so that an
        exception raised inside the ``with`` body cannot leave the lock held.
        """
        result = self._lock.acquire(blocking=False)
        try:
            yield result
        finally:
            # Only undo an acquire that actually succeeded.
            if result:
                self.release()

    def release(self):
        """Release lock when done."""
        self._lock.release()


class FaucetEventNotifier:
    """Event notification, via Unix domain socket.

    Events passed to notify() are held in a bounded queue. When the socket
    server has been started, a single connected client is sent each queued
    event as one line of UTF-8 encoded JSON.
    """

    def __init__(self, socket_path, metrics, logger):
        """Set up the notifier (the server is not started until start()).

        Args:
            socket_path: path of the event socket; a false value (or a path
                rejected by check_path()) disables the socket server.
            metrics: object whose faucet_event_id.set() is called with the
                ID of each event passed to notify().
            logger: logger used for status and error messages.
        """
        self.logger = logger
        self.socket_path = self.check_path(socket_path)
        self.metrics = metrics
        self.event_id = 0
        self.thread = None
        self.lock = NonBlockLock()
        # Bounded queue: notify() discards the oldest event once it is full.
        self.event_q = eventlet.queue.Queue(120)

    def start(self):
        """Start socket server.

        Returns:
            The spawned server thread, or None when there is no usable
            socket path (in which case no server is started).
        """
        if self.socket_path:
            stream_server = StreamServer(
                (self.socket_path, None), self._loop
            ).serve_forever
            self.thread = hub.spawn(stream_server)
            self.thread.name = "event"
        return self.thread

    def _loop(self, sock, _addr):
        """Serve events to one connected client until it disconnects.

        Only one client is served at a time; a connection made while another
        client holds the lock is logged and closed immediately. The client
        socket is always closed on the way out, including when an unexpected
        exception escapes the serve loop.
        """
        try:
            with self.lock.acquire_nonblock() as acquired:
                if not acquired:
                    self.logger.info("multiple event clients not supported")
                    return
                self.logger.info("event client connected")
                while True:
                    event = self.event_q.get()
                    # One JSON document per line; default=str stringifies
                    # any value json cannot serialize natively.
                    event_bytes = "\n".join(
                        (json.dumps(event, default=str), "")
                    ).encode("UTF-8")
                    try:
                        sock.sendall(event_bytes)
                    except (socket.error, IOError) as err:
                        self.logger.info("event client disconnected: %s", err)
                        break
        finally:
            # Best effort: the peer may already have torn the socket down.
            try:
                sock.close()
            except (socket.error, IOError):
                pass

    def get_event(self):
        """Return the next queued event, or None if the queue is empty.

        Only permitted when the socket server has not been started, because
        otherwise _loop() consumes the same queue.
        """
        assert self.thread is None, "not allowed with async _loop"
        return None if self.event_q.empty() else self.event_q.get()

    def notify(self, dp_id, dp_name, event_dict):
        """Notify of an event.

        Args:
            dp_id: datapath ID recorded in the event header.
            dp_name: datapath name recorded in the event header.
            event_dict: dict of event content; must not contain any of the
                header keys (version, time, dp_id, dp_name, event_id).
        """
        assert isinstance(event_dict, dict)
        self.event_id += 1
        event = {
            "version": 1,
            "time": time.time(),
            "dp_id": dp_id,
            "dp_name": dp_name,
            "event_id": self.event_id,
        }
        # Event content may not overwrite the header fields.
        for header_key in event:
            assert header_key not in event_dict
        event.update(event_dict)
        self.metrics.faucet_event_id.set(event["event_id"])
        # Make room by dropping the oldest event rather than blocking.
        if self.event_q.full():
            self.event_q.get()
        self.event_q.put(event)

    def check_path(self, socket_path):
        """Check that socket_path is valid.

        Creates missing parent directories and removes any stale file
        already present at the socket path.

        Args:
            socket_path: requested path of the event socket.

        Returns:
            The absolute socket path, or None if socket_path is empty or
            cannot be used (the reason is logged as an error).
        """
        if not socket_path:
            return None
        socket_path = os.path.abspath(socket_path)
        socket_dir = os.path.dirname(socket_path)
        # Create parent directories that don't exist.
        try:
            os.makedirs(socket_dir, exist_ok=True)
        except OSError as err:
            self.logger.error("Unable to create event socket directory: %s", err)
            return None
        # Check directory permissions.
        if not os.access(socket_dir, os.R_OK | os.W_OK | os.X_OK):
            self.logger.error(
                "Incorrect permissions set on socket directory %s", socket_dir
            )
            return None
        # Remove stale socket file.
        try:
            os.remove(socket_path)
        except FileNotFoundError:
            # Nothing stale to remove.
            pass
        except OSError as err:
            self.logger.error("Unable to remove old socket: %s", err)
            return None
        return socket_path