import socket
from os import kill, getpid
try:
from Queue import Full
except ImportError:
from queue import Full
from multiprocessing import Process
from struct import Struct, unpack
from msgpack import unpackb
import sys
from time import time, sleep
import traceback
import logging
import os.path
from os import remove as os_remove
import settings
from skyline_functions import send_graphite_metric
# Application / logger naming: this process logs through the 'horizonLog'
# logger as 'horizon.listen'.
parent_skyline_app = 'horizon'
child_skyline_app = 'listen'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
# Log file plus the .lock / .wait marker files consulted by Listen.run()
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Optional per-server segment of the graphite namespace: '.<SERVER_METRICS_NAME>'.
# An empty or missing setting yields no segment at all.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    # Was a bare except, which would also swallow KeyboardInterrupt /
    # SystemExit raised while the module is being imported.
    SERVER_METRIC_PATH = ''

# e.g. 'skyline.horizon.listen' or 'skyline.horizon.<server>.listen'
skyline_app_graphite_namespace = 'skyline.%s%s.%s' % (
    parent_skyline_app, SERVER_METRIC_PATH, child_skyline_app)

# Major Python version (2 or 3); used to pick str vs bytes handling below.
python_version = int(sys.version_info[0])
# SafeUnpickler taken from Carbon: https://github.com/graphite-project/carbon/blob/master/lib/carbon/util.py
# StringIO wraps py2 pickle payloads (str); BytesIO wraps py3 payloads (bytes).
if python_version == 2:
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO
    # @added 20191016 - Task #3278: py3 handle bytes and not str in pickles
    # For backwards compatibility Horizon needs to load BytesIO in py2
    from io import BytesIO
if python_version == 3:
    # @modified 20191016 - Task #3278: py3 handle bytes and not str in pickles
    # Branch #3262: py3
    from io import StringIO
    from io import BytesIO

# Prefer cPickle where it exists (py2); otherwise use the stdlib pickle.
# USING_CPICKLE selects which SafeUnpickler variant is defined below.
try:
    # @modified 20170913 - Task #2160: Test skyline with bandit
    # Added nosec to exclude from bandit tests
    import cPickle as pickle  # nosec
    USING_CPICKLE = True
except ImportError:
    # Narrowed from a bare except: only a missing cPickle should trigger
    # the fallback, not e.g. KeyboardInterrupt during import.
    import pickle  # nosec
    USING_CPICKLE = False

# @added 20191016 - Task #3278: py3 handle bytes and not str in pickles
# Branch #3262: py3
# Add ability to log for debugging
LOCAL_DEBUG = False
# This whole song & dance is due to pickle being insecure
# yet performance critical for carbon. We leave the insecure
# mode (which is faster) as an option (USE_INSECURE_UNPICKLER).
# The SafeUnpickler classes were largely derived from
# http://nadiana.com/python-pickle-insecure
#
# Both variants expose SafeUnpickler.loads(payload). Only the globals listed
# in PICKLE_SAFE may be resolved while unpickling; any other module or name
# referenced by the payload raises pickle.UnpicklingError.
if USING_CPICKLE:
    # cPickle variant: the whitelist is installed through the unpickler's
    # find_global hook rather than by subclassing.
    class SafeUnpickler(object):
        PICKLE_SAFE = {
            'copy_reg': set(['_reconstructor']),
            '__builtin__': set(['object']),
        }

        @classmethod
        def find_class(cls, module, name):
            """Return module.name if whitelisted, else raise UnpicklingError."""
            if module not in cls.PICKLE_SAFE:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
            __import__(module)
            mod = sys.modules[module]
            if name not in cls.PICKLE_SAFE[module]:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
            return getattr(mod, name)

        @classmethod
        def loads(cls, pickle_string):
            """Unpickle pickle_string (str on py2, bytes on py3) with the whitelist."""
            # @modified 20170913 - Task #2160: Test skyline with bandit
            # Added nosec to exclude from bandit tests
            # @modified 20191016 - Task #3278: py3 handle bytes and not str in pickles
            # Branch #3262: py3
            if python_version == 2:
                pickle_obj = pickle.Unpickler(StringIO(pickle_string))  # nosec
            else:
                pickle_obj = pickle.Unpickler(BytesIO(pickle_string))  # nosec
            pickle_obj.find_global = cls.find_class
            return pickle_obj.load()
