Source code for analyzer_dev.analyzer_dev

from __future__ import division
import logging
try:
    from Queue import Empty
except ImportError:
    from queue import Empty
from redis import StrictRedis
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
from multiprocessing import Process, Manager, Queue
from msgpack import Unpacker, unpackb, packb
import os
from os import path, kill, getpid, system
from math import ceil
import traceback
import operator
import socket
import re
from sys import version_info
import os.path
import resource
from ast import literal_eval

import sys
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
import settings
from skyline_functions import (
    send_graphite_metric, write_data_to_file, send_anomalous_metric_to, mkdir_p,
    filesafe_metricname,
    # @added 20170602 - Feature #2034: analyse_derivatives
    nonNegativeDerivative, strictly_increasing_monotonicity, in_list,
    # @added 20200506 - Feature #3532: Sort all time series
    sort_timeseries)

from alerters import trigger_alert
from algorithms_dev import run_selected_algorithm
# from skyline import algorithm_exceptions
from algorithm_exceptions import TooShort, Stale, Boring

try:
    send_algorithm_run_metrics = settings.ENABLE_ALGORITHM_RUN_METRICS
except:
    send_algorithm_run_metrics = False
if send_algorithm_run_metrics:
    from algorithms_dev import determine_median

# TODO if settings.ENABLE_CRUCIBLE: and ENABLE_PANORAMA
#    from spectrum import push_to_crucible

skyline_app = 'analyzer_dev'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

python_version = int(version_info[0])

this_host = str(os.uname()[1])

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)

LOCAL_DEBUG = True

# ANALYZER_ENABLED is referenced in spin_process below, so default it to True
# if it is not declared in settings.py
try:
    ANALYZER_ENABLED = settings.ANALYZER_ENABLED
except:
    ANALYZER_ENABLED = True
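
# --- Illustrative sketch, not part of the original module -------------------
# spin_process() below converts strictly increasing (counter-like) metrics to
# per-interval rates before analysis using the helpers imported from
# skyline_functions. A hedged, standalone example of that flow, assuming the
# helpers take and return lists of (timestamp, value) tuples as they are used
# later in this module; this function is for illustration only and is never
# called.
def _example_counter_to_rate(timeseries):
    # Sort by timestamp first, as spin_process does, to remove any
    # out-of-order datapoints introduced by the collector or carbon-relay
    ordered = sort_timeseries(timeseries)
    # Only counter-like series are converted, e.g. values 100, 110, 125
    # become per-interval deltas of 10 and 15
    if strictly_increasing_monotonicity(ordered):
        return nonNegativeDerivative(ordered)
    return ordered
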

class AnalyzerDev(Thread):
    """
    The AnalyzerDev class which controls the analyzer_dev thread and spawned
    processes.
    """

    def __init__(self, parent_pid):
        """
        Initialize the Analyzer

        Create the :obj:`self.anomalous_metrics` list
        Create the :obj:`self.exceptions_q` queue
        Create the :obj:`self.anomaly_breakdown_q` queue

        """
        super(AnalyzerDev, self).__init__()
        # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
        if settings.REDIS_PASSWORD:
            self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
        else:
            self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()
        self.anomalous_metrics = Manager().list()
        self.exceptions_q = Queue()
        self.anomaly_breakdown_q = Queue()
        # @modified 20160813 - Bug #1558: Memory leak in Analyzer
        # Not used
        # self.mirage_metrics = Manager().list()
        # @added 20160923 - Branch #922: Ionosphere
        self.mirage_metrics = Manager().list()
        self.ionosphere_metrics = Manager().list()
        # @added 20161119 - Branch #922: ionosphere
        #                   Task #1718: review.tsfresh
        # Send a breakdown of what metrics were sent to other apps
        self.sent_to_mirage = Manager().list()
        self.sent_to_crucible = Manager().list()
        self.sent_to_panorama = Manager().list()
        self.sent_to_ionosphere = Manager().list()
        # @added 20161229 - Feature #1830: Ionosphere alerts
        self.all_anomalous_metrics = Manager().list()
        # @added 20170108 - Feature #1830: Ionosphere alerts
        # Adding lists of smtp_alerter_metrics and non_smtp_alerter_metrics
        self.smtp_alerter_metrics = Manager().list()
        self.non_smtp_alerter_metrics = Manager().list()
        # @added 20180903 - Feature #2580: illuminance
        #                   Feature #1986: flux
        self.illuminance_datapoints = Manager().list()
        # @added 20190408 - Feature #2882: Mirage - periodic_check
        self.mirage_periodic_check_metrics = Manager().list()
        self.real_anomalous_metrics = Manager().list()

    def check_if_parent_is_alive(self):
        """
        Self explanatory
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except:
            exit(0)

    def spawn_alerter_process(self, alert, metric, context):
        """
        Spawn a process to trigger an alert.

        This is used by smtp alerters so that matplotlib objects are cleared
        down and the alerter cannot create a memory leak in this manner, as
        plt.savefig keeps the object in memory until the process terminates.
        Seeing as data is being surfaced and processed in the alert_smtp
        context, multiprocessing the alert creation and handling prevents any
        memory leaks in the parent.

        Added 20160814 relating to:

        * Bug #1558: Memory leak in Analyzer
        * Issue #21 Memory leak in Analyzer see https://github.com/earthgecko/skyline/issues/21

        Parameters as per
        :py:func:`skyline.analyzer.alerters.trigger_alert <analyzer.alerters.trigger_alert>`

        """
        trigger_alert(alert, metric, context)
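
    # Illustrative note, not in the original source: in the full Analyzer the
    # run() loop hands alerts to this method via multiprocessing so that any
    # matplotlib memory is reclaimed when the child process exits, roughly:
    #
    #     p = Process(target=self.spawn_alerter_process, args=(alert, metric, context))
    #     p.start()
    #     p.join(30)
    #
    # The 30 second join timeout is an assumed value for illustration, not
    # one taken from settings.py.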

    def spin_process(self, i, unique_metrics):
        """
        Assign a bunch of metrics for a process to analyze.

        Multi get the assigned_metrics for the process from Redis.

        For each metric:

        - unpack the `raw_timeseries` for the metric.
        - Analyse each timeseries against `ALGORITHMS` to determine if it is
          anomalous.
        - If anomalous add it to the :obj:`self.anomalous_metrics` list
        - Add what algorithms triggered to the
          :obj:`self.anomaly_breakdown_q` queue
        - If :mod:`settings.ENABLE_CRUCIBLE` is ``True``:

          - Add a crucible data file with the details about the timeseries
            and anomaly.
          - Write the timeseries to a json file for crucible.

        Add keys and values to the queue so the parent process can collate
        for:\n
        * :py:obj:`self.anomaly_breakdown_q`
        * :py:obj:`self.exceptions_q`

        """

        spin_start = time()
        logger.info('spin_process started')
        if LOCAL_DEBUG:
            logger.info('debug :: Memory usage spin_process start: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

        # TESTING removal of p.join() from p.terminate()
        # sleep(4)

        # @modified 20160801 - Adding additional exception handling to Analyzer
        # Check the unique_metrics list is valid
        try:
            len(unique_metrics)
        except:
            logger.error('error :: the unique_metrics list is not valid')
            logger.info(traceback.format_exc())
            logger.info('nothing to do, no unique_metrics')
            return

        # Discover assigned metrics
        keys_per_processor = int(ceil(float(len(unique_metrics)) / float(settings.ANALYZER_PROCESSES)))
        if i == settings.ANALYZER_PROCESSES:
            assigned_max = len(unique_metrics)
        else:
            assigned_max = min(len(unique_metrics), i * keys_per_processor)
        # Fix analyzer worker metric assignment #94
        # https://github.com/etsy/skyline/pull/94 @languitar:worker-fix
        assigned_min = (i - 1) * keys_per_processor
        assigned_keys = range(assigned_min, assigned_max)
        # assigned_keys = range(300, 310)

        # Compile assigned metrics
        assigned_metrics = [unique_metrics[index] for index in assigned_keys]
        if LOCAL_DEBUG:
            logger.info('debug :: Memory usage spin_process after assigned_metrics: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

        # @added 20190410 - Feature #2916: ANALYZER_ENABLED setting
        if not ANALYZER_ENABLED:
            len_assigned_metrics = len(assigned_metrics)
            logger.info('ANALYZER_ENABLED is set to %s removing the %s assigned_metrics' % (
                str(ANALYZER_ENABLED), str(len_assigned_metrics)))
            assigned_metrics = []

        del unique_metrics

        # Check if this process is unnecessary
        if len(assigned_metrics) == 0:
            return

        # Multi get series
        # @modified 20160801 - Adding additional exception handling to Analyzer
        raw_assigned_failed = True
        try:
            raw_assigned = self.redis_conn.mget(assigned_metrics)
            raw_assigned_failed = False
            if LOCAL_DEBUG:
                logger.info('debug :: Memory usage spin_process after raw_assigned: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
        except:
            logger.info(traceback.format_exc())
            logger.error('error :: failed to get assigned_metrics from Redis')

        # Make process-specific dicts
        exceptions = defaultdict(int)
        anomaly_breakdown = defaultdict(int)

        # @added 20160803 - Adding additional exception handling to Analyzer
        if raw_assigned_failed:
            return

        # @added 20161119 - Branch #922: ionosphere
        #                   Task #1718: review.tsfresh
        # Determine the unique Mirage and Ionosphere metrics once, which are
        # used later to determine how Analyzer should handle/route anomalies
        try:
            mirage_unique_metrics = list(self.redis_conn.smembers('mirage.unique_metrics'))
        except:
            mirage_unique_metrics = []

        # @added 20190408 - Feature #2882: Mirage - periodic_check
        # Add Mirage periodic checks so that Mirage is analysing each metric
        # at least once per hour.
        mirage_periodic_check_metric_list = []
        try:
            mirage_periodic_check_enabled = settings.MIRAGE_PERIODIC_CHECK
        except:
            mirage_periodic_check_enabled = False
        try:
            mirage_periodic_check_interval = settings.MIRAGE_PERIODIC_CHECK_INTERVAL
        except:
            mirage_periodic_check_interval = 3600
        mirage_periodic_check_interval_minutes = int(int(mirage_periodic_check_interval) / 60)
        if mirage_unique_metrics and mirage_periodic_check_enabled:
            mirage_unique_metrics_count = len(mirage_unique_metrics)
            # Mirage periodic checks are only done on declared namespaces, as
            # processing all Mirage metrics periodically would probably create
            # a substantial load on Graphite and is probably not required,
            # only key metrics should be analysed by Mirage periodically.
            periodic_check_mirage_metrics = []
            try:
                mirage_periodic_check_namespaces = settings.MIRAGE_PERIODIC_CHECK_NAMESPACES
            except:
                mirage_periodic_check_namespaces = []
            for namespace in mirage_periodic_check_namespaces:
                for metric_name in mirage_unique_metrics:
                    metric_namespace_elements = metric_name.split('.')
                    mirage_periodic_metric = False
                    for periodic_namespace in mirage_periodic_check_namespaces:
                        if not namespace in mirage_periodic_check_namespaces:
                            continue
                        periodic_namespace_namespace_elements = periodic_namespace.split('.')
                        elements_matched = set(metric_namespace_elements) & set(periodic_namespace_namespace_elements)
                        if len(elements_matched) == len(periodic_namespace_namespace_elements):
                            mirage_periodic_metric = True
                            break
                    if mirage_periodic_metric:
                        if not metric_name in periodic_check_mirage_metrics:
                            periodic_check_mirage_metrics.append(metric_name)
            periodic_check_mirage_metrics_count = len(periodic_check_mirage_metrics)
            logger.info(
                'there are %s known Mirage periodic metrics' % (
                    str(periodic_check_mirage_metrics_count)))
            for metric_name in periodic_check_mirage_metrics:
                try:
                    self.redis_conn.sadd('new.mirage.periodic_check.metrics.all', metric_name)
                except Exception as e:
                    logger.error('error :: could not add %s to Redis set new.mirage.periodic_check.metrics.all: %s' % (
                        metric_name, e))
            try:
                self.redis_conn.rename('mirage.periodic_check.metrics.all', 'mirage.periodic_check.metrics.all.old')
            except:
                pass
            try:
                self.redis_conn.rename('new.mirage.periodic_check.metrics.all', 'mirage.periodic_check.metrics.all')
            except:
                pass
            try:
                self.redis_conn.delete('mirage.periodic_check.metrics.all.old')
            except:
                pass
            if periodic_check_mirage_metrics_count > mirage_periodic_check_interval_minutes:
                mirage_periodic_checks_per_minute = periodic_check_mirage_metrics_count / mirage_periodic_check_interval_minutes
            else:
                mirage_periodic_checks_per_minute = 1
            logger.info(
                '%s Mirage periodic checks can be added' % (
                    str(int(mirage_periodic_checks_per_minute))))
            for metric_name in periodic_check_mirage_metrics:
                if len(mirage_periodic_check_metric_list) == int(mirage_periodic_checks_per_minute):
                    break
                base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
                mirage_periodic_check_cache_key = 'mirage.periodic_check.%s' % base_name
                mirage_periodic_check_key = False
                try:
                    mirage_periodic_check_key = self.redis_conn.get(mirage_periodic_check_cache_key)
                except Exception as e:
                    logger.error('error :: could not query Redis for cache_key: %s' % e)
                if not mirage_periodic_check_key:
                    try:
                        key_created_at = int(time())
                        self.redis_conn.setex(
                            mirage_periodic_check_cache_key,
                            mirage_periodic_check_interval,
                            key_created_at)
                        logger.info(
                            'created Mirage periodic_check Redis key - %s' % (mirage_periodic_check_cache_key))
                        mirage_periodic_check_metric_list.append(metric_name)
                        try:
                            self.redis_conn.sadd('new.mirage.periodic_check.metrics', metric_name)
                        except Exception as e:
                            logger.error('error :: could not add %s to Redis set new.mirage.periodic_check.metrics: %s' % (
                                metric_name, e))
                    except:
                        logger.error(traceback.format_exc())
                        logger.error(
                            'error :: failed to create Mirage periodic_check Redis key - %s' % (mirage_periodic_check_cache_key))
            try:
                self.redis_conn.rename('mirage.periodic_check.metrics', 'mirage.periodic_check.metrics.old')
            except:
                pass
            try:
                self.redis_conn.rename('new.mirage.periodic_check.metrics', 'mirage.periodic_check.metrics')
            except:
                pass
            try:
                self.redis_conn.delete('mirage.periodic_check.metrics.old')
            except:
                pass
            mirage_periodic_check_metric_list_count = len(mirage_periodic_check_metric_list)
            logger.info(
                '%s Mirage periodic checks were added' % (
                    str(mirage_periodic_check_metric_list_count)))

        try:
            ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
        except:
            ionosphere_unique_metrics = []

        # @added 20170602 - Feature #2034: analyse_derivatives
        # In order to convert monotonic, incrementing metrics to a derivative
        # metric
        try:
            derivative_metrics = list(self.redis_conn.smembers('derivative_metrics'))
        except:
            derivative_metrics = []
        try:
            non_derivative_metrics = list(self.redis_conn.smembers('non_derivative_metrics'))
        except:
            non_derivative_metrics = []
        # This is here to refresh the sets
        try:
            manage_derivative_metrics = self.redis_conn.get('analyzer.derivative_metrics_expiry')
        except Exception as e:
            if LOCAL_DEBUG:
                logger.error('error :: could not query Redis for analyzer.derivative_metrics_expiry key: %s' % str(e))
            manage_derivative_metrics = False

        # @added 20170901 - Bug #2154: Infrequent missing new_ Redis keys
        # If the analyzer.derivative_metrics_expiry is going to expire in the
        # next 60 seconds, just manage the derivative_metrics in the run as
        # there is an overlap some times where the key existed at the start of
        # the run but has expired by the end of the run.
        derivative_metrics_expiry_ttl = False
        if manage_derivative_metrics:
            try:
                derivative_metrics_expiry_ttl = self.redis_conn.ttl('analyzer.derivative_metrics_expiry')
                logger.info('the analyzer.derivative_metrics_expiry key ttl is %s' % str(derivative_metrics_expiry_ttl))
            except Exception as e:
                logger.error('error :: could not query Redis for analyzer.derivative_metrics_expiry key: %s' % str(e))
            if derivative_metrics_expiry_ttl:
                if int(derivative_metrics_expiry_ttl) < 60:
                    logger.info('managing derivative_metrics as the analyzer.derivative_metrics_expiry key ttl is less than 60 with %s' % str(derivative_metrics_expiry_ttl))
                    manage_derivative_metrics = False
                    try:
                        self.redis_conn.delete('analyzer.derivative_metrics_expiry')
                        logger.info('deleted the Redis key analyzer.derivative_metrics_expiry')
                    except:
                        logger.error('error :: failed to delete Redis key :: analyzer.derivative_metrics_expiry')

        try:
            non_derivative_monotonic_metrics = settings.NON_DERIVATIVE_MONOTONIC_METRICS
        except:
            non_derivative_monotonic_metrics = []

        # @added 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
        # Added Redis sets for Boring, TooShort and Stale
        redis_set_errors = 0

        # Distill timeseries strings into lists
        for i, metric_name in enumerate(assigned_metrics):
            self.check_if_parent_is_alive()

            # logger.info('analysing %s' % metric_name)

            try:
                raw_series = raw_assigned[i]
                unpacker = Unpacker(use_list=False)
                unpacker.feed(raw_series)
                timeseries = list(unpacker)
            except:
                timeseries = []

            # @added 20200507 - Feature #3532: Sort all time series
            # To ensure that there are no unordered timestamps in the time
            # series which are artefacts of the collector or carbon-relay,
            # sort all time series by timestamp before analysis.
            original_timeseries = timeseries
            if original_timeseries:
                timeseries = sort_timeseries(original_timeseries)
                del original_timeseries

            # @added 20170602 - Feature #2034: analyse_derivatives
            # In order to convert monotonic, incrementing metrics to a
            # derivative metric
            known_derivative_metric = False
            unknown_deriv_status = True
            if metric_name in non_derivative_metrics:
                unknown_deriv_status = False
            if unknown_deriv_status:
                if metric_name in derivative_metrics:
                    known_derivative_metric = True
                    unknown_deriv_status = False
            # This is here to refresh the sets
            if not manage_derivative_metrics:
                unknown_deriv_status = True

            base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)

            # @added 20170617 - Bug #2050: analyse_derivatives - change in monotonicity
            # First check if it has its own Redis z.derivative_metric key
            # that has not expired
            derivative_metric_key = 'z.derivative_metric.%s' % str(base_name)

            if unknown_deriv_status:
                # @added 20170617 - Bug #2050: analyse_derivatives - change in monotonicity
                last_derivative_metric_key = False
                try:
                    last_derivative_metric_key = self.redis_conn.get(derivative_metric_key)
                except Exception as e:
                    logger.error('error :: could not query Redis for last_derivative_metric_key: %s' % e)

                # Determine if it is a strictly increasing monotonically
                # metric or has been in last FULL_DURATION via its
                # z.derivative_metric key
                if not last_derivative_metric_key:
                    is_strictly_increasing_monotonically = strictly_increasing_monotonicity(timeseries)
                    if is_strictly_increasing_monotonically:
                        try:
                            last_expire_set = int(time())
                            self.redis_conn.setex(
                                derivative_metric_key, settings.FULL_DURATION,
                                last_expire_set)
                        except Exception as e:
                            logger.error('error :: could not set Redis derivative_metric key: %s' % e)
                else:
                    # Until the z.derivative_metric key expires, it is classed
                    # as such
                    is_strictly_increasing_monotonically = True

                skip_derivative = in_list(base_name, non_derivative_monotonic_metrics)
                if skip_derivative:
                    is_strictly_increasing_monotonically = False
                if is_strictly_increasing_monotonically:
                    known_derivative_metric = True
                    try:
                        self.redis_conn.sadd('derivative_metrics', metric_name)
                    except:
                        logger.info(traceback.format_exc())
                        logger.error('error :: failed to add metric to Redis derivative_metrics set')
                    try:
                        self.redis_conn.sadd('new_derivative_metrics', metric_name)
                    except:
                        logger.info(traceback.format_exc())
                        logger.error('error :: failed to add metric to Redis new_derivative_metrics set')
                else:
                    try:
                        self.redis_conn.sadd('non_derivative_metrics', metric_name)
                    except:
                        logger.info(traceback.format_exc())
                        logger.error('error :: failed to add metric to Redis non_derivative_metrics set')
                    try:
                        self.redis_conn.sadd('new_non_derivative_metrics', metric_name)
                    except:
                        logger.info(traceback.format_exc())
                        logger.error('error :: failed to add metric to Redis new_non_derivative_metrics set')

            if known_derivative_metric:
                try:
                    derivative_timeseries = nonNegativeDerivative(timeseries)
                    timeseries = derivative_timeseries
                except:
                    logger.error('error :: nonNegativeDerivative failed')

            # @added 20180903 - Feature #2580: illuminance
            #                   Feature #1986: flux
            try:
                illuminance_datapoint = timeseries[-1][1]
                if '.illuminance' not in metric_name:
                    self.illuminance_datapoints.append(illuminance_datapoint)
            except:
                pass

            try:
                anomalous, ensemble, datapoint = run_selected_algorithm(timeseries, metric_name)

                # @added 20190408 - Feature #2882: Mirage - periodic_check
                # Add for Mirage periodic - if it is really anomalous add to
                # real_anomalous_metrics and if in
                # mirage_periodic_check_metric_list add as anomalous
                if anomalous:
                    # @modified 20190412 - Bug #2932: self.real_anomalous_metrics not being populated correctly
                    #                      Feature #2882: Mirage - periodic_check
                    # self.real_anomalous_metrics.append(base_name)
                    base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
                    metric_timestamp = timeseries[-1][0]
                    metric = [datapoint, base_name, metric_timestamp]
                    self.real_anomalous_metrics.append(metric)
                if metric_name in mirage_periodic_check_metric_list:
                    self.mirage_periodic_check_metrics.append(base_name)
                    anomalous = True

                # If it's anomalous, add it to list
                if anomalous:
                    base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
                    metric_timestamp = timeseries[-1][0]
                    metric = [datapoint, base_name, metric_timestamp]
                    self.anomalous_metrics.append(metric)

                    # Get the anomaly breakdown - who returned True?
                    triggered_algorithms = []
                    for index, value in enumerate(ensemble):
                        if value:
                            algorithm = settings.ALGORITHMS[index]
                            anomaly_breakdown[algorithm] += 1
                            triggered_algorithms.append(algorithm)

            # It could have been deleted by the Roomba
            except TypeError:
                # logger.error('TypeError analysing %s' % metric_name)
                exceptions['DeletedByRoomba'] += 1
            except TooShort:
                # logger.error('TooShort analysing %s' % metric_name)
                exceptions['TooShort'] += 1
            except Stale:
                # logger.error('Stale analysing %s' % metric_name)
                exceptions['Stale'] += 1
            except Boring:
                # logger.error('Boring analysing %s' % metric_name)
                exceptions['Boring'] += 1
            except:
                # logger.error('Other analysing %s' % metric_name)
                exceptions['Other'] += 1
                logger.info(traceback.format_exc())

        # Add values to the queue so the parent process can collate
        for key, value in anomaly_breakdown.items():
            self.anomaly_breakdown_q.put((key, value))

        for key, value in exceptions.items():
            self.exceptions_q.put((key, value))

        spin_end = time() - spin_start
        logger.info('spin_process took %.2f seconds' % spin_end)
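
    # Illustrative note, not in the original source: a worked example of the
    # metric assignment arithmetic at the top of spin_process(). With, say,
    # 1000 unique_metrics and settings.ANALYZER_PROCESSES = 4,
    # keys_per_processor = ceil(1000 / 4) = 250, so process i=1 is assigned
    # indices 0-249, i=2 250-499, i=3 500-749 and the last process (i=4)
    # 750-999, because assigned_max falls back to len(unique_metrics) for the
    # final process.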

    def run(self):
        """
        Called when the process initializes.

        Determine if Redis is up and discover the number of `unique metrics`.

        Divide the `unique_metrics` between the number of
        `ANALYZER_PROCESSES` and assign each process a set of metrics to
        analyse for anomalies.

        Wait for the processes to finish.

        Determine whether any anomalous metrics require:\n
        * alerting on (and set `EXPIRATION_TIME` key in Redis for alert).\n
        * feeding to another module e.g. mirage.

        Populate the webapp json with the anomalous_metrics details.

        Log the details about the run to the skyline log.

        Send skyline.analyzer metrics to `GRAPHITE_HOST`.
        """

        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os.remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
                pass

        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os.remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
                pass
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        if not os.path.exists(settings.SKYLINE_TMP_DIR):
            if python_version == 2:
                # os.makedirs(settings.SKYLINE_TMP_DIR, 0750)
                os.makedirs(settings.SKYLINE_TMP_DIR, mode=0o755)
            if python_version == 3:
                os.makedirs(settings.SKYLINE_TMP_DIR, mode=0o750)

        # Initiate the algorithm timings if Analyzer is configured to send the
        # algorithm_breakdown metrics with ENABLE_ALGORITHM_RUN_METRICS
        algorithm_tmp_file_prefix = settings.SKYLINE_TMP_DIR + '/' + skyline_app + '.'
        algorithms_to_time = []
        if send_algorithm_run_metrics:
            algorithms_to_time = settings.ALGORITHMS

        while 1:
            now = time()

            # Make sure Redis is up
            try:
                self.redis_conn.ping()
            except:
                logger.error('skyline can\'t connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                sleep(10)
                # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
                if settings.REDIS_PASSWORD:
                    self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
                else:
                    self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
                continue

            # Report app up
            self.redis_conn.setex(skyline_app, 120, now)

            # Discover unique metrics
            unique_metrics = list(self.redis_conn.smembers(settings.FULL_NAMESPACE + 'unique_metrics'))

            if len(unique_metrics) == 0:
                logger.info('no metrics in redis. try adding some - see README')
                sleep(10)
                continue

            # Using count files rather than multiprocessing.Value to enable
            # metrics for algorithm run times, etc
            for algorithm in algorithms_to_time:
                algorithm_count_file = algorithm_tmp_file_prefix + algorithm + '.count'
                algorithm_timings_file = algorithm_tmp_file_prefix + algorithm + '.timings'
                # with open(algorithm_count_file, 'a') as f:
                with open(algorithm_count_file, 'w') as f:
                    pass
                with open(algorithm_timings_file, 'w') as f:
                    pass

            # Spawn processes
            pids = []
            pid_count = 0
            for i in range(1, settings.ANALYZER_PROCESSES + 1):
                if i > len(unique_metrics):
                    logger.info('WARNING: skyline is set for more cores than needed.')
                    break

                p = Process(target=self.spin_process, args=(i, unique_metrics))
                pids.append(p)
                pid_count += 1
                logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.ANALYZER_PROCESSES)))
                p.start()

            # Send wait signal to zombie processes
            # for p in pids:
            #     p.join()

            # Self monitor processes and terminate if any spin_process has run
            # for longer than 180 seconds
            p_starts = time()
            while time() - p_starts <= 180:
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                else:
                    # All the processes are done, break now.
                    time_to_run = time() - p_starts
                    logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(settings.ANALYZER_PROCESSES), time_to_run))
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
                for p in pids:
                    p.terminate()
                    # p.join()

            # Grab data from the queue and populate dictionaries
            exceptions = dict()
            anomaly_breakdown = dict()
            while 1:
                try:
                    key, value = self.anomaly_breakdown_q.get_nowait()
                    if key not in anomaly_breakdown.keys():
                        anomaly_breakdown[key] = value
                    else:
                        anomaly_breakdown[key] += value
                except Empty:
                    break

            while 1:
                try:
                    key, value = self.exceptions_q.get_nowait()
                    if key not in exceptions.keys():
                        exceptions[key] = value
                    else:
                        exceptions[key] += value
                except Empty:
                    break

            # Push to panorama
            # if len(self.panorama_anomalous_metrics) > 0:
            #     logger.info('to do - push to panorama')

            # Push to crucible
            # if len(self.crucible_anomalous_metrics) > 0:
            #     logger.info('to do - push to crucible')

            # Write anomalous_metrics to static webapp directory

            # Using count files rather than multiprocessing.Value to enable
            # metrics for algorithm run times, etc
            for algorithm in algorithms_to_time:
                algorithm_count_file = algorithm_tmp_file_prefix + algorithm + '.count'
                algorithm_timings_file = algorithm_tmp_file_prefix + algorithm + '.timings'

                try:
                    algorithm_count_array = []
                    with open(algorithm_count_file, 'r') as f:
                        for line in f:
                            value_string = line.replace('\n', '')
                            unquoted_value_string = value_string.replace("'", '')
                            float_value = float(unquoted_value_string)
                            algorithm_count_array.append(float_value)
                except:
                    algorithm_count_array = False

                if not algorithm_count_array:
                    continue

                number_of_times_algorithm_run = len(algorithm_count_array)
                logger.info(
                    'algorithm run count - %s run %s times' % (
                        algorithm, str(number_of_times_algorithm_run)))
                if number_of_times_algorithm_run == 0:
                    continue

                try:
                    algorithm_timings_array = []
                    with open(algorithm_timings_file, 'r') as f:
                        for line in f:
                            value_string = line.replace('\n', '')
                            unquoted_value_string = value_string.replace("'", '')
                            float_value = float(unquoted_value_string)
                            algorithm_timings_array.append(float_value)
                except:
                    algorithm_timings_array = False

                if not algorithm_timings_array:
                    continue

                number_of_algorithm_timings = len(algorithm_timings_array)
                logger.info(
                    'algorithm timings count - %s has %s timings' % (
                        algorithm, str(number_of_algorithm_timings)))

                if number_of_algorithm_timings == 0:
                    continue

                try:
                    _sum_of_algorithm_timings = sum(algorithm_timings_array)
                except:
                    logger.error("sum error: " + traceback.format_exc())
                    _sum_of_algorithm_timings = round(0.0, 6)
                    logger.error('error - sum_of_algorithm_timings - %s' % (algorithm))
                    continue

                sum_of_algorithm_timings = round(_sum_of_algorithm_timings, 6)
                # logger.info('sum_of_algorithm_timings - %s - %.16f seconds' % (algorithm, sum_of_algorithm_timings))

                try:
                    _median_algorithm_timing = determine_median(algorithm_timings_array)
                except:
                    _median_algorithm_timing = round(0.0, 6)
                    logger.error('error - _median_algorithm_timing - %s' % (algorithm))
                    continue

                median_algorithm_timing = round(_median_algorithm_timing, 6)
                # logger.info('median_algorithm_timing - %s - %.16f seconds' % (algorithm, median_algorithm_timing))

                logger.info(
                    'algorithm timing - %s - total: %.6f - median: %.6f' % (
                        algorithm, sum_of_algorithm_timings,
                        median_algorithm_timing))
                send_metric_name = 'algorithm_breakdown.' + algorithm + '.timing.times_run'
                self.send_graphite_metric(send_metric_name, '%d' % number_of_algorithm_timings)
                send_metric_name = 'algorithm_breakdown.' + algorithm + '.timing.total_time'
                self.send_graphite_metric(send_metric_name, '%.6f' % sum_of_algorithm_timings)
                send_metric_name = 'algorithm_breakdown.' + algorithm + '.timing.median_time'
                self.send_graphite_metric(send_metric_name, '%.6f' % median_algorithm_timing)

            # Log progress
            logger.info('seconds to run :: %.2f' % (time() - now))
            logger.info('total metrics :: %d' % len(unique_metrics))
            logger.info('total analyzed :: %d' % (len(unique_metrics) - sum(exceptions.values())))
            logger.info('total anomalies :: %d' % len(self.anomalous_metrics))
            logger.info('exception stats :: %s' % exceptions)
            logger.info('anomaly breakdown :: %s' % anomaly_breakdown)

            # Log to Graphite
            self.send_graphite_metric('run_time', '%.2f' % (time() - now))
            self.send_graphite_metric('total_analyzed', '%.2f' % (len(unique_metrics) - sum(exceptions.values())))
            self.send_graphite_metric('total_anomalies', '%d' % len(self.anomalous_metrics))
            self.send_graphite_metric('total_metrics', '%d' % len(unique_metrics))

            for key, value in exceptions.items():
                send_metric = 'exceptions.%s' % key
                self.send_graphite_metric(send_metric, '%d' % value)

            for key, value in anomaly_breakdown.items():
                send_metric = 'anomaly_breakdown.%s' % key
                self.send_graphite_metric(send_metric, '%d' % value)

            # Check canary metric
            raw_series = self.redis_conn.get(settings.FULL_NAMESPACE + settings.CANARY_METRIC)
            if raw_series is not None:
                unpacker = Unpacker(use_list=False)
                unpacker.feed(raw_series)
                timeseries = list(unpacker)

                # @added 20200507 - Feature #3532: Sort all time series
                # To ensure that there are no unordered timestamps in the time
                # series which are artefacts of the collector or carbon-relay,
                # sort all time series by timestamp before analysis.
                original_timeseries = timeseries
                if original_timeseries:
                    timeseries = sort_timeseries(original_timeseries)
                    del original_timeseries

                time_human = (timeseries[-1][0] - timeseries[0][0]) / 3600
                projected = 24 * (time() - now) / time_human

                logger.info('canary duration :: %.2f' % time_human)
                self.send_graphite_metric('duration', '%.2f' % time_human)
                self.send_graphite_metric('projected', '%.2f' % projected)

            # Reset counters
            self.anomalous_metrics[:] = []

            # Sleep if it went too fast
            # if time() - now < 5:
            #     logger.info('sleeping due to low run time...')
            #     sleep(10)
            # @modified 20160504 - @earthgecko - development internal ref #1338, #1340
            # Etsy's original 'if' here used a value of 5 seconds, which does
            # not make skyline Analyzer very efficient in terms of
            # installations where 100s of 1000s of metrics are being analyzed.
            # This led to Analyzer running over several metrics multiple times
            # in a minute and always working. Therefore this was changed from
            # 'only sleep if the run took less than 5 seconds'. That behaviour
            # resulted in Analyzer analysing a few 1000 metrics in 9 seconds
            # and then doing it again and again in a single minute. Therefore
            # the ANALYZER_OPTIMUM_RUN_DURATION setting was added to allow
            # this to self optimise in cases where skyline is NOT deployed to
            # analyze 100s of 1000s of metrics. This relates to optimising
            # performance for any deployments in the few 1000s and 60 second
            # resolution area, e.g. smaller and local deployments.
            process_runtime = time() - now
            analyzer_optimum_run_duration = settings.ANALYZER_OPTIMUM_RUN_DURATION
            if process_runtime < analyzer_optimum_run_duration:
                sleep_for = (analyzer_optimum_run_duration - process_runtime)
                # sleep_for = 60
                logger.info('sleeping for %.2f seconds due to low run time...' % sleep_for)
                sleep(sleep_for)
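

# --- Illustrative sketch, not part of the original module -------------------
# In Skyline the AnalyzerDev thread is normally started by the app agent via
# bin/analyzer_dev.d (as the log management above assumes). A minimal,
# hypothetical way to drive the class directly, for illustration only:
if __name__ == '__main__':
    analyzer = AnalyzerDev(getpid())
    analyzer.start()
    while analyzer.is_alive():
        sleep(10)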