import socket
from os import kill, getpid
try:
from Queue import Full
except ImportError:
from queue import Full
from multiprocessing import Process
from struct import Struct, unpack
from msgpack import unpackb
import sys
from time import time, sleep
import logging
import os.path
from os import remove as os_remove
import settings
from skyline_functions import send_graphite_metric
parent_skyline_app = 'horizon'
child_skyline_app = 'listen'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
# Lock/wait marker files used to coordinate log management with the
# bin/<skyline_app>.d script
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Optional per-server component of the Graphite namespace, giving
# 'skyline.horizon.<SERVER_METRICS_NAME>.listen'.  When SERVER_METRICS_NAME
# is unset or empty the component is left out: 'skyline.horizon.listen'.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except (AttributeError, TypeError):
    # AttributeError: the setting is not defined.  TypeError: a value that
    # '%s' formatting cannot take (e.g. a multi-element tuple).  A bare
    # except here would also have swallowed KeyboardInterrupt/SystemExit.
    SERVER_METRIC_PATH = ''
skyline_app_graphite_namespace = 'skyline.%s%s.%s' % (
    parent_skyline_app, SERVER_METRIC_PATH, child_skyline_app)
python_version = int(sys.version_info[0])

# SafeUnpickler taken from Carbon: https://github.com/graphite-project/carbon/blob/master/lib/carbon/util.py
# Python 2 feeds pickle data to the unpickler through StringIO; Python 3
# pickle data is bytes and goes through io.BytesIO instead.
if python_version == 2:
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
if python_version >= 3:
    import io

try:
    # @modified 20170913 - Task #2160: Test skyline with bandit
    # Added nosec to exclude from bandit tests
    import cPickle as pickle  # nosec
    USING_CPICKLE = True
except ImportError:
    import pickle  # nosec
    USING_CPICKLE = False

# This whole song & dance is due to pickle being insecure
# yet performance critical for carbon. We leave the insecure
# mode (which is faster) as an option (USE_INSECURE_UNPICKLER).
# The SafeUnpickler classes were largely derived from
# http://nadiana.com/python-pickle-insecure
if USING_CPICKLE:
    # Only reachable on Python 2 (cPickle does not exist on Python 3).
    # cPickle.Unpickler is not subclassable, so the whitelist is installed
    # through its find_global hook instead.
    class SafeUnpickler(object):
        # module name -> set of global names that may be loaded from it
        PICKLE_SAFE = {
            'copy_reg': set(['_reconstructor']),
            '__builtin__': set(['object']),
        }

        @classmethod
        def find_class(cls, module, name):
            """
            Resolve a pickled global, refusing anything not in PICKLE_SAFE.

            :raises pickle.UnpicklingError: for a non-whitelisted module or name
            """
            if module not in cls.PICKLE_SAFE:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
            __import__(module)
            mod = sys.modules[module]
            if name not in cls.PICKLE_SAFE[module]:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
            return getattr(mod, name)

        @classmethod
        def loads(cls, pickle_string):
            """
            Unpickle ``pickle_string`` using the whitelist in find_class.
            """
            # @modified 20170913 - Task #2160: Test skyline with bandit
            # Added nosec to exclude from bandit tests
            pickle_obj = pickle.Unpickler(StringIO(pickle_string))  # nosec
            pickle_obj.find_global = cls.find_class
            return pickle_obj.load()