else:
    class SafeUnpickler(pickle.Unpickler):
        # Pickles using protocol < 3 (including everything written by py2)
        # name these modules 'copy_reg' / '__builtin__'; protocol 3+ pickles
        # use the py3 names 'copyreg' / 'builtins'.
        PICKLE_SAFE = {
            'copy_reg': set(['_reconstructor']),
            '__builtin__': set(['object']),
            'copyreg': set(['_reconstructor']),
            'builtins': set(['object']),
        }

        def find_class(self, module, name):
            """Resolve module.name only if whitelisted, else raise UnpicklingError."""
            if module not in self.PICKLE_SAFE:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe module %s' % module)
            if name not in self.PICKLE_SAFE[module]:
                raise pickle.UnpicklingError('Attempting to unpickle unsafe class %s' % name)
            # Delegate the lookup to the base Unpickler: on py3 it maps the
            # py2 names ('copy_reg', '__builtin__') to their py3 modules
            # (fix_imports), whereas __import__('copy_reg') would fail.
            return pickle.Unpickler.find_class(self, module, name)

        @classmethod
        def loads(cls, pickle_string):
            """Unpickle pickle_string (str on py2, bytes on py3) with the whitelist."""
            # @modified 20191016 - Task #3278: py3 handle bytes and not str in pickles
            # Branch #3262: py3
            if python_version == 2:
                return cls(StringIO(pickle_string)).load()
            return cls(BytesIO(pickle_string)).load()
