Source code for analyzer_dev.analyzer_dev

from __future__ import division
import logging
from Queue import Empty
from redis import StrictRedis
from time import time, sleep
from threading import Thread
from collections import defaultdict
from multiprocessing import Process, Manager, Queue
from msgpack import Unpacker, unpackb, packb
import os
from os import path, kill, getpid, system
from math import ceil
import traceback
import operator
import socket
import re
from sys import version_info

import os.path
import sys
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
import settings

from alerters import trigger_alert
from algorithms_dev import run_selected_algorithm
# from skyline import algorithm_exceptions
from algorithm_exceptions import *

try:
    send_algorithm_run_metrics = settings.ENABLE_ALGORITHM_RUN_METRICS
except:
    send_algorithm_run_metrics = False
if send_algorithm_run_metrics:
    from algorithms_dev import determine_median

# TODO if settings.ENABLE_CRUCIBLE: and ENABLE_PANORAMA
#    from spectrum import push_to_crucible

skyline_app = 'analyzer_dev'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

python_version = int(version_info[0])

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)


[docs]class Analyzer(Thread): """ The Analyzer class which controls the analyzer thread and spawned processes. """ def __init__(self, parent_pid): """ Initialize the Analyzer Create the :obj:`self.anomalous_metrics` list Create the :obj:`self.exceptions_q` queue Create the :obj:`self.anomaly_breakdown_q` queue """ super(Analyzer, self).__init__() # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) else: self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) self.daemon = True self.parent_pid = parent_pid self.current_pid = getpid() self.anomalous_metrics = Manager().list() self.exceptions_q = Queue() self.anomaly_breakdown_q = Queue() self.mirage_metrics = Manager().list()
[docs] def check_if_parent_is_alive(self): """ Self explanatory """ try: kill(self.current_pid, 0) kill(self.parent_pid, 0) except: exit(0)
[docs] def send_graphite_metric(self, name, value): """ Sends the skyline_app metrics to the `GRAPHITE_HOST` if a graphite host is defined. """ if settings.GRAPHITE_HOST != '': skyline_app_metric = skyline_app_graphite_namespace + name sock = socket.socket() sock.settimeout(10) # Handle connection error to Graphite #116 @etsy # Fixed as per https://github.com/etsy/skyline/pull/116 and # mlowicki:etsy_handle_connection_error_to_graphite # Handle connection error to Graphite #7 @ earthgecko # merged 1 commit into earthgecko:master from # mlowicki:handle_connection_error_to_graphite on 16 Mar 2015 try: sock.connect((settings.GRAPHITE_HOST, settings.CARBON_PORT)) sock.settimeout(None) except socket.error: sock.settimeout(None) endpoint = '%s:%d' % (settings.GRAPHITE_HOST, settings.CARBON_PORT) logger.error("Can't connect to Graphite at %s" % endpoint) return False # For the same reason as above # sock.sendall('%s %s %i\n' % (name, value, time())) try: sock.sendall('%s %s %i\n' % (skyline_app_metric, value, time())) sock.close() return True except: endpoint = '%s:%d' % (settings.GRAPHITE_HOST, settings.CARBON_PORT) logger.error("Can't connect to Graphite at %s" % endpoint) return False return False
[docs] def spin_process(self, i, unique_metrics): """ Assign a bunch of metrics for a process to analyze. Multiple get the assigned_metrics to the process from Redis. For each metric:\n * unpack the `raw_timeseries` for the metric.\n * Analyse each timeseries against `ALGORITHMS` to determine if it is\n anomalous.\n * If anomalous add it to the :obj:`self.anomalous_metrics` list\n * Add what algorithms triggered to the :obj:`self.anomaly_breakdown_q` queue\n Add keys and values to the queue so the parent process can collate for:\n * :py:obj:`self.anomaly_breakdown_q` * :py:obj:`self.exceptions_q` """ spin_start = time() logger.info('spin_process started') # Discover assigned metrics keys_per_processor = int(ceil(float(len(unique_metrics)) / float(settings.ANALYZER_PROCESSES))) if i == settings.ANALYZER_PROCESSES: assigned_max = len(unique_metrics) else: assigned_max = min(len(unique_metrics), i * keys_per_processor) # Fix analyzer worker metric assignment #94 # https://github.com/etsy/skyline/pull/94 @languitar:worker-fix assigned_min = (i - 1) * keys_per_processor assigned_keys = range(assigned_min, assigned_max) # assigned_keys = range(300, 310) # Compile assigned metrics assigned_metrics = [unique_metrics[index] for index in assigned_keys] # Check if this process is unnecessary if len(assigned_metrics) == 0: return # Multi get series raw_assigned = self.redis_conn.mget(assigned_metrics) # Make process-specific dicts exceptions = defaultdict(int) anomaly_breakdown = defaultdict(int) # Distill timeseries strings into lists for i, metric_name in enumerate(assigned_metrics): self.check_if_parent_is_alive() # logger.info('analysing %s' % metric_name) try: raw_series = raw_assigned[i] unpacker = Unpacker(use_list=False) unpacker.feed(raw_series) timeseries = list(unpacker) anomalous, ensemble, datapoint = run_selected_algorithm(timeseries, metric_name) # If it's anomalous, add it to list if anomalous: base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1) metric = [datapoint, base_name] self.anomalous_metrics.append(metric) # Get the anomaly breakdown - who returned True? triggered_algorithms = [] for index, value in enumerate(ensemble): if value: algorithm = settings.ALGORITHMS[index] anomaly_breakdown[algorithm] += 1 triggered_algorithms.append(algorithm) # It could have been deleted by the Roomba except TypeError: # logger.error('TypeError analysing %s' % metric_name) exceptions['DeletedByRoomba'] += 1 except TooShort: # logger.error('TooShort analysing %s' % metric_name) exceptions['TooShort'] += 1 except Stale: # logger.error('Stale analysing %s' % metric_name) exceptions['Stale'] += 1 except Boring: # logger.error('Boring analysing %s' % metric_name) exceptions['Boring'] += 1 except: # logger.error('Other analysing %s' % metric_name) exceptions['Other'] += 1 logger.info(traceback.format_exc()) # Add values to the queue so the parent process can collate for key, value in anomaly_breakdown.items(): self.anomaly_breakdown_q.put((key, value)) for key, value in exceptions.items(): self.exceptions_q.put((key, value)) spin_end = time() - spin_start logger.info('spin_process took %.2f seconds' % spin_end)
[docs] def run(self): """ Called when the process intializes. Determine if Redis is up and discover the number of `unique metrics`. Divide the `unique_metrics` between the number of `ANALYZER_PROCESSES` and assign each process a set of metrics to analyse for anomalies. Wait for the processes to finish. Process the Determine whether if any anomalous metrics require:\n * alerting on (and set `EXPIRATION_TIME` key in Redis for alert).\n * feeding to another module e.g. mirage. Populated the webapp json the anomalous_metrics details. Log the details about the run to the skyline log. Send skyline.analyzer metrics to `GRAPHITE_HOST`, """ # Log management to prevent overwriting # Allow the bin/<skyline_app>.d to manage the log if os.path.isfile(skyline_app_logwait): try: os.remove(skyline_app_logwait) except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_logwait) pass now = time() log_wait_for = now + 5 while now < log_wait_for: if os.path.isfile(skyline_app_loglock): sleep(.1) now = time() else: now = log_wait_for + 1 logger.info('starting %s run' % skyline_app) if os.path.isfile(skyline_app_loglock): logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app) try: os.remove(skyline_app_loglock) logger.info('log lock file removed') except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_loglock) pass else: logger.info('bin/%s.d log management done' % skyline_app) if not os.path.exists(settings.SKYLINE_TMP_DIR): if python_version == 2: os.makedirs(settings.SKYLINE_TMP_DIR, 0750) if python_version == 3: os.makedirs(settings.SKYLINE_TMP_DIR, mode=0o750) # Initiate the algorithm timings if Analyzer is configured to send the # algorithm_breakdown metrics with ENABLE_ALGORITHM_RUN_METRICS algorithm_tmp_file_prefix = settings.SKYLINE_TMP_DIR + '/' + skyline_app + '.' algorithms_to_time = [] if send_algorithm_run_metrics: algorithms_to_time = settings.ALGORITHMS while 1: now = time() # Make sure Redis is up try: self.redis_conn.ping() except: logger.error('skyline can\'t connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH) sleep(10) # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) else: self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) continue # Report app up self.redis_conn.setex(skyline_app, 120, now) # Discover unique metrics unique_metrics = list(self.redis_conn.smembers(settings.FULL_NAMESPACE + 'unique_metrics')) if len(unique_metrics) == 0: logger.info('no metrics in redis. try adding some - see README') sleep(10) continue # Using count files rather that multiprocessing.Value to enable metrics for # metrics for algorithm run times, etc for algorithm in algorithms_to_time: algorithm_count_file = algorithm_tmp_file_prefix + algorithm + '.count' algorithm_timings_file = algorithm_tmp_file_prefix + algorithm + '.timings' # with open(algorithm_count_file, 'a') as f: with open(algorithm_count_file, 'w') as f: pass with open(algorithm_timings_file, 'w') as f: pass # Spawn processes pids = [] pid_count = 0 for i in range(1, settings.ANALYZER_PROCESSES + 1): if i > len(unique_metrics): logger.info('WARNING: skyline is set for more cores than needed.') break p = Process(target=self.spin_process, args=(i, unique_metrics)) pids.append(p) pid_count += 1 logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.ANALYZER_PROCESSES))) p.start() # Send wait signal to zombie processes # for p in pids: # p.join() # Self monitor processes and terminate if any spin_process has run # for longer than 180 seconds p_starts = time() while time() - p_starts <= 180: if any(p.is_alive() for p in pids): # Just to avoid hogging the CPU sleep(.1) else: # All the processes are done, break now. time_to_run = time() - p_starts logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(settings.ANALYZER_PROCESSES), time_to_run)) break else: # We only enter this if we didn't 'break' above. logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app)) for p in pids: p.terminate() # p.join() # Grab data from the queue and populate dictionaries exceptions = dict() anomaly_breakdown = dict() while 1: try: key, value = self.anomaly_breakdown_q.get_nowait() if key not in anomaly_breakdown.keys(): anomaly_breakdown[key] = value else: anomaly_breakdown[key] += value except Empty: break while 1: try: key, value = self.exceptions_q.get_nowait() if key not in exceptions.keys(): exceptions[key] = value else: exceptions[key] += value except Empty: break # Push to panorama # if len(self.panorama_anomalous_metrics) > 0: # logger.info('to do - push to panorama') # Push to crucible # if len(self.crucible_anomalous_metrics) > 0: # logger.info('to do - push to crucible') # Write anomalous_metrics to static webapp directory # Using count files rather that multiprocessing.Value to enable metrics for # metrics for algorithm run times, etc for algorithm in algorithms_to_time: algorithm_count_file = algorithm_tmp_file_prefix + algorithm + '.count' algorithm_timings_file = algorithm_tmp_file_prefix + algorithm + '.timings' try: algorithm_count_array = [] with open(algorithm_count_file, 'r') as f: for line in f: value_string = line.replace('\n', '') unquoted_value_string = value_string.replace("'", '') float_value = float(unquoted_value_string) algorithm_count_array.append(float_value) except: algorithm_count_array = False if not algorithm_count_array: continue number_of_times_algorithm_run = len(algorithm_count_array) logger.info( 'algorithm run count - %s run %s times' % ( algorithm, str(number_of_times_algorithm_run))) if number_of_times_algorithm_run == 0: continue try: algorithm_timings_array = [] with open(algorithm_timings_file, 'r') as f: for line in f: value_string = line.replace('\n', '') unquoted_value_string = value_string.replace("'", '') float_value = float(unquoted_value_string) algorithm_timings_array.append(float_value) except: algorithm_timings_array = False if not algorithm_timings_array: continue number_of_algorithm_timings = len(algorithm_timings_array) logger.info( 'algorithm timings count - %s has %s timings' % ( algorithm, str(number_of_algorithm_timings))) if number_of_algorithm_timings == 0: continue try: _sum_of_algorithm_timings = sum(algorithm_timings_array) except: logger.error("sum error: " + traceback.format_exc()) _sum_of_algorithm_timings = round(0.0, 6) logger.error('error - sum_of_algorithm_timings - %s' % (algorithm)) continue sum_of_algorithm_timings = round(_sum_of_algorithm_timings, 6) # logger.info('sum_of_algorithm_timings - %s - %.16f seconds' % (algorithm, sum_of_algorithm_timings)) try: _median_algorithm_timing = determine_median(algorithm_timings_array) except: _median_algorithm_timing = round(0.0, 6) logger.error('error - _median_algorithm_timing - %s' % (algorithm)) continue median_algorithm_timing = round(_median_algorithm_timing, 6) # logger.info('median_algorithm_timing - %s - %.16f seconds' % (algorithm, median_algorithm_timing)) logger.info( 'algorithm timing - %s - total: %.6f - median: %.6f' % ( algorithm, sum_of_algorithm_timings, median_algorithm_timing)) send_mertic_name = 'algorithm_breakdown.' + algorithm + '.timing.times_run' self.send_graphite_metric(send_mertic_name, '%d' % number_of_algorithm_timings) send_mertic_name = 'algorithm_breakdown.' + algorithm + '.timing.total_time' self.send_graphite_metric(send_mertic_name, '%.6f' % sum_of_algorithm_timings) send_mertic_name = 'algorithm_breakdown.' + algorithm + '.timing.median_time' self.send_graphite_metric(send_mertic_name, '%.6f' % median_algorithm_timing) # Log progress logger.info('seconds to run :: %.2f' % (time() - now)) logger.info('total metrics :: %d' % len(unique_metrics)) logger.info('total analyzed :: %d' % (len(unique_metrics) - sum(exceptions.values()))) logger.info('total anomalies :: %d' % len(self.anomalous_metrics)) logger.info('exception stats :: %s' % exceptions) logger.info('anomaly breakdown :: %s' % anomaly_breakdown) # Log to Graphite self.send_graphite_metric('run_time', '%.2f' % (time() - now)) self.send_graphite_metric('total_analyzed', '%.2f' % (len(unique_metrics) - sum(exceptions.values()))) self.send_graphite_metric('total_anomalies', '%d' % len(self.anomalous_metrics)) self.send_graphite_metric('total_metrics', '%d' % len(unique_metrics)) for key, value in exceptions.items(): send_metric = 'exceptions.%s' % key self.send_graphite_metric(send_metric, '%d' % value) for key, value in anomaly_breakdown.items(): send_metric = 'anomaly_breakdown.%s' % key self.send_graphite_metric(send_metric, '%d' % value) # Check canary metric raw_series = self.redis_conn.get(settings.FULL_NAMESPACE + settings.CANARY_METRIC) if raw_series is not None: unpacker = Unpacker(use_list=False) unpacker.feed(raw_series) timeseries = list(unpacker) time_human = (timeseries[-1][0] - timeseries[0][0]) / 3600 projected = 24 * (time() - now) / time_human logger.info('canary duration :: %.2f' % time_human) self.send_graphite_metric('duration', '%.2f' % time_human) self.send_graphite_metric('projected', '%.2f' % projected) # Reset counters self.anomalous_metrics[:] = [] # Sleep if it went too fast # if time() - now < 5: # logger.info('sleeping due to low run time...') # sleep(10) # @modified 20160504 - @earthgecko - development internal ref #1338, #1340) # Etsy's original if this was a value of 5 seconds which does # not make skyline Analyzer very efficient in terms of installations # where 100s of 1000s of metrics are being analyzed. This lead to # Analyzer running over several metrics multiple time in a minute # and always working. Therefore this was changed from if you took # less than 5 seconds to run only then sleep. This behaviour # resulted in Analyzer analysing a few 1000 metrics in 9 seconds and # then doing it again and again in a single minute. Therefore the # ANALYZER_OPTIMUM_RUN_DURATION setting was added to allow this to # self optimise in cases where skyline is NOT deployed to analyze # 100s of 1000s of metrics. This relates to optimising performance # for any deployments in the few 1000s and 60 second resolution # area, e.g. smaller and local deployments. process_runtime = time() - now analyzer_optimum_run_duration = settings.ANALYZER_OPTIMUM_RUN_DURATION if process_runtime < analyzer_optimum_run_duration: sleep_for = (analyzer_optimum_run_duration - process_runtime) # sleep_for = 60 logger.info('sleeping for %.2f seconds due to low run time...' % sleep_for) sleep(sleep_for)