"""FAUCET event notification."""
# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2015 Brad Cowie, Christopher Lorier and Joe Stringer.
# Copyright (C) 2015 Research and Education Advanced Network New Zealand Ltd.
# Copyright (C) 2015--2019 The Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import socket
import time
from contextlib import contextmanager
import eventlet
from os_ken.lib import hub
from os_ken.lib.hub import StreamServer
class NonBlockLock:
    """Non blocking lock that can be used as a context manager.

    The lock is backed by an eventlet Semaphore created with its default
    arguments. Acquisition never waits: callers are told whether they got
    the lock and are expected to back off if they did not.
    """

    def __init__(self):
        self._lock = eventlet.semaphore.Semaphore()

    @contextmanager
    def acquire_nonblock(self):
        """Attempt to acquire the lock without blocking.

        Yields:
            The result of the non-blocking acquire: truthy when the lock
            was obtained, falsy when it is already held elsewhere.

        The lock is released on exit from the ``with`` block only if this
        call acquired it.
        """
        result = self._lock.acquire(blocking=False)
        try:
            yield result
        finally:
            # Release even when the with-body raises; otherwise a single
            # failure would leave the lock held and lock out every later
            # caller.
            if result:
                self.release()

    def release(self):
        """Release lock when done."""
        self._lock.release()
class FaucetEventNotifier:
    """Event notification, via Unix domain socket.

    Events handed to notify() are stored in a bounded queue. Once start()
    has spawned the server, a connected client receives each queued event
    as one JSON document terminated by a newline. Only one client is served
    at a time.
    """

    def __init__(self, socket_path, metrics, logger):
        """Set up the notifier.

        Args:
            socket_path: path of the event socket; a falsy value disables
                the socket server.
            metrics: object exposing ``faucet_event_id.set(value)``.
            logger: logger used for informational and error messages.
        """
        # logger must be assigned first: check_path() logs through it.
        self.logger = logger
        self.socket_path = self.check_path(socket_path)
        self.metrics = metrics
        self.event_id = 0
        self.thread = None
        self.lock = NonBlockLock()
        # Bounded queue; notify() discards the oldest event when it is full.
        self.event_q = eventlet.queue.Queue(120)

    def start(self):
        """Start socket server.

        Returns:
            The spawned server thread, or None when no usable socket path
            was configured (in which case nothing is started).
        """
        if self.socket_path:
            stream_server = StreamServer(
                (self.socket_path, None), self._loop
            ).serve_forever
            self.thread = hub.spawn(stream_server)
            self.thread.name = "event"
        return self.thread

    def _loop(self, sock, _addr):
        """Serve events.

        Connection handler passed to StreamServer. Sends queued events to
        ``sock`` until a send fails, then closes the socket. A connection
        arriving while another client holds the lock is logged and closed
        without being served.

        Args:
            sock: connected client socket.
            _addr: client address (unused).
        """
        try:
            with self.lock.acquire_nonblock() as result:
                if not result:
                    self.logger.info("multiple event clients not supported")
                    return
                self.logger.info("event client connected")
                while True:
                    event = self.event_q.get()
                    # One JSON document per line; default=str stringifies
                    # values json cannot serialize natively.
                    event_bytes = (
                        json.dumps(event, default=str) + "\n"
                    ).encode("UTF-8")
                    try:
                        sock.sendall(event_bytes)
                    except (socket.error, IOError) as err:
                        self.logger.info("event client disconnected: %s", err)
                        break
        finally:
            # Always close the client socket, including when serialization
            # or anything else in the loop raises unexpectedly.
            try:
                sock.close()
            except (socket.error, IOError):
                # Best effort: the peer may already have torn it down.
                pass

    def get_event(self):
        """Return the oldest queued event, or None if the queue is empty.

        Must not be used once start() has spawned the server thread, since
        _loop() would then be consuming the same queue.
        """
        assert self.thread is None, "not allowed with async _loop"
        return None if self.event_q.empty() else self.event_q.get()

    def notify(self, dp_id, dp_name, event_dict):
        """Notify of an event.

        Builds an event from a standard header (version, time, dp_id,
        dp_name, event_id) merged with ``event_dict``, records the event id
        in metrics and queues the event.

        Args:
            dp_id: datapath ID stored in the event header.
            dp_name: datapath name stored in the event header.
            event_dict: dict with the event payload; it must not contain
                any of the header keys.
        """
        assert isinstance(event_dict, dict)
        self.event_id += 1
        event = {
            "version": 1,
            "time": time.time(),
            "dp_id": dp_id,
            "dp_name": dp_name,
            "event_id": self.event_id,
        }
        # The payload must not silently override header fields.
        for header_key in event:
            assert header_key not in event_dict
        event.update(event_dict)
        self.metrics.faucet_event_id.set(event["event_id"])
        # Make room by dropping the oldest event rather than blocking.
        if self.event_q.full():
            self.event_q.get()
        self.event_q.put(event)

    def check_path(self, socket_path):
        """Check that socket_path is valid.

        Creates the parent directory if it is missing, verifies the
        directory is readable, writable and searchable, and removes any
        existing file at the socket path.

        Args:
            socket_path: requested socket path, or a falsy value.

        Returns:
            The absolute socket path, or None if no path was given or the
            path cannot be used (the reason is logged as an error).
        """
        if not socket_path:
            return None
        socket_path = os.path.abspath(socket_path)
        socket_dir = os.path.dirname(socket_path)
        # Create parent directories that don't exist.
        if not os.path.exists(socket_dir):
            try:
                os.makedirs(socket_dir)
            except PermissionError as err:  # pytype: disable=name-error
                self.logger.error("Unable to create event socket directory: %s", err)
                return None
        # Check directory permissions.
        if not os.access(socket_dir, os.R_OK | os.W_OK | os.X_OK):
            self.logger.error(
                "Incorrect permissions set on socket directory %s", socket_dir
            )
            return None
        # Remove stale socket file.
        if os.path.exists(socket_path):
            try:
                os.remove(socket_path)
            except PermissionError as err:  # pytype: disable=name-error
                self.logger.error("Unable to remove old socket: %s", err)
                return None
        return socket_path