Source code for mirage.mirage

import logging
try:
    from Queue import Empty
except ImportError:
    from queue import Empty
from redis import StrictRedis
from time import time, sleep
from threading import Thread
from collections import defaultdict
from multiprocessing import Process, Manager, Queue
from msgpack import packb
from os import kill, getpid
import traceback
import re
# imports required for surfacing graphite JSON formatted timeseries for use in
# Mirage
import json
import sys
import requests
try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse
import os
# import errno
# import imp
from os import listdir
import datetime

import os.path
import resource
from shutil import rmtree
from ast import literal_eval

import settings
# @modified 20160922 - Branch #922: Ionosphere
# Added the send_anomalous_metric_to skyline_functions.py
from skyline_functions import (
    write_data_to_file, fail_check, send_anomalous_metric_to,
    mkdir_p, send_graphite_metric, filesafe_metricname,
    # @added 20170603 - Feature #2034: analyse_derivatives
    nonNegativeDerivative, in_list)

from mirage_alerters import trigger_alert
from negaters import trigger_negater
from mirage_algorithms import run_selected_algorithm
from algorithm_exceptions import TooShort, Stale, Boring
from os.path import join, isfile

"""
ENABLE_MEMORY_PROFILING - DEVELOPMENT ONLY

# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Added all the memory profiling blocks - mem_top, pympler, objgraph, gc
Garbage collection et al should not be run in anything but development mode,
therefore these variables are hard coded and not accessible via settings.py.
If you are in here reading this then knock yourself out.  gc and dump_garbage
can be useful for getting an idea about what all the objects in play are, but
garbage collection will just take longer and longer to run.

"""
LOCAL_DEBUG = False
ENABLE_MEMORY_PROFILING = False
garbage_collection_enabled = False

if ENABLE_MEMORY_PROFILING:
    # @added 20160806 - Bug #1558: Memory leak in Analyzer
    # As per http://stackoverflow.com/a/1641280
    # This got useable understandable data
    if garbage_collection_enabled:
        from gc import get_objects
        # Debug with garbage collection - http://code.activestate.com/recipes/65333/
        import gc

skyline_app = 'mirage'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

python_version = int(sys.version_info[0])

this_host = str(os.uname()[1])

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
failed_checks_dir = '%s_failed' % settings.MIRAGE_CHECK_PATH
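
# Example (illustrative): with settings.SERVER_METRICS_NAME set to, say,
# 'host-1', SERVER_METRIC_PATH becomes '.host-1' and the namespace above
# resolves to 'skyline.mirage.host-1'; failed checks are moved under
# '<MIRAGE_CHECK_PATH>_failed'.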


class Mirage(Thread):
    def __init__(self, parent_pid):
        """
        Initialize the Mirage
        """
        super(Mirage, self).__init__()
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()
        self.anomalous_metrics = Manager().list()
        self.mirage_exceptions_q = Queue()
        self.mirage_anomaly_breakdown_q = Queue()
        self.not_anomalous_metrics = Manager().list()
        self.metric_variables = Manager().list()
        self.ionosphere_metrics = Manager().list()
        self.sent_to_crucible = Manager().list()
        self.sent_to_panorama = Manager().list()
        self.sent_to_ionosphere = Manager().list()
        # @added 20170603 - Feature #2034: analyse_derivatives
        # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
        if settings.REDIS_PASSWORD:
            self.redis_conn = StrictRedis(
                password=settings.REDIS_PASSWORD,
                unix_socket_path=settings.REDIS_SOCKET_PATH)
        else:
            self.redis_conn = StrictRedis(
                unix_socket_path=settings.REDIS_SOCKET_PATH)

    def check_if_parent_is_alive(self):
        """
        Self explanatory
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except:
            exit(0)

    def spawn_alerter_process(self, alert, metric, second_order_resolution_seconds, context):
        """
        Spawn a process to trigger an alert.

        This is used by smtp alerters so that matplotlib objects are cleared
        down and the alerter cannot create a memory leak in this manner and
        plt.savefig keeps the object in memory until the process terminates.
        Seeing as data is being surfaced and processed in the alert_smtp
        context, multiprocessing the alert creation and handling prevents any
        memory leaks in the parent.

        # @added 20160814 - Bug #1558: Memory leak in Analyzer
        #                   Issue #21 Memory leak in Analyzer
        #                   https://github.com/earthgecko/skyline/issues/21
        """
        trigger_alert(alert, metric, second_order_resolution_seconds, context)
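
    # For reference: run() below hands this method to multiprocessing.Process
    # via its nested smtp_trigger_alert() helper, e.g.
    #   p = Process(target=self.spawn_alerter_process,
    #               args=(alert, metric, second_order_resolution_seconds, context))
    #   p.start()
    # so any matplotlib state built while rendering an smtp alert is released
    # when the short-lived child process exits, rather than accumulating in
    # the parent process.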

    def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until):
        # @added 20160803 - Unescaped Graphite target
        # https://github.com/earthgecko/skyline/issues/20
        # bug1546: Unescaped Graphite target
        new_metric_namespace = metric_name.replace(':', '\:')
        metric_namespace = new_metric_namespace.replace('(', '\(')
        metric_name = metric_namespace.replace(')', '\)')

        # We use absolute time so that if there is a lag in mirage the correct
        # timeseries data is still surfaced relevant to the anomalous datapoint
        # timestamp
        if settings.GRAPHITE_PORT != '':
            url = '%s://%s:%s/render/?from=%s&until=%s&target=%s&format=json' % (
                settings.GRAPHITE_PROTOCOL, settings.GRAPHITE_HOST,
                str(settings.GRAPHITE_PORT), graphite_from, graphite_until,
                metric_name)
        else:
            url = '%s://%s/render/?from=%s&until=%s&target=%s&format=json' % (
                settings.GRAPHITE_PROTOCOL, settings.GRAPHITE_HOST,
                graphite_from, graphite_until, metric_name)

        r = requests.get(url)
        js = r.json()
        datapoints = js[0]['datapoints']

        converted = []
        for datapoint in datapoints:
            try:
                new_datapoint = [float(datapoint[1]), float(datapoint[0])]
                converted.append(new_datapoint)
            # @modified 20170913 - Task #2160: Test skyline with bandit
            # Added nosec to exclude from bandit tests
            except:  # nosec
                continue

        parsed = urlparse.urlparse(url)
        target = urlparse.parse_qs(parsed.query)['target'][0]

        metric_data_folder = settings.MIRAGE_DATA_FOLDER + "/" + target
        mkdir_p(metric_data_folder)
        with open(metric_data_folder + "/" + target + '.json', 'w') as f:
            f.write(json.dumps(converted))
            f.close()
            return True

        return False
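
    # For reference: Graphite's JSON render output is a list of series of the
    # form [{"target": "<metric>", "datapoints": [[value, timestamp], ...]}]
    # and the loop above swaps each pair into the [timestamp, value] order used
    # by the Skyline analysis code, e.g. a Graphite datapoint of
    # [10.0, 1554320400] becomes [1554320400.0, 10.0]. The converted
    # timeseries is written to MIRAGE_DATA_FOLDER/<target>/<target>.json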

    # @added 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
    #                   Bug #1460: panorama check file fails
    #                   Panorama check file fails #24
    # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway

    def mirage_load_metric_vars(self, metric_vars_file):
        """
        Load the metric variables for a check from a metric check variables file

        :param metric_vars_file: the path and filename to the metric variables files
        :type metric_vars_file: str
        :return: the metric_vars list or ``False``
        :rtype: list

        """
        if os.path.isfile(metric_vars_file):
            logger.info(
                'loading metric variables from metric_check_file - %s' % (
                    str(metric_vars_file)))
        else:
            logger.error(
                'error :: loading metric variables from metric_check_file - file not found - %s' % (
                    str(metric_vars_file)))
            return False

        metric_vars = []
        with open(metric_vars_file) as f:
            for line in f:
                no_new_line = line.replace('\n', '')
                no_equal_line = no_new_line.replace(' = ', ',')
                array = str(no_equal_line.split(',', 1))
                add_line = literal_eval(array)
                metric_vars.append(add_line)

        string_keys = ['metric']
        float_keys = ['value']
        int_keys = ['hours_to_resolve', 'metric_timestamp']

        metric_vars_array = []
        for var_array in metric_vars:
            # @modified 20181023 - Feature #2618: alert_slack
            # Wrapped in try except for debugging issue where the
            # hours_to_resolve was interpolating to hours_to_resolve = "t"
            try:
                key = None
                value = None
                if var_array[0] in string_keys:
                    key = var_array[0]
                    _value_str = str(var_array[1]).replace("'", '')
                    value_str = str(_value_str).replace('"', '')
                    value = str(value_str)
                    if var_array[0] == 'metric':
                        metric = value
                if var_array[0] in float_keys:
                    key = var_array[0]
                    _value_str = str(var_array[1]).replace("'", '')
                    value_str = str(_value_str).replace('"', '')
                    value = float(value_str)
                if var_array[0] in int_keys:
                    key = var_array[0]
                    _value_str = str(var_array[1]).replace("'", '')
                    value_str = str(_value_str).replace('"', '')
                    value = int(float(value_str))
                if key:
                    metric_vars_array.append([key, value])

                if len(metric_vars_array) == 0:
                    logger.error(
                        'error :: loading metric variables - none found in %s' % (
                            str(metric_vars_file)))
                    return False
            except:
                logger.info(traceback.format_exc())
                logger.error('error :: failed to load metric variables from check file - %s' % (metric_vars_file))
                return False

        logger.info('debug :: metric_vars for %s' % str(metric))
        logger.info('debug :: %s' % str(metric_vars_array))

        return metric_vars_array
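
    # Example (with a hypothetical metric name): a metric check variables file
    # parsed by mirage_load_metric_vars() contains lines such as
    #   metric = 'stats.example-host.cpu.user'
    #   value = '5622.0'
    #   hours_to_resolve = '168'
    #   metric_timestamp = '1554320400'
    # which the method above returns as
    #   [['metric', 'stats.example-host.cpu.user'], ['value', 5622.0],
    #    ['hours_to_resolve', 168], ['metric_timestamp', 1554320400]]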

    def dump_garbage(self):
        """
        DEVELOPMENT ONLY

        # @added 20160806 - Bug #1558: Memory leak in Analyzer
        # Debug with garbage collection - http://code.activestate.com/recipes/65333/

        show us what's the garbage about
        """
        if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
            # force collection
            if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                logger.info('debug :: GARBAGE')
            try:
                gc.collect()
                gc_collect_ok = True
            except:
                logger.error('error :: gc.collect failed')
                logger.info(traceback.format_exc())
                gc_collect_ok = False

            if gc_collect_ok:
                if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                    logger.info('debug :: GARBAGE OBJECTS')
                for x in gc.garbage:
                    s = str(x)
                    if len(s) > 80:
                        s = s[:80]
                    # print type(x), "\n ", s
                    try:
                        log_string = type(x), "\n ", s
                        log_string = 'unused variable for testing only'
                    except:
                        logger.info(traceback.format_exc())
                        logger.error('error :: print x and s')
                    if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                        logger.info(log_string)
        else:
            return None

    def spin_process(self, i, run_timestamp):
        """
        Assign a metric for a process to analyze.
        """

        # Discover metric to analyze
        metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]

        # Check if this process is unnecessary
        if len(metric_var_files) == 0:
            return

        metric_var_files_sorted = sorted(metric_var_files)
        metric_check_file = '%s/%s' % (
            settings.MIRAGE_CHECK_PATH, str(metric_var_files_sorted[0]))

        check_file_name = os.path.basename(str(metric_check_file))
        check_file_timestamp = check_file_name.split('.', 1)[0]
        check_file_metricname_txt = check_file_name.split('.', 1)[1]
        check_file_metricname = check_file_metricname_txt.replace('.txt', '')
        check_file_metricname_dir = check_file_metricname.replace('.', '/')
        metric_failed_check_dir = '%s/%s/%s' % (failed_checks_dir, check_file_metricname_dir, check_file_timestamp)

        # Load metric variables
        # @modified 20160822 - Bug #1460: panorama check file fails
        # Changed to panorama style skyline_functions load_metric_vars
        # self.load_metric_vars(metric_check_file)
        # Load and validate metric variables
        try:
            # @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
            #                      Bug #1460: panorama check file fails
            #                      Panorama check file fails #24
            # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
            # metric_vars = load_metric_vars(skyline_app, str(metric_check_file))
            metric_vars_array = self.mirage_load_metric_vars(str(metric_check_file))
        except:
            logger.info(traceback.format_exc())
            logger.error('error :: failed to load metric variables from check file - %s' % (metric_check_file))
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            return

        # Test metric variables
        # if len(metric_vars.metric) == 0:
        #     logger.error('error :: failed to load metric variable from check file - %s' % (metric_check_file))
        #     return
        # else:
        #     metric = metric_vars.metric
        #     metric_name = ['metric_name', metric_vars.metric]
        #     self.metric_variables.append(metric_name)
        #     logger.info('debug :: added metric_name %s from check file - %s' % (metric_name, metric_check_file))
        metric = None
        try:
            key = 'metric'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            metric = str(value_list[0])
            metric_name = ['metric_name', metric]
            self.metric_variables.append(metric_name)
            logger.info('debug :: added metric_name %s from check file - %s' % (metric_name, metric_check_file))
        except:
            logger.info(traceback.format_exc())
            logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file))
            return

        if not metric:
            logger.error('error :: failed to load metric variable from check file - %s' % (metric_check_file))
            return

        # if len(metric_vars.value) == 0:
        #     return
        # else:
        #     metric_value = ['metric_value', metric_vars.value]
        #     self.metric_variables.append(metric_value)
        value = None
        try:
            key = 'value'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            value = float(value_list[0])
            metric_value = ['metric_value', value]
            self.metric_variables.append(metric_value)
        except:
            logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file))
            return

        if not value:
            # @modified 20181119 - Bug #2708: Failing to load metric vars
            if value == 0.0:
                pass
            else:
                logger.error('error :: failed to load value variable from check file - %s' % (metric_check_file))
                return

        # if len(metric_vars.hours_to_resolve) == 0:
        #     return
        # else:
        #     hours_to_resolve = ['hours_to_resolve', metric_vars.hours_to_resolve]
        #     self.metric_variables.append(hours_to_resolve)
        hours_to_resolve = None
        try:
            key = 'hours_to_resolve'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            hours_to_resolve = int(value_list[0])
            hours_to_resolve_list = ['hours_to_resolve', hours_to_resolve]
            self.metric_variables.append(hours_to_resolve_list)
        except:
            logger.error('error :: failed to read hours_to_resolve variable from check file - %s' % (metric_check_file))
            return

        if not hours_to_resolve:
            logger.error('error :: failed to load hours_to_resolve variable from check file - %s' % (metric_check_file))
            return

        # if len(metric_vars.metric_timestamp) == 0:
        #     return
        # else:
        #     metric_timestamp = ['metric_timestamp', metric_vars.metric_timestamp]
        #     self.metric_variables.append(metric_timestamp)
        metric_timestamp = None
        try:
            key = 'metric_timestamp'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            metric_timestamp = int(value_list[0])
            metric_timestamp_list = ['metric_timestamp', metric_timestamp]
            self.metric_variables.append(metric_timestamp_list)
        except:
            logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
            return

        if not metric_timestamp:
            logger.error('error :: failed to load metric_timestamp variable from check file - %s' % (metric_check_file))
            return

        metric_data_dir = '%s/%s' % (settings.MIRAGE_DATA_FOLDER, str(metric))

        # Ignore any metric check with a timestamp greater than MIRAGE_STALE_SECONDS
        int_metric_timestamp = int(metric_timestamp)
        int_run_timestamp = int(run_timestamp)
        metric_timestamp_age = int_run_timestamp - int_metric_timestamp
        if metric_timestamp_age > settings.MIRAGE_STALE_SECONDS:
            logger.info('stale check :: %s check request is %s seconds old - discarding' % (metric, str(metric_timestamp_age)))
            # Remove metric check file
            # try:
            #     os.remove(metric_check_file)
            # except OSError:
            #     pass
            # return
            if os.path.isfile(metric_check_file):
                os.remove(metric_check_file)
                logger.info('removed check file - %s' % (metric_check_file))
            else:
                logger.info('could not remove check file - %s' % (metric_check_file))
            # Remove the metric directory
            if os.path.exists(metric_data_dir):
                try:
                    rmtree(metric_data_dir)
                    logger.info('removed data dir - %s' % metric_data_dir)
                except:
                    logger.error('error :: failed to rmtree - %s' % metric_data_dir)
            return

        # Calculate hours second order resolution to seconds
        second_order_resolution_seconds = int(hours_to_resolve) * 3600

        # Calculate graphite from and until parameters from the metric timestamp
        graphite_until = datetime.datetime.fromtimestamp(int(float(metric_timestamp))).strftime('%H:%M_%Y%m%d')
        int_second_order_resolution_seconds = int(float(second_order_resolution_seconds))
        second_resolution_timestamp = int_metric_timestamp - int_second_order_resolution_seconds
        graphite_from = datetime.datetime.fromtimestamp(int(second_resolution_timestamp)).strftime('%H:%M_%Y%m%d')

        # Remove any old json file related to the metric
        metric_json_file = '%s/%s.json' % (metric_data_dir, str(metric))
        try:
            os.remove(metric_json_file)
        except OSError:
            pass

        # Get data from graphite
        logger.info(
            'retrieve data :: surfacing %s timeseries from graphite for %s seconds' % (
                metric, second_order_resolution_seconds))
        self.surface_graphite_metric_data(metric, graphite_from, graphite_until)

        # Check there is a json timeseries file to test
        if not os.path.isfile(metric_json_file):
            logger.error(
                'error :: retrieve failed - failed to surface %s timeseries from graphite' % (
                    metric))
            # Remove metric check file
            try:
                os.remove(metric_check_file)
            except OSError:
                pass
            # Remove the metric directory
            try:
                rmtree(metric_data_dir)
                logger.info('removed data dir - %s' % metric_data_dir)
            except:
                logger.error('error :: failed to rmtree %s' % metric_data_dir)
            return
        else:
            logger.info('retrieved data :: for %s at %s seconds' % (
                metric, second_order_resolution_seconds))

        # Make process-specific dicts
        exceptions = defaultdict(int)
        anomaly_breakdown = defaultdict(int)

        self.check_if_parent_is_alive()

        with open((metric_json_file), 'r') as f:
            timeseries = json.loads(f.read())
            logger.info('data points surfaced :: %s' % (len(timeseries)))

        # @added 20170212 - Feature #1886: Ionosphere learn
        # Only process if the metric has sufficient data
        first_timestamp = None
        try:
            first_timestamp = int(timeseries[0][0])
        except:
            logger.error('error :: could not determine first timestamp')
        timestamp_now = int(time())
        valid_if_before_timestamp = timestamp_now - int(settings.FULL_DURATION)
        valid_mirage_timeseries = True
        if first_timestamp:
            if first_timestamp > valid_if_before_timestamp:
                valid_mirage_timeseries = False

        # @added 20170603 - Feature #2034: analyse_derivatives
        # Convert the values of metrics strictly increasing monotonically
        # to their derivative products
        known_derivative_metric = False
        try:
            derivative_metrics = list(self.redis_conn.smembers('derivative_metrics'))
        except:
            derivative_metrics = []
        redis_metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric))
        if redis_metric_name in derivative_metrics:
            known_derivative_metric = True
        if known_derivative_metric:
            try:
                non_derivative_monotonic_metrics = settings.NON_DERIVATIVE_MONOTONIC_METRICS
            except:
                non_derivative_monotonic_metrics = []
            skip_derivative = in_list(redis_metric_name, non_derivative_monotonic_metrics)
            if skip_derivative:
                known_derivative_metric = False
        if known_derivative_metric:
            try:
                derivative_timeseries = nonNegativeDerivative(timeseries)
                timeseries = derivative_timeseries
            except:
                logger.error('error :: nonNegativeDerivative failed')

        try:
            if valid_mirage_timeseries:
                logger.info('analyzing :: %s at %s seconds' % (metric, second_order_resolution_seconds))
                anomalous, ensemble, datapoint = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds)
            else:
                logger.info('not analyzing :: %s at %s seconds as there is not sufficiently older datapoints in the timeseries - not valid_mirage_timeseries' % (metric, second_order_resolution_seconds))
                anomalous = False
                datapoint = timeseries[-1][1]

        # It could have been deleted by the Roomba
        except TypeError:
            exceptions['DeletedByRoomba'] += 1
            logger.info('exceptions :: DeletedByRoomba')
        except TooShort:
            exceptions['TooShort'] += 1
            logger.info('exceptions :: TooShort')
        except Stale:
            exceptions['Stale'] += 1
            logger.info('exceptions :: Stale')
        except Boring:
            exceptions['Boring'] += 1
            logger.info('exceptions :: Boring')
        except:
            exceptions['Other'] += 1
            logger.info('exceptions :: Other')
            logger.info(traceback.format_exc())

        if not anomalous:
            base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
            not_anomalous_metric = [datapoint, base_name]
            self.not_anomalous_metrics.append(not_anomalous_metric)
            logger.info('not anomalous :: %s with %s' % (metric, value))

        # If it's anomalous, add it to list
        if anomalous:
            base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
            # metric_timestamp = int(timeseries[-1][0])
            metric_timestamp = int_metric_timestamp
            anomalous_metric = [datapoint, base_name, metric_timestamp]
            self.anomalous_metrics.append(anomalous_metric)
            logger.info('anomaly detected :: %s with %s' % (metric, str(value)))
            # It runs so fast, this allows us to process 30 anomalies/min
            sleep(2)

            # Get the anomaly breakdown - who returned True?
            triggered_algorithms = []
            for index, value in enumerate(ensemble):
                if value:
                    algorithm = settings.MIRAGE_ALGORITHMS[index]
                    anomaly_breakdown[algorithm] += 1
                    triggered_algorithms.append(algorithm)

            # @added 20170206 - Bug #1904: Handle non filesystem friendly metric names in check files
            sane_metricname = filesafe_metricname(str(base_name))

            # If Crucible or Panorama are enabled determine details
            determine_anomaly_details = False
            if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
                determine_anomaly_details = True
            if settings.PANORAMA_ENABLED:
                determine_anomaly_details = True

            # If Ionosphere is enabled determine details
            try:
                ionosphere_enabled = settings.IONOSPHERE_ENABLED
                if settings.IONOSPHERE_ENABLED:
                    determine_anomaly_details = True
            except:
                ionosphere_enabled = False

            if determine_anomaly_details:
                # metric_timestamp = str(int(timeseries[-1][0]))
                from_timestamp = str(int(timeseries[1][0]))
                timeseries_dir = base_name.replace('.', '/')

            cache_key = 'mirage.last_alert.smtp.%s' % (base_name)
            last_alert = False
            try:
                last_alert = self.redis_conn.get(cache_key)
            except Exception as e:
                logger.error('error :: could not query Redis for cache_key: %s' % str(e))

            # @added 20170308 - Feature #1960: ionosphere_layers
            # Allow Ionosphere to send Panorama checks, it is an ionosphere_metric
            try:
                ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
            except:
                ionosphere_unique_metrics = []

            added_at = str(int(time()))

            # If Panorama is enabled - create a Panorama check
            # @modified 20170308 - Feature #1960: ionosphere_layers
            # Allow Ionosphere to send Panorama checks for ionosphere_metrics
            # if settings.PANORAMA_ENABLED:
            send_to_panorama = False
            redis_metric_name = '%s%s' % (str(settings.FULL_NAMESPACE), str(base_name))
            if settings.PANORAMA_ENABLED:
                send_to_panorama = True
            if redis_metric_name in ionosphere_unique_metrics:
                send_to_panorama = False
            if send_to_panorama:
                if not os.path.exists(settings.PANORAMA_CHECK_PATH):
                    mkdir_p(settings.PANORAMA_CHECK_PATH)

                # Note:
                # The values are enclosed is single quoted intentionally
                # as the imp.load_source used results in a shift in the
                # decimal position when double quoted, e.g.
                # value = "5622.0" gets imported as
                # 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
                # single quoting results in the desired,
                # 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
                source = 'graphite'
                panaroma_anomaly_data = 'metric = \'%s\'\n' \
                                        'value = \'%s\'\n' \
                                        'from_timestamp = \'%s\'\n' \
                                        'metric_timestamp = \'%s\'\n' \
                                        'algorithms = %s\n' \
                                        'triggered_algorithms = %s\n' \
                                        'app = \'%s\'\n' \
                                        'source = \'%s\'\n' \
                                        'added_by = \'%s\'\n' \
                                        'added_at = \'%s\'\n' \
                    % (base_name, str(datapoint), from_timestamp,
                       str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS),
                       triggered_algorithms, skyline_app, source, this_host,
                       added_at)

                # Create an anomaly file with details about the anomaly
                panaroma_anomaly_file = '%s/%s.%s.txt' % (
                    settings.PANORAMA_CHECK_PATH, added_at, sane_metricname)
                try:
                    write_data_to_file(
                        skyline_app, panaroma_anomaly_file, 'w',
                        panaroma_anomaly_data)
                    logger.info('added panorama anomaly file :: %s' % (panaroma_anomaly_file))
                    self.sent_to_panorama.append(base_name)
                except:
                    logger.error('error :: failed to add panorama anomaly file :: %s' % (panaroma_anomaly_file))
                    logger.info(traceback.format_exc())

            # If crucible is enabled - save timeseries and create a
            # crucible check
            if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
                from_timestamp = str(int(timeseries[1][0]))
                timeseries_dir = base_name.replace('.', '/')
                crucible_anomaly_dir = settings.CRUCIBLE_DATA_FOLDER + '/' + timeseries_dir + '/' + metric_timestamp
                if not os.path.exists(crucible_anomaly_dir):
                    mkdir_p(crucible_anomaly_dir)

                # Note:
                # The value is enclosed is single quoted intentionally
                # as the imp.load_source used in crucible results in a
                # shift in the decimal position when double quoted, e.g.
                # value = "5622.0" gets imported as
                # 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
                # single quoting results in the desired,
                # 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
                crucible_anomaly_data = 'metric = \'%s\'\n' \
                                        'value = \'%s\'\n' \
                                        'from_timestamp = \'%s\'\n' \
                                        'metric_timestamp = \'%s\'\n' \
                                        'algorithms = %s\n' \
                                        'triggered_algorithms = %s\n' \
                                        'anomaly_dir = \'%s\'\n' \
                                        'graphite_metric = True\n' \
                                        'run_crucible_tests = False\n' \
                                        'added_by = \'%s\'\n' \
                                        'added_at = \'%s\'\n' \
                    % (base_name, str(datapoint), from_timestamp,
                       str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS),
                       triggered_algorithms, crucible_anomaly_dir, skyline_app,
                       added_at)

                # Create an anomaly file with details about the anomaly
                crucible_anomaly_file = '%s/%s.txt' % (crucible_anomaly_dir, sane_metricname)
                try:
                    write_data_to_file(
                        skyline_app, crucible_anomaly_file, 'w',
                        crucible_anomaly_data)
                    logger.info('added crucible anomaly file :: %s' % (crucible_anomaly_file))
                    self.sent_to_crucible.append(base_name)
                except:
                    logger.error('error :: failed to add crucible anomaly file :: %s' % (crucible_anomaly_file))
                    logger.info(traceback.format_exc())

                # Create timeseries json file with the timeseries
                json_file = '%s/%s.json' % (crucible_anomaly_dir, base_name)
                timeseries_json = str(timeseries).replace('[', '(').replace(']', ')')
                try:
                    write_data_to_file(skyline_app, json_file, 'w', timeseries_json)
                    logger.info('added crucible timeseries file :: %s' % (json_file))
                except:
                    logger.error('error :: failed to add crucible timeseries file :: %s' % (json_file))
                    logger.info(traceback.format_exc())

                # Create a crucible check file
                crucible_check_file = '%s/%s.%s.txt' % (settings.CRUCIBLE_CHECK_PATH, metric_timestamp, sane_metricname)
                try:
                    write_data_to_file(
                        skyline_app, crucible_check_file, 'w',
                        crucible_anomaly_data)
                    logger.info('added crucible check :: %s,%s' % (base_name, metric_timestamp))
                except:
                    logger.error('error :: failed to add crucible check file :: %s' % (crucible_check_file))
                    logger.info(traceback.format_exc())

            # @added 20160922 - Branch #922: Ionosphere
            # Also added the send_anomalous_metric_to skyline_functions.py
            # function
            if ionosphere_enabled:
                if not last_alert:
                    # @modified 20161228 Feature #1830: Ionosphere alerts
                    # Added full_duration which needs to be recorded to allow Mirage metrics
                    # to be profiled on Redis timeseries data at FULL_DURATION
                    # e.g. mirage.redis.24h.json
                    full_duration = str(second_order_resolution_seconds)
                    # @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
                    # Added ionosphere_parent_id, always zero from Analyzer and Mirage
                    ionosphere_parent_id = 0
                    send_anomalous_metric_to(
                        skyline_app, 'ionosphere', timeseries_dir,
                        str(int_metric_timestamp), base_name, str(datapoint),
                        from_timestamp, triggered_algorithms, timeseries,
                        full_duration, str(ionosphere_parent_id))
                    self.sent_to_ionosphere.append(base_name)
                else:
                    logger.info('alert expiry key exists not sending to Ionosphere :: %s' % base_name)

        # Add values to the queue so the parent process can collate
        for key, value in anomaly_breakdown.items():
            self.mirage_anomaly_breakdown_q.put((key, value))

        for key, value in exceptions.items():
            self.mirage_exceptions_q.put((key, value))

        metric_var_files = []
        timeseries = []

        if os.path.isfile(metric_check_file):
            # Remove metric check file
            try:
                os.remove(metric_check_file)
                logger.info('removed check file - %s' % metric_check_file)
            except OSError:
                logger.error('error :: failed to remove check file - %s' % metric_check_file)
                pass

        # Remove the metric directory
        if os.path.exists(metric_data_dir):
            try:
                rmtree(metric_data_dir)
                logger.info('removed data dir - %s' % metric_data_dir)
            except:
                logger.error('error :: failed to rmtree %s' % metric_data_dir)

        # @added 20190408 - Feature #2882: Mirage - periodic_check
        # Remove the training_dir for mirage_periodic_check_metrics if not
        # anomalous
        if not anomalous:
            try:
                mirage_periodic_check_metrics = list(self.redis_conn.smembers('mirage.periodic_check.metrics'))
            except:
                logger.error('error :: failed to get mirage_periodic_check_metrics from Redis')
                mirage_periodic_check_metrics = []
            if metric in mirage_periodic_check_metrics:
                timeseries_dir = base_name.replace('.', '/')
                training_dir = '%s/%s/%s' % (
                    settings.IONOSPHERE_DATA_FOLDER, str(metric_timestamp),
                    str(timeseries_dir))
                if os.path.exists(training_dir):
                    try:
                        rmtree(training_dir)
                        logger.info('removed Mirage periodic check training_data dir - %s' % training_dir)
                    except:
                        logger.error('error :: failed to rmtree Mirage periodic check training_dir - %s' % training_dir)
            del mirage_periodic_check_metrics
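
    # Example (hypothetical values): a Panorama check file written by
    # spin_process() above follows the single quoted format described in the
    # in-line note, e.g.
    #   metric = 'stats.example-host.cpu.user'
    #   value = '5622.0'
    #   from_timestamp = '1553715600'
    #   metric_timestamp = '1554320400'
    #   algorithms = [...]                # str(settings.MIRAGE_ALGORITHMS)
    #   triggered_algorithms = [...]
    #   app = 'mirage'
    #   source = 'graphite'
    #   added_by = '<this_host>'
    #   added_at = '<unix timestamp>'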

    def run(self):
        """
        Called when the process initializes.
        """

        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os.remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
                pass

        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os.remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
                pass
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        def smtp_trigger_alert(alert, metric, second_order_resolution_seconds, context):
            # Spawn processes
            pids = []
            spawned_pids = []
            pid_count = 0
            try:
                p = Process(target=self.spawn_alerter_process, args=(alert, metric, second_order_resolution_seconds, context))
                pids.append(p)
                pid_count += 1
                p.start()
                spawned_pids.append(p.pid)
            except:
                logger.error('error :: failed to spawn_alerter_process')
                logger.info(traceback.format_exc())
            p_starts = time()
            while time() - p_starts <= 15:
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                else:
                    # All the processes are done, break now.
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('%s :: timed out, killing the spawn_trigger_alert process' % (skyline_app))
                for p in pids:
                    p.terminate()
                    # p.join()
            for p in pids:
                if p.is_alive():
                    logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
                    p.join()

        """
        DEVELOPMENT ONLY
        # @added 20160806 - Bug #1558: Memory leak in Analyzer
        # Debug with garbage collection - http://code.activestate.com/recipes/65333/
        """
        if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
            # Debug with garbage collection - http://code.activestate.com/recipes/65333/
            gc.enable()
            gc.set_debug(gc.DEBUG_LEAK)
            # As per http://stackoverflow.com/a/1641280
            # This got useable understandable data with gc
            before = defaultdict(int)
            after = defaultdict(int)
            for i in get_objects():
                before[type(i)] += 1

        if LOCAL_DEBUG:
            logger.info('debug :: Memory usage in run at start: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

        while 1:
            now = time()

            """
            DEVELOPMENT ONLY
            # @added 20160806 - Bug #1558: Memory leak in Analyzer
            # Debug with garbage collection - http://code.activestate.com/recipes/65333/
            """
            if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
                # As per http://stackoverflow.com/a/1641280
                # This got useable understandable data with gc
                before = defaultdict(int)
                after = defaultdict(int)
                for i in get_objects():
                    before[type(i)] += 1

            if LOCAL_DEBUG:
                logger.info('debug :: Memory usage before looking for checks: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

            # Make sure Redis is up
            try:
                self.redis_conn.ping()
            except:
                logger.info('skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                sleep(10)
                logger.info('connecting to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
                if settings.REDIS_PASSWORD:
                    self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
                else:
                    self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
                if self.redis_conn.ping():
                    logger.info('connected to redis')
                continue

            """
            Determine if any metric to analyze or Ionosphere alerts to be sent
            """
            while True:
                # Report app up
                self.redis_conn.setex(skyline_app, 120, now)

                # @added 20161228 - Feature #1828: ionosphere - mirage Redis data features
                # If Ionosphere is going to pass alerts back to the app
                # here we are going to have break out and force an alerting
                # only run.
                ionosphere_alerts = None
                ionosphere_alerts_returned = False

                metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]

                # @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
                #                      Feature #2484: FULL_DURATION feature profiles
                # Do not postpone the Ionosphere alerts check based on whether
                # there are checks or not
                # if len(metric_var_files) == 0:
                if not ionosphere_alerts_returned:
                    # @modified 20161228 - Feature #1830: Ionosphere alerts
                    try:
                        ionosphere_alerts = list(self.redis_conn.scan_iter(match='ionosphere.mirage.alert.*'))
                        ionosphere_alerts_returned = True
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: failed to scan ionosphere.mirage.alert.* from Redis')
                        ionosphere_alerts = []

                    if len(ionosphere_alerts) == 0:
                        ionosphere_alerts_returned = False
                    else:
                        logger.info('Ionosphere alert requested :: %s' % str(ionosphere_alerts))

                # @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
                #                      Feature #2484: FULL_DURATION feature profiles
                # Do not postpone the Ionosphere alerts check
                # if not ionosphere_alerts_returned:
                #     logger.info('sleeping no metrics...')
                #     sleep(10)

                # @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
                #                      Feature #2484: FULL_DURATION feature profiles
                # Move this len(metric_var_files) from above and apply the
                # appropriate sleep
                if len(metric_var_files) == 0:
                    if not ionosphere_alerts_returned:
                        logger.info('sleeping no metrics...')
                        sleep(10)
                    else:
                        sleep(1)

                # Clean up old files
                now_timestamp = time()
                stale_age = now_timestamp - settings.MIRAGE_STALE_SECONDS
                for current_file in listdir(settings.MIRAGE_CHECK_PATH):
                    if os.path.isfile(settings.MIRAGE_CHECK_PATH + "/" + current_file):
                        t = os.stat(settings.MIRAGE_CHECK_PATH + "/" + current_file)
                        c = t.st_ctime
                        # delete file if older than a week
                        if c < stale_age:
                            os.remove(settings.MIRAGE_CHECK_PATH + "/" + current_file)
                            logger.info('removed stale check - %s' % (current_file))

                # Discover metric to analyze
                metric_var_files = ''

                # @added 20161228 - Feature #1830: Ionosphere alerts
                # Prioritises Ionosphere alerts
                if ionosphere_alerts_returned:
                    break

                metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
                if len(metric_var_files) > 0:
                    break

            # @modified 20161228 - Feature #1830: Ionosphere alerts
            # Only spawn process if this is not an Ionosphere alert
            if not ionosphere_alerts_returned:
                metric_var_files_sorted = sorted(metric_var_files)
                # metric_check_file = settings.MIRAGE_CHECK_PATH + "/" + metric_var_files_sorted[0]
                processing_check_file = metric_var_files_sorted[0]
                logger.info('processing %s' % processing_check_file)

                # Remove any existing algorithm.error files from any previous runs
                # that did not cleanup for any reason
                pattern = '%s.*.algorithm.error' % skyline_app
                try:
                    for f in os.listdir(settings.SKYLINE_TMP_DIR):
                        if re.search(pattern, f):
                            try:
                                os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
                                logger.info('cleaning up old error file - %s' % (str(f)))
                            except OSError:
                                pass
                except:
                    logger.error('failed to cleanup mirage_algorithm.error files - %s' % (traceback.format_exc()))

                # Spawn processes
                pids = []
                spawned_pids = []
                pid_count = 0
                MIRAGE_PROCESSES = 1
                # @modified 20161224 - send mirage metrics to graphite
                # run_timestamp = int(now)
                run_timestamp = int(time())
                for i in range(1, MIRAGE_PROCESSES + 1):
                    p = Process(target=self.spin_process, args=(i, run_timestamp))
                    pids.append(p)
                    pid_count += 1
                    logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(MIRAGE_PROCESSES)))
                    p.start()
                    spawned_pids.append(p.pid)

                # Send wait signal to zombie processes
                # for p in pids:
                #     p.join()
                # Self monitor processes and terminate if any spin_process has run
                # for longer than 180 seconds - 20160512 @earthgecko
                p_starts = time()
                while time() - p_starts <= settings.MAX_ANALYZER_PROCESS_RUNTIME:
                    if any(p.is_alive() for p in pids):
                        # Just to avoid hogging the CPU
                        sleep(.1)
                    else:
                        # All the processes are done, break now.
                        time_to_run = time() - p_starts
                        logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (
                            skyline_app, str(MIRAGE_PROCESSES), time_to_run))
                        break
                else:
                    # We only enter this if we didn't 'break' above.
                    logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
                    for p in pids:
                        p.terminate()
                        # p.join()

                for p in pids:
                    if p.is_alive():
                        logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
                        p.join()

                # Log the last reported error by any algorithms that errored in the
                # spawned processes from algorithms.py
                for completed_pid in spawned_pids:
                    logger.info('spin_process with pid %s completed' % (str(completed_pid)))
                    for algorithm in settings.MIRAGE_ALGORITHMS:
                        algorithm_error_file = '%s/%s.%s.%s.algorithm.error' % (
                            settings.SKYLINE_TMP_DIR, skyline_app,
                            str(completed_pid), algorithm)
                        if os.path.isfile(algorithm_error_file):
                            logger.info(
                                'error - spin_process with pid %s has reported an error with the %s algorithm' % (
                                    str(completed_pid), algorithm))
                            try:
                                with open(algorithm_error_file, 'r') as f:
                                    error_string = f.read()
                                    logger.error('%s' % str(error_string))
                            except:
                                logger.error('failed to read %s error file' % algorithm)
                            try:
                                os.remove(algorithm_error_file)
                            except OSError:
                                pass

                # Grab data from the queue and populate dictionaries
                exceptions = dict()
                anomaly_breakdown = dict()
                while 1:
                    try:
                        key, value = self.mirage_anomaly_breakdown_q.get_nowait()
                        if key not in anomaly_breakdown.keys():
                            anomaly_breakdown[key] = value
                        else:
                            anomaly_breakdown[key] += value
                    except Empty:
                        break

                while 1:
                    try:
                        key, value = self.mirage_exceptions_q.get_nowait()
                        if key not in exceptions.keys():
                            exceptions[key] = value
                        else:
                            exceptions[key] += value
                    except Empty:
                        break

                for metric_variable in self.metric_variables:
                    if metric_variable[0] == 'metric_name':
                        metric_name = metric_variable[1]
                    if metric_variable[0] == 'metric_value':
                        metric_value = metric_variable[1]
                    if metric_variable[0] == 'hours_to_resolve':
                        hours_to_resolve = metric_variable[1]
                    # if metric_variable[0] == 'metric_timestamp':
                    #     metric_timestamp = metric_variable[1]

                logger.info('analysis done - %s' % metric_name)

                # Send alerts
                # Calculate hours second order resolution to seconds
                logger.info('analyzed at %s hours resolution' % hours_to_resolve)
                second_order_resolution_seconds = int(hours_to_resolve) * 3600
                logger.info('analyzed at %s seconds resolution' % second_order_resolution_seconds)

                # Remove metric check file
                metric_check_file = '%s/%s' % (settings.MIRAGE_CHECK_PATH, processing_check_file)
                if os.path.isfile(metric_check_file):
                    try:
                        os.remove(metric_check_file)
                        logger.info('removed check file - %s' % metric_check_file)
                    except OSError:
                        pass

                # Remove the metric directory
                timeseries_dir = metric_name.replace('.', '/')
                metric_data_dir = '%s/%s' % (settings.MIRAGE_CHECK_PATH, timeseries_dir)
                if os.path.exists(metric_data_dir):
                    try:
                        rmtree(metric_data_dir)
                        logger.info('removed - %s' % metric_data_dir)
                    except:
                        logger.error('error :: failed to rmtree %s' % metric_data_dir)

            if settings.MIRAGE_ENABLE_ALERTS:
                # @added 20161228 - Feature #1830: Ionosphere alerts
                #                   Branch #922: Ionosphere
                # Bringing Ionosphere online - do alert on Ionosphere metrics
                try:
                    ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: failed to get ionosphere.unique_metrics from Redis')
                    ionosphere_unique_metrics = []

                # @added 20161228 - Feature #1830: Ionosphere alerts
                #                   Branch #922: Ionosphere
                # Send alerts for Ionosphere
                alert_context = 'Mirage'
                if ionosphere_alerts_returned:
                    alert_context = 'Ionosphere'
                    ionosphere_unique_metrics = []
                    logger.info('Ionosphere alerts requested emptying ionosphere_unique_metrics so Mirage will alert')
                    exceptions = dict()
                    run_timestamp = int(time())
                    ionosphere_alert_on = list(self.redis_conn.scan_iter(match='ionosphere.mirage.alert.*'))
                    for cache_key in ionosphere_alert_on:
                        try:
                            alert_on = self.redis_conn.get(cache_key)
                            send_alert_for = literal_eval(alert_on)
                            value = float(send_alert_for[0])
                            base_name = str(send_alert_for[1])
                            metric_timestamp = int(float(send_alert_for[2]))
                            triggered_algorithms = send_alert_for[3]
                            second_order_resolution_seconds = int(send_alert_for[4])
                            anomalous_metric = [value, base_name, metric_timestamp, second_order_resolution_seconds]
                            self.anomalous_metrics.append(anomalous_metric)
                            anomaly_breakdown = dict()
                            for algorithm in triggered_algorithms:
                                anomaly_breakdown[algorithm] = 1
                            self.redis_conn.delete(cache_key)
                        except:
                            logger.error(traceback.format_exc())
                            logger.error('error :: failed to add an Ionosphere anomalous_metric for %s' % base_name)

                # @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
                # To reduce the amount of I/O used by Mirage in this loop check
                # and reduce the number of log entries for 'not alerting - Ionosphere metric'
                # a check is made if the metric_name has already been checked, if
                # so continue
                not_alerting_for_ionosphere = 'none'

                # @added 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
                #                   Feature #2484: FULL_DURATION feature profiles
                # Only check Ionosphere is up once per cycle
                ionosphere_up = False

                for alert in settings.ALERTS:
                    # @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
                    not_an_ionosphere_metric_check_done = 'none'
                    for metric in self.anomalous_metrics:
                        # @added 20161228 - Feature #1830: Ionosphere alerts
                        #                   Branch #922: Ionosphere
                        # Bringing Ionosphere online - do alert on Ionosphere
                        # metrics if Ionosphere is up
                        metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric[1]))
                        if metric_name in ionosphere_unique_metrics:
                            # @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
                            if not_alerting_for_ionosphere == metric_name:
                                continue
                            # @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
                            #                      Feature #2484: FULL_DURATION feature profiles
                            # Only check Ionosphere is up once per cycle
                            # ionosphere_up = False
                            if not ionosphere_up:
                                try:
                                    ionosphere_up = self.redis_conn.get('ionosphere')
                                except Exception as e:
                                    logger.error('error :: could not query Redis for ionosphere key: %s' % str(e))
                            if ionosphere_up:
                                # @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
                                #                      Feature #2484: FULL_DURATION feature profiles
                                # Wrapped this block up on conditional based on
                                # ionosphere_alerts_returned
                                if not ionosphere_alerts_returned:
                                    logger.info('not alerting - Ionosphere metric - %s' % str(metric[1]))
                                    # @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
                                    not_alerting_for_ionosphere = metric_name
                                    continue
                            else:
                                logger.error('error :: Ionosphere not report up')
                                logger.info('taking over alerting from Ionosphere if alert is matched on - %s' % str(metric[1]))
                        else:
                            # @modified 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
                            # logger.info('not an Ionosphere metric checking whether to alert - %s' % str(metric[1]))
                            if not_an_ionosphere_metric_check_done == metric_name:
                                # Do not log multiple times for this either
                                not_an_ionosphere_metric_check_done = metric_name
                            else:
                                if not ionosphere_alerts_returned:
                                    logger.info('not an Ionosphere metric checking whether to alert - %s' % str(metric[1]))
                                not_an_ionosphere_metric_check_done = metric_name

                        ALERT_MATCH_PATTERN = alert[0]
                        METRIC_PATTERN = metric[1]
                        try:
                            alert_match_pattern = re.compile(ALERT_MATCH_PATTERN)
                            pattern_match = alert_match_pattern.match(METRIC_PATTERN)
                        except:
                            pattern_match = False
                        # @modified 20160806 - Reintroduced the original
                        # substring matching after wildcard matching, to allow
                        # more flexibility
                        if not pattern_match:
                            if alert[0] in metric[1]:
                                pattern_match = True
                        if pattern_match:
                            cache_key = 'mirage.last_alert.%s.%s' % (alert[1], metric[1])
                            try:
                                last_alert = self.redis_conn.get(cache_key)
                                if not last_alert:
                                    if ionosphere_alerts_returned:
                                        # @modified 20190410 - Feature #2882: Mirage - periodic_check
                                        # Only set if not set
                                        try:
                                            second_order_resolution_seconds + 1
                                            set_second_order_resolution_seconds = False
                                        except:
                                            set_second_order_resolution_seconds = True
                                        if set_second_order_resolution_seconds:
                                            try:
                                                second_order_resolution_seconds = int(metric[3]) * 3600
                                            except:
                                                logger.error(traceback.format_exc())
                                                logger.error('error :: failed to determine full_duration from the Ionosphere alert for %s' % (metric[1]))
                                                logger.info('using settings.FULL_DURATION - %s' % (str(settings.FULL_DURATION)))
                                                second_order_resolution_seconds = settings.FULL_DURATION
                                    self.redis_conn.setex(cache_key, alert[2], packb(metric[0]))
                                    # trigger_alert(alert, metric, second_order_resolution_seconds, context)
                                    try:
                                        if alert[1] != 'smtp':
                                            trigger_alert(alert, metric, second_order_resolution_seconds, alert_context)
                                        else:
                                            smtp_trigger_alert(alert, metric, second_order_resolution_seconds, alert_context)
                                        logger.info('sent %s alert: For %s' % (alert[1], metric[1]))
                                    except Exception as e:
                                        logger.error('error :: could not send %s alert for %s: %s' % (alert[1], metric[1], e))
                            except Exception as e:
                                logger.error('error :: could not query Redis for cache_key')

            if settings.NEGATE_ANALYZER_ALERTS:
                if len(self.anomalous_metrics) == 0:
                    for negate_alert in settings.ALERTS:
                        for not_anomalous_metric in self.not_anomalous_metrics:
                            NEGATE_ALERT_MATCH_PATTERN = negate_alert[0]
                            NOT_ANOMALOUS_METRIC_PATTERN = not_anomalous_metric[1]
                            alert_match_pattern = re.compile(NEGATE_ALERT_MATCH_PATTERN)
                            negate_pattern_match = alert_match_pattern.match(NOT_ANOMALOUS_METRIC_PATTERN)
                            if negate_pattern_match:
                                try:
                                    logger.info('negate alert sent: For %s' % (not_anomalous_metric[1]))
                                    trigger_negater(negate_alert, not_anomalous_metric, second_order_resolution_seconds, metric_value)
                                except Exception as e:
                                    logger.error('error :: could not send alert: %s' % e)

            # Log progress
            if len(self.anomalous_metrics) > 0:
                logger.info('seconds since last anomaly :: %.2f' % (time() - now))
                logger.info('total anomalies :: %d' % len(self.anomalous_metrics))
                logger.info('exception stats :: %s' % exceptions)
                logger.info('anomaly breakdown :: %s' % anomaly_breakdown)

            # Log to Graphite
            run_time = time() - run_timestamp
            logger.info('seconds to run :: %.2f' % run_time)
            graphite_run_time = '%.2f' % run_time
            send_metric_name = skyline_app_graphite_namespace + '.run_time'
            send_graphite_metric(skyline_app, send_metric_name, graphite_run_time)

            if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
                try:
                    sent_to_crucible = str(len(self.sent_to_crucible))
                except:
                    sent_to_crucible = '0'
                logger.info('sent_to_crucible :: %s' % sent_to_crucible)
                send_metric_name = '%s.sent_to_crucible' % skyline_app_graphite_namespace
                send_graphite_metric(skyline_app, send_metric_name, sent_to_crucible)

            if settings.PANORAMA_ENABLED:
                try:
                    sent_to_panorama = str(len(self.sent_to_panorama))
                except:
                    sent_to_panorama = '0'
                logger.info('sent_to_panorama :: %s' % sent_to_panorama)
                send_metric_name = '%s.sent_to_panorama' % skyline_app_graphite_namespace
                send_graphite_metric(skyline_app, send_metric_name, sent_to_panorama)

            if settings.IONOSPHERE_ENABLED:
                try:
                    sent_to_ionosphere = str(len(self.sent_to_ionosphere))
                except:
                    sent_to_ionosphere = '0'
                logger.info('sent_to_ionosphere :: %s' % sent_to_ionosphere)
                send_metric_name = '%s.sent_to_ionosphere' % skyline_app_graphite_namespace
                send_graphite_metric(skyline_app, send_metric_name, sent_to_ionosphere)

            # Reset counters
            self.anomalous_metrics[:] = []
            self.not_anomalous_metrics[:] = []

            # Reset metric_variables
            self.metric_variables[:] = []
            self.sent_to_crucible[:] = []
            self.sent_to_panorama[:] = []
            self.sent_to_ionosphere[:] = []

            """
            DEVELOPMENT ONLY
            # @added 20160806 - Bug #1558: Memory leak in Analyzer
            # Debug with garbage collection - http://code.activestate.com/recipes/65333/
            """
            if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
                if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                    for i in get_objects():
                        after[type(i)] += 1
                    gc_results = [(k, after[k] - before[k]) for k in after if after[k] - before[k]]
                    for gc_result in gc_results:
                        logger.info('debug :: %s' % str(gc_result))

                # @added 20160806 - Bug #1558: Memory leak in Analyzer
                # Debug with garbage collection - http://code.activestate.com/recipes/65333/
                # show the dirt ;-)
                try:
                    logger.info('garbage collecting')
                    all_the_garbage = self.dump_garbage()
                except:
                    logger.error('error :: during garbage collecting')
                    logger.info(traceback.format_exc())
                    all_the_garbage = 'gc errored'
                if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                    logger.info(all_the_garbage)
                logger.info('garbage collected')

            if LOCAL_DEBUG:
                logger.info('debug :: Memory usage end of run: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)

            # Sleep if it went too fast
            if time() - now < 1:
                logger.info('sleeping due to low run time...')
                # sleep(10)
                sleep(1)
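
# For reference: the 'ionosphere.mirage.alert.*' Redis keys consumed in run()
# above are expected to hold a literal_eval-able list unpacked in this order:
#   [value, base_name, metric_timestamp, triggered_algorithms,
#    second_order_resolution]
# e.g. (hypothetical) [5622.0, 'stats.example-host.cpu.user', 1554320400,
# ['<algorithm_name>'], <second_order_resolution>]. Each settings.ALERTS entry
# is used above as:
#   alert[0] - the namespace/pattern matched against the metric name
#   alert[1] - the alerter, e.g. 'smtp'
#   alert[2] - the expiry in seconds for the mirage.last_alert.* Redis key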