# //SafeUnpickler
class Listen(Process):
    """
    The listener is responsible for listening on a port.

    Runs as a daemon :class:`multiprocessing.Process`. Depending on ``type``
    it receives length-prefixed pickles over TCP (``'pickle'``) or
    MessagePack datagrams over UDP (``'udp'``), collects the decoded metrics
    and puts them on ``queue`` as lists once more than
    ``settings.CHUNK_SIZE`` have been collected.
    """

    # 4-byte big-endian unsigned length header preceding every pickle payload
    _PICKLE_LENGTH = Struct('!I')

    def __init__(self, port, queue, parent_pid, type="pickle"):
        """
        :param port: port to bind on ``settings.HORIZON_IP`` (or the hostname)
        :param queue: queue that metric chunks are put on (non-blocking)
        :param parent_pid: pid which, once it cannot be signalled, makes this
            listener exit
        :param type: ``'pickle'`` (TCP) or ``'udp'`` (MessagePack)
        """
        super(Listen, self).__init__()
        try:
            self.ip = settings.HORIZON_IP
        except AttributeError:
            # Default for backwards compatibility
            self.ip = socket.gethostname()
        self.port = port
        self.q = queue
        self.daemon = True
        self.parent_pid = parent_pid
        # NOTE: __init__ runs in the process that creates the Listen object,
        # so this is the creator's pid, not the pid of the started child.
        self.current_pid = getpid()
        self.type = type
        # Use the safe unpickler that comes with carbon rather than standard python pickle/cpickle
        self.unpickler = SafeUnpickler

    def gen_unpickle(self, infile):
        """
        Unpickle one payload and yield the resulting object.

        :param infile: the raw pickle payload (despite the name, not a file)
        :return: generator yielding the unpickled object once, or nothing if
            the payload is empty or truncated (EOFError)
        """
        try:
            bunch = self.unpickler.loads(infile)
            yield bunch
        except EOFError:
            return

    def read_all(self, sock, n):
        """
        Read n bytes from a stream.

        Returns fewer than n bytes (possibly none) if the peer closes the
        connection first, i.e. ``recv`` returns an empty chunk.
        """
        # @modified 20191016 - Task #3278: py3 handle bytes and not str in pickles
        # Branch #3262: py3
        # Collect chunks and join once: b'' is str on py2 and bytes on py3,
        # so this works for both and avoids repeated concatenation.
        parts = []
        while n > 0:
            # Break the loop when connection closes. #8 @earthgecko
            # https://github.com/earthgecko/skyline/pull/8/files
            # @earthgecko merged 1 commit into earthgecko:master from
            # mlowicki:fix_infinite_loop on 16 Mar 2015
            # Break the loop when connection closes. #115 @etsy
            chunk = sock.recv(n)
            if not chunk:
                break
            n -= len(chunk)
            parts.append(chunk)
            if LOCAL_DEBUG:
                logger.debug('debug :: listen :: read_all with chunk - %s' % str(chunk))
        return b''.join(parts)

    def check_if_parent_is_alive(self):
        """
        Exit this process (status 0) if current_pid or parent_pid can no
        longer be signalled with ``kill(pid, 0)``.
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except Exception:
            # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
            # Log warning
            logger.warning('warning :: parent or current process dead')
            # sys.exit rather than the site-provided exit() builtin, which is
            # not guaranteed to exist (e.g. python -S).
            sys.exit(0)

    def _read_pickle_body(self, conn):
        """Read one length-prefixed pickle payload from conn; raise on failure."""
        # @modified 20191016 - Task #3278: py3 handle bytes and not str in pickles
        # Branch #3262: py3
        # Failures are logged in LOCAL_DEBUG mode and always re-raised, so a
        # closed connection ends the read loop instead of spinning forever.
        try:
            length = self._PICKLE_LENGTH.unpack(self.read_all(conn, 4))
        except Exception:
            if LOCAL_DEBUG:
                logger.error(traceback.format_exc())
                logger.error('error :: listen :: could not determine length')
            raise
        try:
            return self.read_all(conn, length[0])
        except Exception:
            if LOCAL_DEBUG:
                logger.error(traceback.format_exc())
                logger.error('error :: listen :: could not determine body')
            raise

    def _queue_chunk(self, chunk, queue_label, dropped_metric):
        """Put a copy of chunk on the queue (non-blocking) and empty chunk in place.

        If the queue is full the datapoints are dropped, logged and counted in
        graphite as '<namespace>.<dropped_metric>'.
        """
        try:
            self.q.put(list(chunk), block=False)
        # Drop chunk if queue is full
        except Full:
            chunks_dropped = str(len(chunk))
            logger.info(
                '%s :: %s queue is full, dropping %s datapoints'
                % (skyline_app, queue_label, chunks_dropped))
            send_metric_name = '%s.%s' % (skyline_app_graphite_namespace, dropped_metric)
            send_graphite_metric(skyline_app, send_metric_name, chunks_dropped)
        chunk[:] = []

    def listen_pickle(self):
        """
        Listen for pickles over tcp.

        Accepts one connection at a time. Each message is a 4-byte big-endian
        length followed by a pickled iterable of metrics. When the connection
        fails or closes, both sockets are closed and the listener re-binds to
        wait for a new connection; any other error makes the method return.
        """
        # Kept across reconnects so datapoints buffered below CHUNK_SIZE are
        # not discarded when a sender disconnects.
        chunk = []
        while 1:
            s = None
            conn = None
            try:
                # Set up the TCP listening socket
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((self.ip, self.port))
                s.setblocking(1)
                s.listen(5)
                logger.info('%s :: listening over tcp for pickles on %s' % (skyline_app, str(self.port)))
                (conn, address) = s.accept()
                logger.info('%s :: connection from %s on %s' % (skyline_app, str(address[0]), str(self.port)))
                while 1:
                    self.check_if_parent_is_alive()
                    try:
                        body = self._read_pickle_body(conn)
                        # Iterate and chunk each individual datapoint
                        for bunch in self.gen_unpickle(body):
                            for metric in bunch:
                                chunk.append(metric)
                                # Queue the chunk and empty the variable
                                if len(chunk) > settings.CHUNK_SIZE:
                                    self._queue_chunk(chunk, 'pickle', 'pickle_chunks_dropped')
                    except Exception as e:
                        logger.info(e)
                        logger.info('%s :: incoming pickle connection dropped, attempting to reconnect' % skyline_app)
                        break
            except Exception as e:
                logger.info('%s :: can not connect to socket: %s' % (skyline_app, str(e)))
                break
            finally:
                # Close explicitly rather than relying on garbage collection,
                # otherwise the old connection stays open (and unread) while
                # a new one is awaited.
                for sock in (conn, s):
                    if sock is not None:
                        try:
                            sock.close()
                        except socket.error:
                            # best-effort close
                            pass

    def _unpack_datagram(self, data):
        """
        Decode one MessagePack datagram into a metric; raise if every attempt fails.
        """
        # @modified 20191014 - Task #3272: horizon - listen - py3 handle msgpack bytes
        # Branch #3262: py3
        # Bug #3266: py3 Redis binary objects not strings
        # msgpack encoding of bytes and not str as per
        # https://msgpack.org/#string-and-binary-type Python/msgpack
        # and https://stackoverflow.com/a/47070687/107406
        if python_version != 3:
            return unpackb(data)
        # raw=False decodes msgpack strings as UTF-8 without the ``encoding``
        # argument, which newer msgpack releases no longer accept (that
        # previously logged an error for every datagram).
        try:
            return unpackb(data, raw=False)
        except Exception:
            # Older msgpack without ``raw``, or undecodable data: fall back
            # to the original attempts below, which log their failures.
            pass
        # @added 20210328 - [Q] The "horizon.test.pickle" test is getting an error. #419
        # Wrap in try and except and try without encoding if
        # with encoding fails
        try:
            return unpackb(data, encoding='utf-8')
        except Exception as e:
            logger.error('%s :: unpackb error : %s' % (skyline_app, str(e)))
        logger.info('%s :: trying unpackb without encoding' % (skyline_app))
        return unpackb(data)

    def listen_udp(self):
        """
        Listen over udp for MessagePack strings.

        Each datagram (up to 1024 bytes is read) is decoded into one metric;
        datagrams that cannot be decoded are logged and skipped. Returns on
        any socket or queueing error.
        """
        while 1:
            s = None
            try:
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                s.bind((self.ip, self.port))
                logger.info('%s :: listening over udp for messagepack on %s' % (skyline_app, self.port))
                chunk = []
                while 1:
                    self.check_if_parent_is_alive()
                    data, _addr = s.recvfrom(1024)
                    try:
                        metric = self._unpack_datagram(data)
                    except Exception as e:
                        # Skip it: appending here would add a stale metric
                        # from the previous datagram (or none at all).
                        logger.info('%s :: unpackb without encoding error : %s' % (skyline_app, str(e)))
                        continue
                    chunk.append(metric)
                    # Queue the chunk and empty the variable
                    if len(chunk) > settings.CHUNK_SIZE:
                        self._queue_chunk(chunk, 'UDP', 'udp_chunks_dropped')
            except Exception as e:
                logger.info('%s :: cannot connect to socket: %s' % (skyline_app, str(e)))
                break
            finally:
                if s is not None:
                    try:
                        s.close()
                    except socket.error:
                        # best-effort close
                        pass

    def run(self):
        """
        Called when process initializes.

        Waits up to ~5 seconds for the bin/<skyline_app>.d log management lock
        file to disappear, then starts the listener selected by ``type``.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os_remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1
        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os_remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)
        logger.info('%s :: started listener' % skyline_app)
        if self.type == 'pickle':
            self.listen_pickle()
        elif self.type == 'udp':
            self.listen_udp()
        else:
            logger.error('%s :: unknown listener format' % skyline_app)