else:
    class SafeUnpickler(pickle.Unpickler):
        # module name -> set of global names that may be loaded from it.
        # Python 2 pickles, and Python 3 pickles written with protocol < 3,
        # name these modules 'copy_reg'/'__builtin__'.  Protocol >= 3 uses the
        # Python 3 names, so both spellings are whitelisted.
        PICKLE_SAFE = {
            'copy_reg': set(['_reconstructor']),
            '__builtin__': set(['object']),
            'copyreg': set(['_reconstructor']),
            'builtins': set(['object']),
        }
        # An overridden find_class receives the module name exactly as it
        # appears in the stream (pickle's own fix_imports translation lives in
        # the base find_class).  On Python 3 the Python 2 names must be mapped
        # before they can be imported.
        PY3_MODULE_NAMES = {
            'copy_reg': 'copyreg',
            '__builtin__': 'builtins',
        }

        def find_class(self, module, name):
            """
            Resolve a pickled global, refusing anything not in PICKLE_SAFE.

            :raises pickle.UnpicklingError: for a non-whitelisted module or name
            """
            if module not in self.PICKLE_SAFE:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
            if name not in self.PICKLE_SAFE[module]:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
            if python_version >= 3:
                module = self.PY3_MODULE_NAMES.get(module, module)
            __import__(module)
            mod = sys.modules[module]
            return getattr(mod, name)

        @classmethod
        def loads(cls, pickle_string):
            """
            Unpickle ``pickle_string`` using the whitelist in find_class.

            :raises EOFError: if ``pickle_string`` is empty
            """
            if python_version == 2:
                return cls(StringIO(pickle_string)).load()
            # Python 3: StringIO is not imported here and pickle data is bytes.
            # Python 2 str payloads (e.g. metric names) are decoded as UTF-8
            # rather than the ASCII default, as carbon does.
            return cls(io.BytesIO(pickle_string), encoding='utf-8').load()  # nosec
