from __future__ import division
import logging
from Queue import Empty
from redis import StrictRedis
from time import time, sleep
from threading import Thread
from collections import defaultdict
from multiprocessing import Process, Manager, Queue
from msgpack import Unpacker, unpackb, packb
import os
from os import path, kill, getpid, system
from math import ceil
import traceback
import operator
import socket
import re
from sys import version_info
import os.path
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
import settings
from alerters import trigger_alert
from algorithms_dev import run_selected_algorithm
# from skyline import algorithm_exceptions
from algorithm_exceptions import *
try:
send_algorithm_run_metrics = settings.ENABLE_ALGORITHM_RUN_METRICS
except:
send_algorithm_run_metrics = False
if send_algorithm_run_metrics:
from algorithms_dev import determine_median
# TODO if settings.ENABLE_CRUCIBLE: and ENABLE_PANORAMA
# from spectrum import push_to_crucible
skyline_app = 'analyzer_dev'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
python_version = int(version_info[0])
try:
SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
except:
SERVER_METRIC_PATH = ''
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
[docs]class Analyzer(Thread):
"""
The Analyzer class which controls the analyzer thread and spawned processes.
"""
def __init__(self, parent_pid):
"""
Initialize the Analyzer
Create the :obj:`self.anomalous_metrics` list
Create the :obj:`self.exceptions_q` queue
Create the :obj:`self.anomaly_breakdown_q` queue
"""
super(Analyzer, self).__init__()
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
if settings.REDIS_PASSWORD:
self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
self.anomalous_metrics = Manager().list()
self.exceptions_q = Queue()
self.anomaly_breakdown_q = Queue()
self.mirage_metrics = Manager().list()
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
exit(0)
[docs] def send_graphite_metric(self, name, value):
"""
Sends the skyline_app metrics to the `GRAPHITE_HOST` if a graphite
host is defined.
"""
if settings.GRAPHITE_HOST != '':
skyline_app_metric = skyline_app_graphite_namespace + name
sock = socket.socket()
sock.settimeout(10)
# Handle connection error to Graphite #116 @etsy
# Fixed as per https://github.com/etsy/skyline/pull/116 and
# mlowicki:etsy_handle_connection_error_to_graphite
# Handle connection error to Graphite #7 @ earthgecko
# merged 1 commit into earthgecko:master from
# mlowicki:handle_connection_error_to_graphite on 16 Mar 2015
try:
sock.connect((settings.GRAPHITE_HOST, settings.CARBON_PORT))
sock.settimeout(None)
except socket.error:
sock.settimeout(None)
endpoint = '%s:%d' % (settings.GRAPHITE_HOST,
settings.CARBON_PORT)
logger.error("Can't connect to Graphite at %s" % endpoint)
return False
# For the same reason as above
# sock.sendall('%s %s %i\n' % (name, value, time()))
try:
sock.sendall('%s %s %i\n' % (skyline_app_metric, value, time()))
sock.close()
return True
except:
endpoint = '%s:%d' % (settings.GRAPHITE_HOST,
settings.CARBON_PORT)
logger.error("Can't connect to Graphite at %s" % endpoint)
return False
return False
[docs] def spin_process(self, i, unique_metrics):
"""
Assign a bunch of metrics for a process to analyze.
Multiple get the assigned_metrics to the process from Redis.
For each metric:\n
* unpack the `raw_timeseries` for the metric.\n
* Analyse each timeseries against `ALGORITHMS` to determine if it is\n
anomalous.\n
* If anomalous add it to the :obj:`self.anomalous_metrics` list\n
* Add what algorithms triggered to the :obj:`self.anomaly_breakdown_q` queue\n
Add keys and values to the queue so the parent process can collate for:\n
* :py:obj:`self.anomaly_breakdown_q`
* :py:obj:`self.exceptions_q`
"""
spin_start = time()
logger.info('spin_process started')
# Discover assigned metrics
keys_per_processor = int(ceil(float(len(unique_metrics)) / float(settings.ANALYZER_PROCESSES)))
if i == settings.ANALYZER_PROCESSES:
assigned_max = len(unique_metrics)
else:
assigned_max = min(len(unique_metrics), i * keys_per_processor)
# Fix analyzer worker metric assignment #94
# https://github.com/etsy/skyline/pull/94 @languitar:worker-fix
assigned_min = (i - 1) * keys_per_processor
assigned_keys = range(assigned_min, assigned_max)
# assigned_keys = range(300, 310)
# Compile assigned metrics
assigned_metrics = [unique_metrics[index] for index in assigned_keys]
# Check if this process is unnecessary
if len(assigned_metrics) == 0:
return
# Multi get series
raw_assigned = self.redis_conn.mget(assigned_metrics)
# Make process-specific dicts
exceptions = defaultdict(int)
anomaly_breakdown = defaultdict(int)
# Distill timeseries strings into lists
for i, metric_name in enumerate(assigned_metrics):
self.check_if_parent_is_alive()
# logger.info('analysing %s' % metric_name)
try:
raw_series = raw_assigned[i]
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
anomalous, ensemble, datapoint = run_selected_algorithm(timeseries, metric_name)
# If it's anomalous, add it to list
if anomalous:
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
metric = [datapoint, base_name]
self.anomalous_metrics.append(metric)
# Get the anomaly breakdown - who returned True?
triggered_algorithms = []
for index, value in enumerate(ensemble):
if value:
algorithm = settings.ALGORITHMS[index]
anomaly_breakdown[algorithm] += 1
triggered_algorithms.append(algorithm)
# It could have been deleted by the Roomba
except TypeError:
# logger.error('TypeError analysing %s' % metric_name)
exceptions['DeletedByRoomba'] += 1
except TooShort:
# logger.error('TooShort analysing %s' % metric_name)
exceptions['TooShort'] += 1
except Stale:
# logger.error('Stale analysing %s' % metric_name)
exceptions['Stale'] += 1
except Boring:
# logger.error('Boring analysing %s' % metric_name)
exceptions['Boring'] += 1
except:
# logger.error('Other analysing %s' % metric_name)
exceptions['Other'] += 1
logger.info(traceback.format_exc())
# Add values to the queue so the parent process can collate
for key, value in anomaly_breakdown.items():
self.anomaly_breakdown_q.put((key, value))
for key, value in exceptions.items():
self.exceptions_q.put((key, value))
spin_end = time() - spin_start
logger.info('spin_process took %.2f seconds' % spin_end)
[docs] def run(self):
"""
Called when the process intializes.
Determine if Redis is up and discover the number of `unique metrics`.
Divide the `unique_metrics` between the number of `ANALYZER_PROCESSES`
and assign each process a set of metrics to analyse for anomalies.
Wait for the processes to finish.
Process the Determine whether if any anomalous metrics require:\n
* alerting on (and set `EXPIRATION_TIME` key in Redis for alert).\n
* feeding to another module e.g. mirage.
Populated the webapp json the anomalous_metrics details.
Log the details about the run to the skyline log.
Send skyline.analyzer metrics to `GRAPHITE_HOST`,
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
if os.path.isfile(skyline_app_logwait):
try:
os.remove(skyline_app_logwait)
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
pass
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('starting %s run' % skyline_app)
if os.path.isfile(skyline_app_loglock):
logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
try:
os.remove(skyline_app_loglock)
logger.info('log lock file removed')
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
pass
else:
logger.info('bin/%s.d log management done' % skyline_app)
if not os.path.exists(settings.SKYLINE_TMP_DIR):
if python_version == 2:
os.makedirs(settings.SKYLINE_TMP_DIR, 0750)
if python_version == 3:
os.makedirs(settings.SKYLINE_TMP_DIR, mode=0o750)
# Initiate the algorithm timings if Analyzer is configured to send the
# algorithm_breakdown metrics with ENABLE_ALGORITHM_RUN_METRICS
algorithm_tmp_file_prefix = settings.SKYLINE_TMP_DIR + '/' + skyline_app + '.'
algorithms_to_time = []
if send_algorithm_run_metrics:
algorithms_to_time = settings.ALGORITHMS
while 1:
now = time()
# Make sure Redis is up
try:
self.redis_conn.ping()
except:
logger.error('skyline can\'t connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
if settings.REDIS_PASSWORD:
self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
continue
# Report app up
self.redis_conn.setex(skyline_app, 120, now)
# Discover unique metrics
unique_metrics = list(self.redis_conn.smembers(settings.FULL_NAMESPACE + 'unique_metrics'))
if len(unique_metrics) == 0:
logger.info('no metrics in redis. try adding some - see README')
sleep(10)
continue
# Using count files rather that multiprocessing.Value to enable metrics for
# metrics for algorithm run times, etc
for algorithm in algorithms_to_time:
algorithm_count_file = algorithm_tmp_file_prefix + algorithm + '.count'
algorithm_timings_file = algorithm_tmp_file_prefix + algorithm + '.timings'
# with open(algorithm_count_file, 'a') as f:
with open(algorithm_count_file, 'w') as f:
pass
with open(algorithm_timings_file, 'w') as f:
pass
# Spawn processes
pids = []
pid_count = 0
for i in range(1, settings.ANALYZER_PROCESSES + 1):
if i > len(unique_metrics):
logger.info('WARNING: skyline is set for more cores than needed.')
break
p = Process(target=self.spin_process, args=(i, unique_metrics))
pids.append(p)
pid_count += 1
logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.ANALYZER_PROCESSES)))
p.start()
# Send wait signal to zombie processes
# for p in pids:
# p.join()
# Self monitor processes and terminate if any spin_process has run
# for longer than 180 seconds
p_starts = time()
while time() - p_starts <= 180:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(settings.ANALYZER_PROCESSES), time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
for p in pids:
p.terminate()
# p.join()
# Grab data from the queue and populate dictionaries
exceptions = dict()
anomaly_breakdown = dict()
while 1:
try:
key, value = self.anomaly_breakdown_q.get_nowait()
if key not in anomaly_breakdown.keys():
anomaly_breakdown[key] = value
else:
anomaly_breakdown[key] += value
except Empty:
break
while 1:
try:
key, value = self.exceptions_q.get_nowait()
if key not in exceptions.keys():
exceptions[key] = value
else:
exceptions[key] += value
except Empty:
break
# Push to panorama
# if len(self.panorama_anomalous_metrics) > 0:
# logger.info('to do - push to panorama')
# Push to crucible
# if len(self.crucible_anomalous_metrics) > 0:
# logger.info('to do - push to crucible')
# Write anomalous_metrics to static webapp directory
# Using count files rather that multiprocessing.Value to enable metrics for
# metrics for algorithm run times, etc
for algorithm in algorithms_to_time:
algorithm_count_file = algorithm_tmp_file_prefix + algorithm + '.count'
algorithm_timings_file = algorithm_tmp_file_prefix + algorithm + '.timings'
try:
algorithm_count_array = []
with open(algorithm_count_file, 'r') as f:
for line in f:
value_string = line.replace('\n', '')
unquoted_value_string = value_string.replace("'", '')
float_value = float(unquoted_value_string)
algorithm_count_array.append(float_value)
except:
algorithm_count_array = False
if not algorithm_count_array:
continue
number_of_times_algorithm_run = len(algorithm_count_array)
logger.info(
'algorithm run count - %s run %s times' % (
algorithm, str(number_of_times_algorithm_run)))
if number_of_times_algorithm_run == 0:
continue
try:
algorithm_timings_array = []
with open(algorithm_timings_file, 'r') as f:
for line in f:
value_string = line.replace('\n', '')
unquoted_value_string = value_string.replace("'", '')
float_value = float(unquoted_value_string)
algorithm_timings_array.append(float_value)
except:
algorithm_timings_array = False
if not algorithm_timings_array:
continue
number_of_algorithm_timings = len(algorithm_timings_array)
logger.info(
'algorithm timings count - %s has %s timings' % (
algorithm, str(number_of_algorithm_timings)))
if number_of_algorithm_timings == 0:
continue
try:
_sum_of_algorithm_timings = sum(algorithm_timings_array)
except:
logger.error("sum error: " + traceback.format_exc())
_sum_of_algorithm_timings = round(0.0, 6)
logger.error('error - sum_of_algorithm_timings - %s' % (algorithm))
continue
sum_of_algorithm_timings = round(_sum_of_algorithm_timings, 6)
# logger.info('sum_of_algorithm_timings - %s - %.16f seconds' % (algorithm, sum_of_algorithm_timings))
try:
_median_algorithm_timing = determine_median(algorithm_timings_array)
except:
_median_algorithm_timing = round(0.0, 6)
logger.error('error - _median_algorithm_timing - %s' % (algorithm))
continue
median_algorithm_timing = round(_median_algorithm_timing, 6)
# logger.info('median_algorithm_timing - %s - %.16f seconds' % (algorithm, median_algorithm_timing))
logger.info(
'algorithm timing - %s - total: %.6f - median: %.6f' % (
algorithm, sum_of_algorithm_timings,
median_algorithm_timing))
send_mertic_name = 'algorithm_breakdown.' + algorithm + '.timing.times_run'
self.send_graphite_metric(send_mertic_name, '%d' % number_of_algorithm_timings)
send_mertic_name = 'algorithm_breakdown.' + algorithm + '.timing.total_time'
self.send_graphite_metric(send_mertic_name, '%.6f' % sum_of_algorithm_timings)
send_mertic_name = 'algorithm_breakdown.' + algorithm + '.timing.median_time'
self.send_graphite_metric(send_mertic_name, '%.6f' % median_algorithm_timing)
# Log progress
logger.info('seconds to run :: %.2f' % (time() - now))
logger.info('total metrics :: %d' % len(unique_metrics))
logger.info('total analyzed :: %d' % (len(unique_metrics) - sum(exceptions.values())))
logger.info('total anomalies :: %d' % len(self.anomalous_metrics))
logger.info('exception stats :: %s' % exceptions)
logger.info('anomaly breakdown :: %s' % anomaly_breakdown)
# Log to Graphite
self.send_graphite_metric('run_time', '%.2f' % (time() - now))
self.send_graphite_metric('total_analyzed', '%.2f' % (len(unique_metrics) - sum(exceptions.values())))
self.send_graphite_metric('total_anomalies', '%d' % len(self.anomalous_metrics))
self.send_graphite_metric('total_metrics', '%d' % len(unique_metrics))
for key, value in exceptions.items():
send_metric = 'exceptions.%s' % key
self.send_graphite_metric(send_metric, '%d' % value)
for key, value in anomaly_breakdown.items():
send_metric = 'anomaly_breakdown.%s' % key
self.send_graphite_metric(send_metric, '%d' % value)
# Check canary metric
raw_series = self.redis_conn.get(settings.FULL_NAMESPACE + settings.CANARY_METRIC)
if raw_series is not None:
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
time_human = (timeseries[-1][0] - timeseries[0][0]) / 3600
projected = 24 * (time() - now) / time_human
logger.info('canary duration :: %.2f' % time_human)
self.send_graphite_metric('duration', '%.2f' % time_human)
self.send_graphite_metric('projected', '%.2f' % projected)
# Reset counters
self.anomalous_metrics[:] = []
# Sleep if it went too fast
# if time() - now < 5:
# logger.info('sleeping due to low run time...')
# sleep(10)
# @modified 20160504 - @earthgecko - development internal ref #1338, #1340)
# Etsy's original if this was a value of 5 seconds which does
# not make skyline Analyzer very efficient in terms of installations
# where 100s of 1000s of metrics are being analyzed. This lead to
# Analyzer running over several metrics multiple time in a minute
# and always working. Therefore this was changed from if you took
# less than 5 seconds to run only then sleep. This behaviour
# resulted in Analyzer analysing a few 1000 metrics in 9 seconds and
# then doing it again and again in a single minute. Therefore the
# ANALYZER_OPTIMUM_RUN_DURATION setting was added to allow this to
# self optimise in cases where skyline is NOT deployed to analyze
# 100s of 1000s of metrics. This relates to optimising performance
# for any deployments in the few 1000s and 60 second resolution
# area, e.g. smaller and local deployments.
process_runtime = time() - now
analyzer_optimum_run_duration = settings.ANALYZER_OPTIMUM_RUN_DURATION
if process_runtime < analyzer_optimum_run_duration:
sleep_for = (analyzer_optimum_run_duration - process_runtime)
# sleep_for = 60
logger.info('sleeping for %.2f seconds due to low run time...' % sleep_for)
sleep(sleep_for)