# //SafeUnpickler
class Listen(Process):
    """
    The listener is responsible for listening on a port.

    Datapoints received over TCP (length-prefixed pickles) or UDP
    (MessagePack datagrams) are collected into a list.  That list is put on
    the queue once it holds more than ``settings.CHUNK_SIZE`` datapoints.
    """

    def __init__(self, port, queue, parent_pid, type="pickle"):
        """
        :param port: port to listen on
        :param queue: queue that chunks (lists) of datapoints are put on
        :param parent_pid: pid checked by check_if_parent_is_alive
        :param type: listener format, ``'pickle'`` (TCP) or ``'udp'``
        """
        super(Listen, self).__init__()
        try:
            self.ip = settings.HORIZON_IP
        except AttributeError:
            # Default for backwards compatibility
            self.ip = socket.gethostname()
        self.port = port
        self.q = queue
        self.daemon = True
        self.parent_pid = parent_pid
        # NOTE(review): getpid() runs here in the process constructing this
        # object, not in the child process that executes run() - confirm this
        # is the pid intended to be monitored.
        self.current_pid = getpid()
        self.type = type
        # Use the safe unpickler that comes with carbon rather than standard python pickle/cpickle
        self.unpickler = SafeUnpickler

    def gen_unpickle(self, infile):
        """
        Generate a pickle from a stream

        :param infile: the raw pickle payload (a byte string, despite the
            name), passed straight to ``self.unpickler.loads``
        :return: generator yielding the unpickled object, or yielding
            nothing when loads raises EOFError (e.g. an empty payload)
        """
        try:
            bunch = self.unpickler.loads(infile)
            yield bunch
        except EOFError:
            return

    def read_all(self, sock, n):
        """
        Read n bytes from a stream

        :param sock: connected socket (anything with a ``recv(bufsize)``)
        :param n: number of bytes wanted
        :return: the bytes read, which is shorter than ``n`` if the peer
            closes the connection first
        """
        # Collect bytes, not str: recv() returns bytes on Python 3, where the
        # original '' + chunk raised TypeError.  b'' is a plain str on Python 2.
        chunks = []
        while n > 0:
            # Break the loop when connection closes. #8 @earthgecko
            # https://github.com/earthgecko/skyline/pull/8/files
            # @earthgecko merged 1 commit into earthgecko:master from
            # mlowicki:fix_infinite_loop on 16 Mar 2015
            # Break the loop when connection closes. #115 @etsy
            # Cap each recv: recv(n) allocates an n byte buffer up front, and
            # n comes from a peer-supplied length prefix.
            chunk = sock.recv(min(n, 65536))
            if not chunk:
                break
            n -= len(chunk)
            chunks.append(chunk)
        return b''.join(chunks)

    def check_if_parent_is_alive(self):
        """
        Exit this process (status 0) if either tracked pid has gone.

        Signal 0 delivers nothing; os.kill only checks that the pid can be
        signalled and raises if it cannot.
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except Exception:
            # sys.exit rather than the site-module exit() helper, which is
            # meant for the interactive shell (and also closes sys.stdin)
            sys.exit(0)

    def _queue_chunk(self, chunk, queue_label, metric_label):
        """
        Put a copy of ``chunk`` on the queue, then empty ``chunk`` in place.

        If the queue is full the datapoints are dropped, logged, and the
        drop count is sent to Graphite as
        ``<namespace>.<metric_label>_chunks_dropped``.
        """
        try:
            self.q.put(list(chunk), block=False)
        # Drop chunk if queue is full
        except Full:
            chunks_dropped = str(len(chunk))
            logger.info(
                '%s :: %s queue is full, dropping %s datapoints'
                % (skyline_app, queue_label, chunks_dropped))
            send_metric_name = '%s.%s_chunks_dropped' % (skyline_app_graphite_namespace, metric_label)
            send_graphite_metric(skyline_app, send_metric_name, chunks_dropped)
        chunk[:] = []

    def listen_pickle(self):
        """
        Listen for pickles over tcp

        Accepts one connection at a time.  Each message is a 4 byte
        big-endian length followed by a pickled iterable of datapoints.  When
        the connection drops, both sockets are closed and the port is bound
        again for the next connection.
        """
        length_struct = Struct('!I')
        while 1:
            s = None
            conn = None
            try:
                # Set up the TCP listening socket
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((self.ip, self.port))
                s.setblocking(1)
                s.listen(5)
                logger.info('%s :: listening over tcp for pickles on %s' % (skyline_app, self.port))
                (conn, address) = s.accept()
                logger.info('%s :: connection from %s:%s' % (skyline_app, address[0], self.port))
                chunk = []
                while 1:
                    self.check_if_parent_is_alive()
                    try:
                        length = length_struct.unpack(self.read_all(conn, 4))
                        body = self.read_all(conn, length[0])
                        # Iterate and chunk each individual datapoint
                        for bunch in self.gen_unpickle(body):
                            for metric in bunch:
                                chunk.append(metric)
                                # Queue the chunk and empty the variable
                                if len(chunk) > settings.CHUNK_SIZE:
                                    self._queue_chunk(chunk, 'pickle', 'pickle')
                    except Exception as e:
                        logger.info(e)
                        logger.info('%s :: incoming pickle connection dropped, attempting to reconnect' % skyline_app)
                        break
            except Exception as e:
                logger.info('%s :: can not connect to socket: %s' % (skyline_app, str(e)))
                break
            finally:
                # Release both sockets explicitly before the port is bound
                # again, instead of relying on garbage collection
                for open_sock in (conn, s):
                    if open_sock is not None:
                        open_sock.close()

    def listen_udp(self):
        """
        Listen over udp for MessagePack strings

        Each datagram (read with a 1024 byte buffer) is one MessagePack
        encoded datapoint.
        """
        while 1:
            s = None
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                s.bind((self.ip, self.port))
                logger.info('%s :: listening over udp for messagepack on %s' % (skyline_app, self.port))
                chunk = []
                while 1:
                    self.check_if_parent_is_alive()
                    data, addr = s.recvfrom(1024)
                    try:
                        metric = unpackb(data)
                    except Exception as e:
                        # Skip a malformed (or truncated) datagram instead of
                        # letting it take down the whole listener
                        logger.error('%s :: failed to unpack UDP datagram from %s: %s' % (skyline_app, addr[0], e))
                        continue
                    chunk.append(metric)
                    # Queue the chunk and empty the variable
                    if len(chunk) > settings.CHUNK_SIZE:
                        self._queue_chunk(chunk, 'UDP', 'udp')
            except Exception as e:
                logger.info('%s :: cannot connect to socket: %s' % (skyline_app, str(e)))
                break
            finally:
                if s is not None:
                    s.close()

    def run(self):
        """
        Called when process initializes.

        Coordinates log management with bin/<skyline_app>.d, then starts the
        listener selected by ``self.type``.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os_remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)

        # Wait up to 5 seconds for the log lock file to be removed
        log_wait_for = time() + 5
        while time() < log_wait_for:
            if not os.path.isfile(skyline_app_loglock):
                break
            sleep(.1)

        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os_remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        logger.info('%s :: started listener' % skyline_app)

        if self.type == 'pickle':
            self.listen_pickle()
        elif self.type == 'udp':
            self.listen_udp()
        else:
            logger.error('%s :: unknown listener format' % skyline_app)