Source code for mirage.mirage_labelled_metrics

"""
mirage_labelled_metrics.py
"""
import logging
try:
    from Queue import Empty
except:
    from queue import Empty
# from redis import StrictRedis
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list() to reduce memory and number of
# processes
# from multiprocessing import Process, Manager, Queue
from multiprocessing import Process, Queue
import traceback
import re
# imports required for surfacing graphite JSON formatted timeseries for use in
# Mirage
import json
import sys
import os
from shutil import rmtree
from ast import literal_eval
from math import ceil
import datetime

# @added 20220722 - Task #4624: Change all dict copy to deepcopy
import copy

import settings
# @modified 20160922 - Branch #922: Ionosphere
# Added the send_anomalous_metric_to skyline_functions.py
from skyline_functions import (
    write_data_to_file, fail_check, send_anomalous_metric_to,
    # @modified 20220726 - Task #2732: Prometheus to Skyline
    #                      Branch #4300: prometheus
    # Moved send_graphite_metric
    # mkdir_p, send_graphite_metric, filesafe_metricname,
    mkdir_p, filesafe_metricname,
    # @added 20191030 - Bug #3266: py3 Redis binary objects not strings
    #                   Branch #3262: py3
    # Added a single functions to deal with Redis connection and the
    # charset='utf-8', decode_responses=True arguments required in py3
    get_redis_conn, get_redis_conn_decoded,
)

from mirage_algorithms import run_selected_algorithm
from algorithm_exceptions import TooShort, Stale, Boring

# @added 20220504 - Feature #2580: illuminance
from functions.illuminance.add_illuminance_entries import add_illuminance_entries

# @added 20220715 - Task #2732: Prometheus to Skyline
#                   Branch #4300: prometheus
from functions.victoriametrics.get_victoriametrics_metric import get_victoriametrics_metric
from functions.prometheus.metric_name_labels_parser import metric_name_labels_parser

# @added 20220726 - Task #2732: Prometheus to Skyline
#                   Branch #4300: prometheus
from functions.graphite.send_graphite_metric import send_graphite_metric

from functions.metrics.get_base_name_from_labelled_metrics_name import get_base_name_from_labelled_metrics_name

# @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak
from custom_algorithms import run_custom_algorithm_on_timeseries

# @added 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
from functions.timeseries.normalized_variance import normalized_variance

# @added 20230522 - metric_type.longterm_expire
from functions.timeseries.strictly_increasing_monotonicity import strictly_increasing_monotonicity

# Identity of the Skyline app this module runs under.  The logger name and the
# log file paths below are all derived from it.
skyline_app = 'mirage'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
# Log file path under settings.LOG_PATH and the .lock and .wait paths derived
# from it
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Major version number of the running Python interpreter
python_version = int(sys.version_info[0])

# The second field of os.uname(), used as the last resort value for
# SKYLINE_FEEDBACK_NAMESPACES further down in this module
this_host = str(os.uname()[1])

# Each setting below is optional.  Any failure to read it from settings
# (the except clauses are bare) results in the fallback value being used.

# '.<SERVER_METRICS_NAME>' when the setting is defined and is not an empty
# string, otherwise an empty string.  Used in skyline_app_graphite_namespace.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

try:
    # @modified 20200606 - Bug #3572: Apply list to settings import
    BATCH_PROCESSING_NAMESPACES = list(settings.BATCH_PROCESSING_NAMESPACES)
except:
    BATCH_PROCESSING_NAMESPACES = []

# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrics have negative values so that they can be
# added to the ionosphere.untrainable_metrics Redis set
try:
    # @modified 20200606 - Bug #3572: Apply list to settings import
    # from settings import KNOWN_NEGATIVE_METRICS
    KNOWN_NEGATIVE_METRICS = list(settings.KNOWN_NEGATIVE_METRICS)
except:
    KNOWN_NEGATIVE_METRICS = []

# @added 20200607 - Feature #3566: custom_algorithms
# CUSTOM_ALGORITHMS falls back to None and DEBUG_CUSTOM_ALGORITHMS to False
try:
    CUSTOM_ALGORITHMS = settings.CUSTOM_ALGORITHMS
except:
    CUSTOM_ALGORITHMS = None
try:
    DEBUG_CUSTOM_ALGORITHMS = settings.DEBUG_CUSTOM_ALGORITHMS
except:
    DEBUG_CUSTOM_ALGORITHMS = False

# @added 20200913 - Branch #3068: SNAB
#                   Task #3744: POC matrixprofile
#                   Info #1792: Shapelet extraction
# SNAB is disabled unless settings.SNAB_ENABLED can be read
try:
    SNAB_ENABLED = settings.SNAB_ENABLED
except:
    SNAB_ENABLED = False
# A deep copy is taken so that changes made to SNAB_CHECKS in this module do
# not alter the object held by settings.  Falls back to an empty dict.
try:
    # @modified 20220722 - Task #4624: Change all dict copy to deepcopy
    # SNAB_CHECKS = settings.SNAB_CHECKS.copy()
    SNAB_CHECKS = copy.deepcopy(settings.SNAB_CHECKS)
except:
    SNAB_CHECKS = {}
# @added 20200916 - Branch #3068: SNAB
#                   Task #3744: POC matrixprofile
# Name of the Redis set used for snab only checks
mirage_snab_only_checks_redis_set = 'mirage.snab_only_checks'

# @added 20201026 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle feedback metrics in a similar style to Ionosphere
# Fallback order for SKYLINE_FEEDBACK_NAMESPACES:
# 1. the list defined in settings
# 2. [settings.SERVER_METRICS_NAME, <first dot separated part of GRAPHITE_HOST>]
# 3. [this_host]
# NOTE(review): spin_process tests settings.SKYLINE_FEEDBACK_NAMESPACES
# directly rather than this constant, so the fallback computed here is not
# applied there - confirm whether that is intended.
try:
    SKYLINE_FEEDBACK_NAMESPACES = list(settings.SKYLINE_FEEDBACK_NAMESPACES)
except:
    # Let us take a guess
    try:
        graphite_host = str(settings.GRAPHITE_HOST)
        graphite_hostname = graphite_host.split('.', -1)[0]
        SKYLINE_FEEDBACK_NAMESPACES = [settings.SERVER_METRICS_NAME, graphite_hostname]
    except:
        SKYLINE_FEEDBACK_NAMESPACES = [this_host]

# @added 20210701 - Feature #4152: DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES
try:
    DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES = list(settings.DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES)
except:
    DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES = []

# @added 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
#                   Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# @modified 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
#                   Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Not introduced as a setting, making this the default behaviour
# try:
#     MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS = settings.MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# except:
#     MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS = False
MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS = True

# @added 20210323 - Feature #3642: Anomaly type classification
# Anomaly classification is off unless the setting can be read
try:
    LUMINOSITY_CLASSIFY_ANOMALIES = settings.LUMINOSITY_CLASSIFY_ANOMALIES
except:
    LUMINOSITY_CLASSIFY_ANOMALIES = False

# @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Read MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS from settings when it is defined.
# This module previously assigned 3 unconditionally straight after reading the
# setting, which meant the value in settings was never used.  The hard coded
# assignment has been removed so that the setting is honoured and 3 is now the
# fallback, which keeps the effective value unchanged for any deployment that
# does not define the setting.
try:
    MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS = settings.MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS
except Exception:
    MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS = 3

# @added 20230424 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
# to allow Mirage to set a key in a hash and to allow Analyzer labelled_metrics
# to skip analysis of those metrics for the expiry period.  This is to reduce
# metrics that are experiencing a normal anomalous daily peak to not have to
# be analysed by Analyzer every run and pushed to Mirage to check as this
# results in Mirage getting lots of unnecessary checks which caused feedback
# for the period in question.
# NOTE(review): presumably seconds - confirm against where the expiry is
# applied
anomalous_daily_peak_expiry = 180

# Graphite namespace that this module's own metrics are sent under
skyline_app_graphite_namespace = 'skyline.%s%s.labelled_metrics' % (skyline_app, SERVER_METRIC_PATH)
# Directory for failed checks, a sibling of MIRAGE_CHECK_PATH with a _failed
# suffix
failed_checks_dir = '%s_failed' % settings.MIRAGE_CHECK_PATH
# @added 20191107 - Branch #3262: py3
alert_test_file = '%s/%s_alert_test.txt' % (settings.SKYLINE_TMP_DIR, skyline_app)

# In Skyline a metric is either a counter (derivative) or a gauge
skyline_metric_types = {'COUNTER': 1, 'GAUGE': 0}
# Reverse lookup of the above, id -> type name
skyline_metric_types_by_id = {}
for o_key in list(skyline_metric_types.keys()):
    skyline_metric_types_by_id[skyline_metric_types[o_key]] = o_key

# Directory that labelled_metrics check files are written to and read from, a
# sibling of MIRAGE_CHECK_PATH with a _labelled_metrics suffix
MIRAGE_LABELLED_CHECK_PATH = '%s_labelled_metrics' % settings.MIRAGE_CHECK_PATH


class MirageLabelledMetrics(Thread):
    """
    The MirageLabelledMetrics thread
    """

    def __init__(self, parent_pid):
        """
        Initialize the Mirage

        Sets the thread up as a daemon thread, records the parent and current
        pids, creates the exceptions and anomaly breakdown queues and opens
        the two Redis connections used by the other methods.

        :param parent_pid: the pid of the parent process, which is signalled
            by check_if_parent_is_alive
        """
        # The daemon flag is passed to Thread rather than being set afterwards
        super().__init__(daemon=True)

        # Both pids are used by check_if_parent_is_alive
        self.current_pid = os.getpid()
        self.parent_pid = parent_pid

        # multiprocessing Queues for the exceptions and anomaly breakdown data
        self.mirage_labelled_metrics_exceptions_q = Queue()
        self.mirage_labelled_metrics_anomaly_breakdown_q = Queue()

        # One Redis connection from get_redis_conn and one from
        # get_redis_conn_decoded
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
def check_if_parent_is_alive(self):
    """
    Exit if either the current process or the parent process cannot be
    signalled.

    Signal 0 is sent to self.current_pid and then to self.parent_pid.  If
    either call raises, a warning is logged and sys.exit(0) is called.

    :return: None when both processes could be signalled
    """
    try:
        for pid in (self.current_pid, self.parent_pid):
            os.kill(pid, 0)
    except:
        # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
        # Log warning
        logger.warning('warning :: parent or current process dead')
        sys.exit(0)
# @added 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity # Bug #1460: panorama check file fails # Panorama check file fails #24 # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
def mirage_load_metric_vars(self, metric_vars_file):
    """
    Load the metric variables for a check from a metric check variables file

    Two check file layouts are handled:

    - the default layout of one ``key = value`` pair per line
    - a single line holding a Python list of ``[key, value]`` lists, which is
      the form this method writes to the log, so that an operator can
      recreate a check file from a logged check

    Only the known keys are kept and each value is converted to the type
    expected for the key: ``metric`` (str), ``value`` (float),
    ``hours_to_resolve`` and ``metric_timestamp`` (int), ``snab_only_check``
    (bool) and ``triggered_algorithms`` (list).

    :param metric_vars_file: the path and filename to the metric variables files
    :type metric_vars_file: str
    :return: a list of ``[key, value]`` lists, or ``False`` if the file does
        not exist, a value cannot be converted or no known keys are found
    :rtype: list

    """
    if not os.path.isfile(metric_vars_file):
        logger.error(
            'error :: loading metric variables from metric_check_file - file not found - %s' % (
                str(metric_vars_file)))
        return False
    logger.info(
        'loading metric variables from metric_check_file - %s' % (
            str(metric_vars_file)))

    # Default layout.  Every ' = ' is turned into a comma and the line is then
    # split once on the first comma, giving [key, value] as strings.
    metric_vars = []
    with open(metric_vars_file) as f:
        for line in f:
            no_new_line = line.replace('\n', '')
            no_equal_line = no_new_line.replace(' = ', ',')
            metric_vars.append(no_equal_line.split(',', 1))

    # @added 20200429 - Feature #3486: analyzer_batch
    #                   Feature #3480: batch_processing
    # Allow the check file to already hold a valid python list on one line
    # so that a check can be added by simply echoing the debug metric_vars
    # line from the log for any failed checks into a new Mirage check file.
    # The original pattern above is still the default, this is for check
    # files added by the operator from the log or for debugging.
    try_literal_eval = False
    if not metric_vars:
        try_literal_eval = True
        logger.info('mirage_labelled_metrics :: metric_vars is not defined, set to try_literal_eval')
    elif len(metric_vars) < 2:
        try_literal_eval = True
        logger.info('mirage_labelled_metrics :: metric_vars is not a list of lists, set to try_literal_eval')
    if try_literal_eval:
        try:
            with open(metric_vars_file) as f:
                for line in f:
                    metric_vars = literal_eval(line)
                    if metric_vars:
                        break
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('metric_vars not loaded with literal_eval')
            metric_vars = []

    string_keys = ['metric']
    float_keys = ['value']
    int_keys = ['hours_to_resolve', 'metric_timestamp']
    # @added 20200916 - Branch #3068: SNAB
    #                   Task #3744: POC matrixprofile
    boolean_keys = ['snab_only_check']
    # @added 20210304 - Feature #3642: Anomaly type classification
    #                   Feature #3970: custom_algorithm - adtk_level_shift
    # Added triggered_algorithms to mirage_check_file
    list_keys = ['triggered_algorithms']

    def unquote(raw_value):
        """Return raw_value as a str with all quote characters removed"""
        return str(raw_value).replace("'", '').replace('"', '')

    # metric is only used in the final log line.  It is initialised here so
    # that a file with no metric entry does not raise an UnboundLocalError.
    metric = None
    metric_vars_array = []
    for var_array in metric_vars:
        # @modified 20181023 - Feature #2618: alert_slack
        # Wrapped in try except for debugging issue where the
        # hours_to_resolve was interpolating to hours_to_resolve = "t"
        try:
            name = var_array[0]
            key = None
            value = None
            # var_array[1] is only read once the key is known, so entries
            # with no value part, such as a blank line, are simply skipped
            if name in string_keys:
                key = name
                value = unquote(var_array[1])
                if name == 'metric':
                    metric = value
            elif name in float_keys:
                key = name
                value = float(unquote(var_array[1]))
            elif name in int_keys:
                key = name
                value = int(float(unquote(var_array[1])))
            elif name in boolean_keys:
                # @added 20200916 - Branch #3068: SNAB
                #                   Task #3744: POC matrixprofile
                # Handle new snab_only_check boolean
                key = name
                logger.debug(
                    'debug :: boolean key - key: %s, value: %s' % (
                        str(var_array[0]), str(var_array[1])))
                # The default layout holds the quoted string "True", the
                # single line list layout holds a real bool.  Both are
                # accepted.
                value = unquote(var_array[1]) == 'True'
            elif name in list_keys:
                # @added 20210304 - Feature #3642: Anomaly type classification
                #                   Feature #3970: custom_algorithm - adtk_level_shift
                # Added triggered_algorithms to mirage_check_file
                key = name
                logger.debug(
                    'debug :: list key - key: %s, value: %s' % (
                        str(var_array[0]), str(var_array[1])))
                if isinstance(var_array[1], list):
                    # The single line list layout already holds a list
                    value = var_array[1]
                else:
                    try:
                        value = literal_eval(var_array[1])
                    except Exception as e:
                        logger.error(
                            'error :: loading metric variables - failed to literal_eval list for %s, %s - %s' % (
                                str(key), str(var_array[1]), e))
                        value = []
            if key:
                metric_vars_array.append([key, value])
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: mirage_labelled_metrics :: failed to load metric variables from check file - %s' % (metric_vars_file))
            return False

    # This is checked once all the entries have been processed, so that an
    # unknown first entry does not discard the valid entries that follow it
    # and so that an empty file is reported rather than raising.
    if not metric_vars_array:
        logger.error(
            'error :: loading metric variables - none found from %s' % (
                str(metric_vars_file)))
        return False

    logger.info('mirage_labelled_metrics :: debug :: metric_vars for %s' % str(metric))
    logger.info('mirage_labelled_metrics :: debug :: %s' % str(metric_vars_array))

    return metric_vars_array
def create_check_file_from_hash_key(self, check_item):
    """
    Create a check file from an analyzer_labelled_metrics.mirage_check key

    The metric data is read from the analyzer_labelled_metrics.mirage_check
    Redis hash and evaluated with literal_eval.  When data is found the hash
    entry is deleted and a check file is written to
    MIRAGE_LABELLED_CHECK_PATH.  Note that the hash entry is deleted before
    the check file is written, so the entry is not retained if the write
    fails.

    :param check_item: the check item, expected to be prefixed with
        ``analyzer_labelled_metrics.mirage_check.``
    :type check_item: str
    :return: (check, metric_data) - check is the check file name, or ``None``
        if no check file was created, and metric_data is the data evaluated
        from the Redis hash, or an empty dict if none was found
    :rtype: tuple

    """
    check = None
    hash_key = None
    redis_hash = 'analyzer_labelled_metrics.mirage_check'
    metric_prefix = None
    if check_item.startswith('analyzer_labelled_metrics.mirage_check.'):
        hash_key = check_item.replace('analyzer_labelled_metrics.mirage_check.', '', 1)
        metric_prefix = 'labelled_metrics'

    metric_data = {}
    try:
        metric_data_str = self.redis_conn_decoded.hget(redis_hash, hash_key)
        if metric_data_str:
            metric_data = literal_eval(metric_data_str)
    except Exception as err:
        logger.error('error :: mirage_labelled_metrics :: create_check_file_from_hash_key :: failed to get %s from %s from Redis hash - %s' % (
            str(hash_key), redis_hash, err))
    if not metric_data:
        return check, metric_data

    try:
        self.redis_conn_decoded.hdel(redis_hash, hash_key)
        logger.info('mirage_labelled_metrics :: create_check_file_from_hash_key :: removed %s from %s from Redis hash' % (
            str(hash_key), redis_hash))
    except Exception as err:
        logger.error('error :: mirage_labelled_metrics :: create_check_file_from_hash_key :: failed to hdel %s from %s from Redis hash - %s' % (
            str(hash_key), redis_hash, err))

    try:
        base_name = metric_data['metric']
        metric_id = metric_data['metric_id']
        if metric_prefix:
            metric = '%s.%s' % (metric_prefix, str(metric_id))
        else:
            metric = str(base_name)
        metric_timestamp = metric_data['timestamp']
        value = metric_data['value']
        triggered_algorithms = metric_data['triggered_algorithms']
        # Optional keys with their defaults
        use_hours_to_resolve = metric_data.get('hours_to_resolve', 168)
        snab_only_check = metric_data.get('snab_only_check', False)

        if metric_prefix == 'labelled_metrics':
            metric_data['source'] = 'victoriametrics'
            metric_dict = {}
            try:
                metric_dict = metric_name_labels_parser(skyline_app, base_name)
            except Exception as err:
                logger.error('error :: mirage_labelled_metrics :: create_check_file_from_hash_key :: metric_name_labels_parser failed for %s - %s' % (
                    base_name, err))
            # NOTE(review): this assumes metric_data already holds a
            # 'metric_dict' mapping.  If it does not, the KeyError is handled
            # by the enclosing except and no check file is created - confirm
            # that the data added to the hash always includes it.
            for i_key in list(metric_dict.keys()):
                metric_data['metric_dict'][i_key] = metric_dict[i_key]

        anomaly_check_file = '%s/%s.%s.txt' % (MIRAGE_LABELLED_CHECK_PATH, str(metric_timestamp), metric)
        # Initialised before the write so that a failed write is handled
        # below as "no check file" rather than raising an UnboundLocalError,
        # which was then logged as a second, misleading error.
        mirage_anomaly_check_file_created = False
        try:
            with open(anomaly_check_file, 'w') as fh:
                fh.write('metric = "%s"\nvalue = "%s"\nhours_to_resolve = "%s"\nmetric_timestamp = "%s"\nsnab_only_check = "%s"\ntriggered_algorithms = %s\n' % (
                    metric, str(value), str(use_hours_to_resolve),
                    str(metric_timestamp), str(snab_only_check),
                    str(triggered_algorithms)))
            mirage_anomaly_check_file_created = True
        except Exception as err:
            logger.error('error :: mirage_labelled_metrics :: failed to write anomaly_check_file %s - %s' % (
                anomaly_check_file, err))
        if mirage_anomaly_check_file_created:
            os.chmod(anomaly_check_file, mode=0o644)
            check = '%s.%s.txt' % (str(metric_timestamp), metric)
    except Exception as err:
        logger.error('error :: mirage_labelled_metrics :: failed to create anomaly check file from metric_data: %s - %s' % (
            str(metric_data), err))
    return check, metric_data
# @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak
def clear_trigger_history(self, metric):
    """
    Clear last item from the trigger history

    The history for the metric is read from the mirage.trigger_history Redis
    hash and evaluated with literal_eval.  The entry under the first key of
    the evaluated dict is dropped and the remaining entries are written back.
    When the dropped entry was the only one, the metric's field is deleted
    from the hash, previously nothing was written in that case and the event
    was never removed.

    :param metric: the metric name, used as the field in the Redis hash
    :type metric: str
    :return: ``False`` if the history could not be read or evaluated,
        otherwise ``True``
    :rtype: bool

    """
    trigger_history = {}
    try:
        raw_trigger_history = self.redis_conn_decoded.hget('mirage.trigger_history', metric)
        if raw_trigger_history:
            trigger_history = literal_eval(raw_trigger_history)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: mirage_labelled_metrics :: failed to evaluate data from mirage.trigger_history Redis hash key - %s' % (
            str(err)))
        return False
    if not trigger_history:
        return True

    # NOTE(review): the entry removed is the first key in the stored order,
    # which presumably means the most recent event is stored first - confirm
    # against the code that writes mirage.trigger_history.
    last_history_key = list(trigger_history.keys())[0]
    new_trigger_history = {
        history_timestamp: history_item
        for history_timestamp, history_item in trigger_history.items()
        if history_timestamp != last_history_key
    }

    if new_trigger_history:
        try:
            self.redis_conn_decoded.hset('mirage.trigger_history', metric, str(new_trigger_history))
            logger.info('mirage_labelled_metrics :: removed last event for %s from mirage.trigger_history' % metric)
        except Exception as err:
            logger.error('error :: mirage_labelled_metrics :: failed to set key in mirage.trigger_history Redis hash key - %s' % (
                str(err)))
    else:
        # The removed event was the only one, so there is nothing to write
        # back and the field is deleted
        try:
            self.redis_conn_decoded.hdel('mirage.trigger_history', metric)
            logger.info('mirage_labelled_metrics :: removed last event for %s from mirage.trigger_history' % metric)
        except Exception as err:
            logger.error('error :: mirage_labelled_metrics :: failed to hdel key from mirage.trigger_history Redis hash key - %s' % (
                str(err)))
    return True
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes # def spin_process(self, i, run_timestamp):
[docs] def spin_process(self, i, run_timestamp, assigned_checks): """ Assign a metrics for a process to analyze. """ if not assigned_checks: logger.info('mirage_labelled_metrics :: no checks to assign to process, nothing to do') return process_start_timestamp = int(time()) # Make process-specific dicts exceptions = defaultdict(int) anomaly_breakdown = defaultdict(int) # TODO - testing redis_metrics_processed_key = 'mirage_labelled_metrics.%s.metrics_processed' % str(i) try: exists = self.redis_conn_decoded.exists(redis_metrics_processed_key) if exists: last_redis_metrics_processed_key = 'mirage_labelled_metrics.%s.metrics_processed.last' % str(i) self.redis_conn_decoded.rename(redis_metrics_processed_key, last_redis_metrics_processed_key) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to rename %s Redis hash - %s' % ( redis_metrics_processed_key, err)) # @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES feedback_labelled_metric_ids = [] feedback_labelled_metric_ids_skipped = [] analyzer_labelled_metrics_busy = False if settings.SKYLINE_FEEDBACK_NAMESPACES: try: analyzer_labelled_metrics_busy = self.redis_conn_decoded.get('analyzer_labelled_metrics.busy') except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to get analyzer_labelled_metrics.busy Redis key' % ( err)) # @added 20230605 - Feature #4932: mute_alerts_on mute_alerts_on = [] if not analyzer_labelled_metrics_busy: mute_alerts_on_dict = {} try: mute_alerts_on_dict = self.redis_conn_decoded.hgetall('metrics_manager.mute_alerts_on') except Exception as err: logger.error('error :: failed to hgetall metrics_manager.mute_alerts_on - %s' % ( err)) mute_alerts_on = [i_metric for i_metric in list(mute_alerts_on_dict.keys()) if i_metric.startswith('labelled_metrics.')] if mute_alerts_on: logger.info('mirage_labelled_metrics :: there are %s mute_alert_on labelled_metrics currently which shall be add to 
feedback_labelled_metric_ids if not set' % str(len(mute_alerts_on))) if analyzer_labelled_metrics_busy: logger.info('mirage_labelled_metrics :: analyzer_labelled_metrics_busy found') try: feedback_labelled_metric_ids = list(self.redis_conn_decoded.smembers('aet.metrics_manager.feedback.labelled_metric_ids')) except Exception as err: logger.error('error :: mirage_labelled_metrics :: smembers failed on Redis set aet.metrics_manager.feedback.labelled_metric_ids - %s' % ( err)) # @added 20230605 - Feature #4932: mute_alerts_on if not feedback_labelled_metric_ids and mute_alerts_on: for labelled_metric in mute_alerts_on: metric_id = labelled_metric.split('.')[-1] feedback_labelled_metric_ids.append(str(metric_id)) logger.info('mirage_labelled_metrics :: added %s mute_alert_on metric ids to feedback_labelled_metric_ids if not set' % str(len(mute_alerts_on))) if not analyzer_labelled_metrics_busy: analyzer_labelled_metrics_busy = True logger.info('mirage_labelled_metrics :: analyzer_labelled_metrics_busy set to True because mute_alert_on labelled_metrics were found') redis_set = 'analyzer.waterfall_alerts.sent_to_mirage' literal_analyzer_waterfall_alerts = [] try: literal_analyzer_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set)) except: literal_analyzer_waterfall_alerts = [] analyzer_waterfall_alerts = [] for literal_waterfall_alert in literal_analyzer_waterfall_alerts: waterfall_alert = literal_eval(literal_waterfall_alert) analyzer_waterfall_alerts.append(waterfall_alert) # @added 20230424 - Feature #4724: custom_algorithms - anomalous_daily_peak # Added expiry to record metrics identified as normal by anomalous_daily_peaks current_now = int(time()) current_aligned_ts = int(process_start_timestamp // 60 * 60) normal_daily_peaks_keys = [] for i in sorted(list(range(1, 3)), reverse=True): key_ts = current_aligned_ts - (60 * i) key = 'mirage.normal_daily_peak_metrics.%s' % str(key_ts) normal_daily_peaks_keys.append(key) normal_daily_peak_metrics_expiry 
= {} for normal_daily_peaks_key in normal_daily_peaks_keys: try: current_key_data = self.redis_conn_decoded.hgetall(normal_daily_peaks_key) except Exception as err: logger.error('error :: mirage_labelled_metrics :: hgetall failed on Redis hash %s - %s' % ( str(normal_daily_peaks_key), err)) for labelled_metric in list(current_key_data): if current_now > int(float(current_key_data[labelled_metric])): try: self.redis_conn_decoded.hdel(normal_daily_peaks_key, labelled_metric) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to remove %s from Redis hash %s - %s' % ( labelled_metric, normal_daily_peaks_key, err)) else: normal_daily_peak_metrics_expiry[labelled_metric] = int(float(current_key_data[labelled_metric])) checks_processed = 0 ionosphere_unique_metrics = [] for check_item in assigned_checks: check = str(check_item) if int(time()) >= (process_start_timestamp + 50): logger.info('mirage_labelled_metrics :: run time limit reached - stopping') break checks_processed += 1 metric_data = { 'source': 'graphite', } is_labelled_metric = False if check_item.startswith('analyzer_labelled_metrics.mirage_check.'): is_labelled_metric = True if is_labelled_metric: # Create a check file for backwards compatibility try: check, metric_data = self.create_check_file_from_hash_key(check_item) except Exception as err: logger.error('error :: mirage_labelled_metrics :: create_check_file_from_hash_key failed for %s - %s' % ( str(check_item), err)) continue else: continue logger.info('mirage_labelled_metrics :: checking metric_data: %s' % str(metric_data)) metric_id = 0 try: metric_id = metric_data['metric_id'] except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to determine metric_id from metric_data for %s - %s' % ( check_item, err)) # TODO - testing try: key = 'labelled_metrics.%s-%s' % (str(metric_id), str(metric_data['timestamp'])) self.redis_conn_decoded.hset(redis_metrics_processed_key, key, str(metric_data)) except 
Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add metric to %s Redis hash - %s' % (redis_metrics_processed_key, err)) metric_check_file = '%s/%s' % ( # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # settings.MIRAGE_CHECK_PATH, str(metric_var_files_sorted[0])) MIRAGE_LABELLED_CHECK_PATH, check) check_file_name = os.path.basename(str(metric_check_file)) check_file_timestamp = check_file_name.split('.', 1)[0] check_file_metricname_txt = check_file_name.split('.', 1)[1] check_file_metricname = check_file_metricname_txt.replace('.txt', '') check_file_metricname_dir = check_file_metricname.replace('.', '/') metric_failed_check_dir = '%s/%s/%s' % (failed_checks_dir, check_file_metricname_dir, check_file_timestamp) # Load metric variables # @modified 20160822 - Bug #1460: panorama check file fails # Changed to panorama style skyline_functions load_metric_vars # self.load_metric_vars(metric_check_file) # Load and validate metric variables try: # @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity # Bug #1460: panorama check file fails # Panorama check file fails #24 # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway # metric_vars = load_metric_vars(skyline_app, str(metric_check_file)) metric_vars_array = self.mirage_load_metric_vars(str(metric_check_file)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to load metric variables from check file - %s' % str(metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue metric = None # @added 20200106 - Branch #3262: py3 # Task #3034: Reduce multiprocessing Manager list usage # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # redis_set_to_delete = 'mirage.metric_variables' redis_metric_variables_set = 
'mirage_labelled_metrics.%s.metric_variables' % str(i) redis_set_to_delete = redis_metric_variables_set try: self.redis_conn.delete(redis_set_to_delete) logger.info('mirage_labelled_metrics :: deleted Redis set - %s' % redis_set_to_delete) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to delete Redis set - %s' % redis_set_to_delete) try: key = 'metric' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] metric = str(value_list[0]) metric_name = ['metric_name', metric] # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.metric_variables.append(metric_name) # @modified 20200903 - Task #3730: Validate Mirage running multiple processes redis_set = 'mirage_labelled_metrics.metric_variables' data = str(metric_name) try: # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # self.redis_conn.sadd(redis_set, data) self.redis_conn.sadd(redis_metric_variables_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) logger.info('mirage_labelled_metrics :: debug :: added metric_name %s from check file - %s' % (str(metric_name), metric_check_file)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to read metric variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue if not metric: logger.error('error :: mirage_labelled_metrics :: failed to load metric variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue value = None try: key = 'value' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] value = float(value_list[0]) metric_value = ['metric_value', value] # @modified 20190522 - 
Task #3034: Reduce multiprocessing Manager list usage # self.metric_variables.append(metric_value) redis_set = 'mirage_labelled_metrics.metric_variables' data = str(metric_value) try: # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # self.redis_conn.sadd(redis_set, data) self.redis_conn.sadd(redis_metric_variables_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to read value variable from check file - %s - %s' % (metric_check_file, err)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue if not value: # @modified 20181119 - Bug #2708: Failing to load metric vars if value == 0.0: pass else: logger.error('error :: mirage_labelled_metrics :: failed to load value variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue hours_to_resolve = None try: key = 'hours_to_resolve' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] hours_to_resolve = int(value_list[0]) hours_to_resolve_list = ['hours_to_resolve', hours_to_resolve] # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.metric_variables.append(hours_to_resolve_list) redis_set = 'mirage_labelled_metrics.metric_variables' data = str(hours_to_resolve_list) try: # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # self.redis_conn.sadd(redis_set, data) self.redis_conn.sadd(redis_metric_variables_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) except: logger.error('error :: mirage_labelled_metrics :: failed to read hours_to_resolve variable from check file - %s' % 
(metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue if not hours_to_resolve: logger.error('error :: mirage_labelled_metrics :: failed to load hours_to_resolve variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue metric_timestamp = None try: key = 'metric_timestamp' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] metric_timestamp = int(value_list[0]) metric_timestamp_list = ['metric_timestamp', metric_timestamp] # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.metric_variables.append(metric_timestamp_list) redis_set = 'mirage_labelled_metrics.metric_variables' data = str(metric_timestamp_list) try: # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # self.redis_conn.sadd(redis_set, data) self.redis_conn.sadd(redis_metric_variables_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) except: logger.error('error :: mirage_labelled_metrics :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue if not metric_timestamp: logger.error('error :: mirage_labelled_metrics :: failed to load metric_timestamp variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) continue # @added 20200916 - Branch #3068: SNAB # Task #3744: POC matrixprofile snab_only_check = None try: key = 'snab_only_check' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] snab_only_check = value_list[0] except: snab_only_check = None snab_only_check_list = ['snab_only_check', snab_only_check] redis_set = 
'mirage_labelled_metrics.metric_variables' data = str(snab_only_check_list) try: self.redis_conn.sadd(redis_metric_variables_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) # @added 20210304 - Feature #3642: Anomaly type classification # Feature #3970: custom_algorithm - adtk_level_shift # Added triggered_algorithms to mirage_check_file try: key = 'triggered_algorithms' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] triggered_algorithms = value_list[0] except: triggered_algorithms = [] metric_data_dir = '%s/%s' % (settings.MIRAGE_DATA_FOLDER, str(metric)) # Ignore any metric check with a timestamp greater than MIRAGE_STALE_SECONDS int_metric_timestamp = int(metric_timestamp) int_run_timestamp = int(run_timestamp) metric_timestamp_age = int_run_timestamp - int_metric_timestamp if metric_timestamp_age > settings.MIRAGE_STALE_SECONDS: logger.info('mirage_labelled_metrics :: stale check :: %s check request is %s seconds old - discarding' % (metric, str(metric_timestamp_age))) # Remove metric check file if os.path.isfile(metric_check_file): os.remove(metric_check_file) logger.info('mirage_labelled_metrics :: removed check file - %s' % (metric_check_file)) else: logger.info('mirage_labelled_metrics :: could not remove check file - %s' % (metric_check_file)) # Remove the metric directory if os.path.exists(metric_data_dir): try: rmtree(metric_data_dir) logger.info('mirage_labelled_metrics :: removed data dir - %s' % metric_data_dir) except: logger.error('error :: mirage_labelled_metrics :: failed to rmtree - %s' % metric_data_dir) # @added 20200903 - Task #3730: Validate Mirage running multiple processes redis_set = 'mirage_labelled_metrics.stale_check_discarded' try: self.redis_conn.sadd(redis_set, str(metric)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed 
to add %s to Redis set %s' % ( str(metric), str(redis_set))) continue # Calculate hours second order resolution to seconds second_order_resolution_seconds = int(hours_to_resolve) * 3600 int_second_order_resolution_seconds = int(float(second_order_resolution_seconds)) second_resolution_timestamp = int_metric_timestamp - int_second_order_resolution_seconds labelled_metric_base_name = None labelled_metric_name = str(metric) try: labelled_metric_base_name = get_base_name_from_labelled_metrics_name(skyline_app, labelled_metric_name) except Exception as err: logger.error('error :: mirage_labelled_metrics :: get_base_name_from_labelled_metrics_name failed for %s - %s' % ( str(labelled_metric_name), str(err))) trigger_anomaly = False test_alert_and_trigger = False test_alert = False if 'test_alert' in metric_data: test_alert = metric_data['test_alert'] if 'trigger_anomaly' in metric_data: trigger_anomaly = metric_data['trigger_anomaly'] test_alert_and_trigger = True if test_alert: logger.info('test_alert found for %s set trigger_anomaly: %s' % ( str(metric), str(trigger_anomaly))) alert_tested_key = 'mirage.test_alerts.done.%s' % metric try: self.redis_conn_decoded.setex(alert_tested_key, 300, int(time())) logger.info('test_alert created Redis key %s' % alert_tested_key) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: failed to create Redis key %s - %s' % ( alert_tested_key, err)) # Remove any old json file related to the metric metric_json_file = '%s/%s.json' % (metric_data_dir, str(metric)) try: os.remove(metric_json_file) except OSError: pass # @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES feedback_metric = False feedback_key = 'mirage_labelled_metrics.feedback.expiry.%s' % str(metric_id) if analyzer_labelled_metrics_busy: if str(metric_id) in feedback_labelled_metric_ids: feedback_metric = True feedback_metric_expiry = 0 if feedback_metric: try: feedback_metric_expiry = 
self.redis_conn_decoded.get(feedback_key) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to get Redis key %s - %s' % ( str(feedback_key), err)) feedback_metric_expiry = 0 if feedback_metric_expiry: logger.info('mirage_labelled_metrics :: feedback metric expiry exists, removing check :: %s ' % metric) # Remove metric check file if os.path.isfile(metric_check_file): os.remove(metric_check_file) logger.info('mirage_labelled_metrics :: removed check file - %s' % (metric_check_file)) else: logger.info('mirage_labelled_metrics :: could not remove check file - %s' % (metric_check_file)) # Remove the metric directory if os.path.exists(metric_data_dir): try: rmtree(metric_data_dir) logger.info('mirage_labelled_metrics :: removed data dir - %s' % metric_data_dir) except: logger.error('error :: mirage_labelled_metrics :: failed to rmtree - %s' % metric_data_dir) redis_set = 'analyzer.waterfall_alerts.sent_to_mirage' for waterfall_alert in analyzer_waterfall_alerts: if waterfall_alert[0] == metric: if int(waterfall_alert[1]) == metric_timestamp: try: self.redis_conn.srem(redis_set, str(waterfall_alert)) logger.info('mirage_labelled_metrics :: removed waterfall alert item from Redis set %s - %s' % ( redis_set, str(waterfall_alert))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to remove waterfall alert item for %s at %s from Redis set %s' % ( metric, str(metric_timestamp), redis_set)) continue data_source = metric_data['source'] # Get data from graphite if data_source == 'graphite': logger.error('error :: mirage_labelled_metrics :: data_source set to graphite for %s' % ( metric)) # Get data from victoriametrics if data_source == 'victoriametrics': base_name = metric_data['metric'] logger.info( 'retrieve data :: surfacing %s time series from victoriametrics for %s seconds' % ( base_name, str(second_order_resolution_seconds))) # @added 20230522 - metric_type.longterm_expire # Check that 
monotonicity of the metric at second_order_resolution_seconds # to determine whether it is different from the recorded longterm type current_metric_type = None try: current_metric_type = self.redis_conn_decoded.hget('skyline.labelled_metrics.id.type', str(metric_id)) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to determine metric_type from skyline.labelled_metrics.id.type - %s' % ( err)) test_timeseries = [] update_metric_type = False save_test_data = False if current_metric_type: try: test_timeseries = get_victoriametrics_metric( skyline_app, base_name, second_resolution_timestamp, metric_timestamp, 'list', 'object', metric_data={}, plot_parameters={}, do_not_type=True) except Exception as err: logger.error('error :: mirage_labelled_metrics :: get_victoriametrics_metric failed getting test_timeseries for %s - %s' % ( base_name, err)) if test_timeseries: is_strictly_increasing_monotonic = None try: is_strictly_increasing_monotonic = strictly_increasing_monotonicity(test_timeseries) except Exception as err: logger.error('error :: mirage_labelled_metrics :: get_victoriametrics_metric failed getting test_timeseries for %s - %s' % ( base_name, err)) determined_metric_type = None if str(is_strictly_increasing_monotonic) != 'None': if is_strictly_increasing_monotonic: determined_metric_type = '1' else: determined_metric_type = '0' if str(determined_metric_type) != 'None': if current_metric_type != determined_metric_type: update_metric_type = True else: if determined_metric_type == '0': save_test_data = False if update_metric_type: logger.info('mirage_labelled_metrics :: detected change in metric_type updating from %s to %s for %s ' % ( current_metric_type, determined_metric_type, metric)) try: self.redis_conn_decoded.hset('skyline.labelled_metrics.id.type', str(metric_id), determined_metric_type) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to determine update 
skyline.labelled_metrics.id.type with metric_type - %s' % ( err)) # Set a new expire in the longterm_expire hash to have metrics_manager # recheck the metric at 30 days. try: new_expire = int(metric_timestamp) - 3600 self.redis_conn_decoded.hset('skyline.labelled_metrics.id.type.longterm_expire', str(metric_id), str(new_expire)) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to update skyline.labelled_metrics.id.type.longterm_expire with new expire value - %s' % ( err)) if save_test_data: output_object_path = os.path.dirname(metric_json_file_saved) if not os.path.isdir(output_object_path): try: mkdir_p(output_object_path) logger.info( 'output_object_path - %s' % str(output_object_path)) except Exception as err: logger.error( 'error :: failed to create output_object_path - %s - %s' % ( str(output_object_path,), err)) with open(metric_json_file_saved, 'w') as f: f.write(json.dumps(test_timeseries)) os.chmod(metric_json_file_saved, mode=0o644) # @modified 20230522 - metric_type.longterm_expire # Only surface if the test data was not saved if not save_test_data: try: # get_victoriametrics_metric automatically applies the rate and # step required no downsampling or nonNegativeDerivative is # required. 
metric_json_file_saved = get_victoriametrics_metric( skyline_app, base_name, second_resolution_timestamp, metric_timestamp, 'json', metric_json_file, metric_data) if metric_json_file_saved: logger.info('mirage_labelled_metrics :: %s time series data saved to %s' % (metric, metric_json_file_saved)) except Exception as err: logger.error('error :: mirage_labelled_metrics :: get_victoriametrics_metric failed for %s - %s' % ( str(metric_json_file), err)) # Check there is a json timeseries file to test if not os.path.isfile(metric_json_file): logger.error( 'error :: retrieve failed - failed to surface %s time series from %s' % ( metric, data_source)) # @added 20200905 - Feature #3734: waterfall alerts # Try a metric 3 times before removing the check file remove_check_file = True check_failed_key = 'mirage.check.data_retrieval_failed.%s.%s' % ( str(int_metric_timestamp), metric) fail_count = 0 try: fail_count = self.redis_conn.get(check_failed_key) except: fail_count = 0 if not fail_count: fail_count = 0 fail_count += 1 if fail_count < 3: remove_check_file = False try: self.redis_conn.setex(check_failed_key, 300, fail_count) logger.info('mirage_labelled_metrics :: updated fail_count to %s in %s' % (str(fail_count), check_failed_key)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to set Redis key %s with %s' % ( str(check_failed_key), str(fail_count))) else: logger.error('error :: mirage_labelled_metrics :: fail_count is %s in %s, removing check file' % (str(fail_count), check_failed_key)) if remove_check_file: # Remove metric check file try: os.remove(metric_check_file) except OSError: pass # Remove the metric directory try: rmtree(metric_data_dir) logger.info('mirage_labelled_metrics :: removed data dir - %s' % metric_data_dir) except: logger.error('error :: mirage_labelled_metrics :: failed to rmtree %s' % metric_data_dir) continue logger.info('mirage_labelled_metrics :: retrieved data :: for %s at %s seconds' % ( 
metric, str(second_order_resolution_seconds))) # Make process-specific dicts # exceptions = defaultdict(int) # anomaly_breakdown = defaultdict(int) self.check_if_parent_is_alive() timeseries = [] try: with open((metric_json_file), 'r') as f: timeseries = json.loads(f.read()) logger.info('mirage_labelled_metrics :: data points surfaced :: %s' % (str(len(timeseries)))) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to create timeseries from %s - %s' % ( str(metric_json_file), err)) timeseries = [] # @added 20170212 - Feature #1886: Ionosphere learn # Only process if the metric has sufficient data first_timestamp = None try: first_timestamp = int(timeseries[0][0]) except Exception as err: logger.error('error :: mirage_labelled_metrics :: could not determine first timestamp - %s' % err) timestamp_now = int(time()) valid_if_before_timestamp = timestamp_now - int(settings.FULL_DURATION) valid_mirage_timeseries = True if first_timestamp: if first_timestamp > valid_if_before_timestamp: valid_mirage_timeseries = False else: valid_mirage_timeseries = False logger.warning('warning :: no first_timestamp, valid_mirage_timeseries: %s' % str(valid_mirage_timeseries)) valid_mirage_timeseries = True redis_metric_name = '%s' % str(metric) metric_id = 0 if is_labelled_metric: metric_id = metric_data['metric_id'] redis_metric_name = 'labelled_metrics.%s' % str(metric_id) # @added 20200425 - Feature #3508: ionosphere.untrainable_metrics # Determine if any metrcs have negatives values some they can be # added to the ionosphere.untrainable_metrics Redis set run_negatives_present = False # @added 20200607 - Feature #3566: custom_algorithms algorithms_run = list(settings.MIRAGE_ALGORITHMS) # @added 20200904 - Feature #3734: waterfall alerts anomalous = None # @added 20201001 - Branch #3068: SNAB # Task #3748: POC SNAB # Add timings analysis_start_time = time() try: if valid_mirage_timeseries: 
logger.info('mirage_labelled_metrics :: analyzing :: %s at %s seconds' % (metric, second_order_resolution_seconds)) # @modified 20230118 - Task #4786: Switch from matrixprofile to stumpy # Task #4778: v4.0.0 - update dependencies # Added current_func anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms, current_func='mirage_labelled_metrics') logger.info('mirage_labelled_metrics :: analysed :: %s - anomalous: %s' % (metric, str(anomalous))) else: logger.info('mirage_labelled_metrics :: not analyzing :: %s at %s seconds as there is not sufficiently older datapoints in the timeseries - not valid_mirage_timeseries' % (metric, second_order_resolution_seconds)) anomalous = False if timeseries: datapoint = timeseries[-1][1] else: datapoint = 0 # @added 20220315 - Feature #4482: Test alerts # Allow to test on sparse metrics if test_alert or test_alert_and_trigger: anomalous = True logger.info('mirage_labelled_metrics :: test_alert - setting anomalous to True for %s' % metric) ensemble = [True] triggered_algorithms = ['testing'] algorithms_run = ['histogram_bins'] negatives_found = False # It could have been deleted by the Roomba except TypeError: # @added 20200430 - Feature #3480: batch_processing # Added logging here as the DeletedByRoomba exception is # generally not related to that but related to some other fail # in the processing of the run algorithms phase. 
# It could have been deleted by the Roomba, but Mirage does not use # Redis data so probably, definitely was not :) logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: added as DeletedByRoomba but possibly not see traceback above') exceptions['DeletedByRoomba'] += 1 logger.info('mirage_labelled_metrics :: exceptions :: DeletedByRoomba') except TooShort: exceptions['TooShort'] += 1 logger.info('mirage_labelled_metrics :: exceptions :: TooShort') except Stale: exceptions['Stale'] += 1 logger.info('mirage_labelled_metrics :: exceptions :: Stale') except Boring: exceptions['Boring'] += 1 logger.info('mirage_labelled_metrics :: exceptions :: Boring') except Exception as err: exceptions['Other'] += 1 logger.info('mirage_labelled_metrics :: exceptions :: Other') logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: unhandled error - %s' % err) # @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES if feedback_metric: try: self.redis_conn_decoded.setex(feedback_key, 300, int(time())) except Exception as err: logger.error('error :: mirage_labelled_metrics :: setex failed on %s - %s' % ( feedback_key, err)) # @added 20220420 - Feature #4530: namespace.analysed_events parent_namespace = metric.split('.', maxsplit=1)[0] if is_labelled_metric: try: parent_namespace = metric_data['metric_dict']['labels']['_tenant_id'] except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to determine parent_namespace from metric_data: %s - %s' % ( str(metric_data), err)) date_string = str(strftime('%Y-%m-%d', gmtime())) namespace_analysed_events_hash = 'namespace.analysed_events.%s.%s' % (skyline_app, date_string) try: self.redis_conn.hincrby(namespace_analysed_events_hash, parent_namespace, 1) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to increment %s Redis hash 
- %s' % ( namespace_analysed_events_hash, err)) try: self.redis_conn.expire(namespace_analysed_events_hash, (86400 * 15)) logger.info('mirage_labelled_metrics :: updated %s Redis hash' % namespace_analysed_events_hash) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to set expire %s Redis hash - %s' % ( namespace_analysed_events_hash, err)) # @added 20201001 - Branch #3068: SNAB # Task #3748: POC SNAB # Add timings analysis_run_time = time() - analysis_start_time logger.info('mirage_labelled_metrics :: algorithms analysis completed in %.2f seconds' % ( analysis_run_time)) # @added 20210309 - Task #3730: Validate Mirage running multiple processes # Reimplement mirage.checks.done count try: self.redis_conn.incr('mirage_labelled_metrics.checks.done') except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to increment mirage_labelled_metrics.checks.done Redis key - %s' % str(err)) # @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion # base_name = metric.replace(settings.FULL_NAMESPACE, '', 1) if metric.startswith(settings.FULL_NAMESPACE): base_name = metric.replace(settings.FULL_NAMESPACE, '', 1) else: base_name = metric # @added 20200904 - Feature #3734: waterfall alerts # Remove the metric from the waterfall_alerts Redis set # [metric, timestamp, value, added_to_waterfall_timestamp] # waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp. 
waterfall_panorama_data] redis_set = 'analyzer.waterfall_alerts.sent_to_mirage' get_waterfall_alerts_once = True if not get_waterfall_alerts_once: literal_analyzer_waterfall_alerts = [] try: literal_analyzer_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set)) except: literal_analyzer_waterfall_alerts = [] analyzer_waterfall_alerts = [] for literal_waterfall_alert in literal_analyzer_waterfall_alerts: waterfall_alert = literal_eval(literal_waterfall_alert) analyzer_waterfall_alerts.append(waterfall_alert) if test_alert or test_alert_and_trigger: anomalous = True logger.info('mirage_labelled_metrics :: test_alert - setting anomalous to True for %s' % metric) ensemble = [True] triggered_algorithms = ['testing'] algorithms_run = ['histogram_bins'] negatives_found = False # @added 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days irregular_unstable_timeseries = False if anomalous: low_variance = 0.009 normalized_var = None start_normalized_var = time() try: normalized_var = normalized_variance(timeseries) except Exception as err: logger.error('error :: mirage_labelled_metrics :: normalized_variance failed on timeseries for %s - %s' % ( metric, err)) if isinstance(normalized_var, dict): err = normalized_var['error'] logger.error('error :: mirage_labelled_metrics :: normalized_variance reported an error with timeseries for %s - %s' % ( metric, err)) normalized_var = None if isinstance(normalized_var, float): if normalized_var <= low_variance: irregular_unstable_timeseries = True logger.info('mirage_labelled_metrics :: normalized_variance ran with result: %s (took %.6f seconds), for %s' % ( str(normalized_var), (time() - start_normalized_var), metric)) # @added 20230515 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days if anomalous and irregular_unstable_timeseries: logger.info('mirage_labelled_metrics :: checking irregular_unstable %s' % ( metric)) result = True start_irregular_unstable = time() 
try: custom_algorithm = 'irregular_unstable' custom_algorithms_to_run = {} custom_algorithms_to_run[custom_algorithm] = { 'namespaces': ['labelled_metrics'], 'algorithm_source': '/opt/skyline/github/skyline/skyline/custom_algorithms/irregular_unstable.py', 'algorithm_parameters': { 'low_variance': 0.009, 'labelled_metric_name': labelled_metric_name, 'metric': labelled_metric_base_name, 'debug_logging': True, }, 'max_execution_time': 3.0, 'consensus': 1, 'algorithms_allowed_in_consensus': ['irregular_unstable'], # 'debug_logging': False, 'debug_logging': True, 'run_3sigma_algorithms': False, 'run_before_3sigma': False, 'run_only_if_consensus': False, 'trigger_history_override': False, 'use_with': ['mirage'], } # use_debug_logging_here = False use_debug_logging_here = True result, anomalyScore = run_custom_algorithm_on_timeseries(skyline_app, os.getpid(), labelled_metric_name, timeseries, 'irregular_unstable', custom_algorithms_to_run[custom_algorithm], use_debug_logging_here, current_func='mirage_labelled_metrics') algorithms_run.append(custom_algorithm) ensemble.append(result) if DEBUG_CUSTOM_ALGORITHMS or use_debug_logging_here: logger.debug('debug :: mirage_labelled_metrics :: run_custom_algorithm_on_timeseries run irregular_unstable with result - %s, anomalyScore - %s' % ( str(result), str(anomalyScore))) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: run_custom_algorithm_on_timeseries irregular_unstable failed on %s - %s' % ( str(metric), err)) result = None logger.info('mirage_labelled_metrics :: irregular_unstable ran with result: %s (took %.6f seconds), for %s - %s' % ( str(result), (time() - start_irregular_unstable), labelled_metric_name, labelled_metric_base_name)) # Although fine in a notebook does not have the desired effect # in the runtime so convert to a str and check # if result is False: if str(result) == 'False': logger.info('mirage_labelled_metrics :: irregular_unstable is 
overrriding anomalous result for %s' % ( labelled_metric_name)) anomalous = False # Clear last item from the trigger history as anomalous_daily_peak # is a 3sigma method after all try: cleared_trigger_history = self.clear_trigger_history(labelled_metric_name) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: clear_trigger_history failed on %s - %s' % ( str(labelled_metric_name), err)) # @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak # Determine if an anomaly is a normal peak value of normal magnitude # that occurs daily in a 7 day period if anomalous and MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS: logger.info('mirage_labelled_metrics :: checking anomalous_daily_peak %s' % ( metric)) result = True start_anomalous_daily_peak = time() try: custom_algorithm = 'anomalous_daily_peak' custom_algorithms_to_run = {} custom_algorithms_to_run[custom_algorithm] = { 'namespaces': ['labelled_metrics'], 'algorithm_source': '/opt/skyline/github/skyline/skyline/custom_algorithms/anomalous_daily_peak.py', 'algorithm_parameters': { 'number_of_daily_peaks': MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS, 'within_percent_of_normal_peaks': 20.0, # @added 20230424 - Feature #4724: custom_algorithms - anomalous_daily_peak # Added expiry to record metrics identified as normal by anomalous_daily_peaks 'expiry': anomalous_daily_peak_expiry, 'debug_logging': True, }, 'max_execution_time': 2.0, 'consensus': 1, 'algorithms_allowed_in_consensus': ['anomalous_daily_peak'], # 'debug_logging': False, 'debug_logging': True, 'run_3sigma_algorithms': False, 'run_before_3sigma': False, 'run_only_if_consensus': False, 'trigger_history_override': False, 'use_with': ['mirage'], } # use_debug_logging_here = False use_debug_logging_here = True # @modified 20230118 - Task #4786: Switch from matrixprofile to stumpy # Task #4778: v4.0.0 - update dependencies # Added current_func result, anomalyScore = 
run_custom_algorithm_on_timeseries(skyline_app, os.getpid(), metric, timeseries, 'anomalous_daily_peak', custom_algorithms_to_run[custom_algorithm], use_debug_logging_here, current_func='mirage_labelled_metrics') algorithms_run.append(custom_algorithm) ensemble.append(result) if DEBUG_CUSTOM_ALGORITHMS or use_debug_logging_here: logger.debug('debug :: mirage_labelled_metrics :: run_custom_algorithm_on_timeseries run anomalous_daily_peak with result - %s, anomalyScore - %s' % ( str(result), str(anomalyScore))) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: run_custom_algorithm_on_timeseries anomalous_daily_peak failed on %s - %s' % ( str(metric), err)) result = None logger.info('mirage_labelled_metrics :: anomalous_daily_peak ran with result: %s (took %.6f seconds), for %s' % ( str(result), (time() - start_anomalous_daily_peak), metric)) # Although fine in a notebook does not have the desired effect # in the runtime so convert to a str and check # if result is False: if str(result) == 'False': logger.info('mirage_labelled_metrics :: anomalous_daily_peak is overrriding anomalous result for %s' % ( metric)) anomalous = False # Clear last item from the trigger history as anomalous_daily_peak # is a 3sigma method after all try: cleared_trigger_history = self.clear_trigger_history(metric) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: clear_trigger_history failed on %s - %s' % ( str(metric), err)) # @added 20230424 - Feature #4724: custom_algorithms - anomalous_daily_peak # Added expiry to record metrics identified as normal by anomalous_daily_peaks current_now = int(time()) current_aligned_ts = int(process_start_timestamp // 60 * 60) expire_at = current_now + anomalous_daily_peak_expiry redis_hash = 'mirage.normal_daily_peak_metrics.%s' % str(current_aligned_ts) try: self.redis_conn_decoded.hset(redis_hash, redis_metric_name, str(expire_at)) 
self.redis_conn_decoded.expire(redis_hash, str(anomalous_daily_peak_expiry * 2)) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis hash %s - %s' % ( str(redis_metric_name), str(redis_hash), err)) # @added 20220504 - Feature #2580: illuminance # @modified 20230419 - Feature #2580: illuminance # Moved out of the if anomalous block. Record illuminance for all # Get the anomaly breakdown - who returned True? triggered_algorithms = [] for index, boolean_value in enumerate(ensemble): if boolean_value: # @modified 20200607 - Feature #3566: custom_algorithms # algorithm = settings.MIRAGE_ALGORITHMS[index] algorithm = algorithms_run[index] anomaly_breakdown[algorithm] += 1 triggered_algorithms.append(algorithm) if test_alert: triggered_algorithms = ['testing'] if triggered_algorithms: illuminance_dict = {} use_key = str(base_name) if is_labelled_metric: use_key = str(metric_id) illuminance_dict[use_key] = { 'timestamp': int(metric_timestamp), 'value': float(datapoint), 'triggered_algorithms_count': len(triggered_algorithms)} logger.info('mirage_labelled_metrics :: calling add_illuminance_entries with %s entries to add' % ( str(len(illuminance_dict)))) current_illuminance_dict = {} try: current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(run_timestamp), illuminance_dict) except Exception as err: logger.error('error :: mirage_labelled_metrics :: add_illuminance_entries failed - %s' % ( err)) logger.info('mirage_labelled_metrics :: illuminance Redis hash now has %s entries' % ( str(len(current_illuminance_dict)))) if not anomalous: not_anomalous_metric = [datapoint, base_name] # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.not_anomalous_metrics.append(not_anomalous_metric) redis_set = 'mirage_labelled_metrics.not_anomalous_metrics' data = str(not_anomalous_metric) try: self.redis_conn.sadd(redis_set, data) except: logger.error(traceback.format_exc()) 
logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) # @added 20200904 - Feature #3734: waterfall alerts # Remove the metric from the waterfall_alerts Redis set # [metric, timestamp, value, added_to_waterfall_timestamp] # waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data] redis_set = 'analyzer.waterfall_alerts.sent_to_mirage' for waterfall_alert in analyzer_waterfall_alerts: if waterfall_alert[0] == base_name: if int(waterfall_alert[1]) == metric_timestamp: try: self.redis_conn.srem(redis_set, str(waterfall_alert)) logger.info('mirage_labelled_metrics :: removed waterfall alert item from Redis set %s - %s' % ( redis_set, str(waterfall_alert))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to remove waterfall alert item for %s at %s from Redis set %s' % ( base_name, str(metric_timestamp), redis_set)) # @added 20201128 - Feature #3734: waterfall alerts # If the check just done is new than an existing analyzer # waterfall alert metric timestamp remove those keys as well if int(waterfall_alert[1]) < metric_timestamp: try: self.redis_conn.srem(redis_set, str(waterfall_alert)) logger.info('mirage_labelled_metrics :: removed waterfall alert item with older timestamp from Redis set %s - %s' % ( redis_set, str(waterfall_alert))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to remove waterfall alert item for %s at %s from Redis set %s' % ( base_name, str(metric_timestamp), redis_set)) # @added 20210330 - Feature #3994: Panorama - mirage not anomalous # A hash is added to the mirage.panorama.not_anomalous_metrics for # every metric that is found to be not anomalous. This provides # data for /panorama?not_anomalous and /panorama?not_anomalous_metric # method which are used for plots in the webapp and json response. 
# The mirage.panorama.not_anomalous_metrics Redis hash is managed in # analyzer/metrics_manager not_anomalous_timestamp = None try: not_anomalous_timestamp = int(timeseries[-1][0]) except: not_anomalous_timestamp = int(metric_timestamp) redis_hash = 'mirage_labelled_metrics.panorama.not_anomalous_metrics' try: data = { base_name: { 'timestamp': not_anomalous_timestamp, 'value': datapoint, 'hours_to_resolve': int(hours_to_resolve), } } self.redis_conn.hset(redis_hash, time(), str(data)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis hash %s' % ( str(data), str(redis_hash))) logger.info('mirage_labelled_metrics :: not anomalous :: %s with %s (at full duration), %s (at SECOND_ORDER_RESOLUTION_HOURS)' % ( metric, value, str(datapoint))) # If it's anomalous, add it to list if anomalous: # @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion # base_name = metric.replace(settings.FULL_NAMESPACE, '', 1) if metric.startswith(settings.FULL_NAMESPACE): base_name = metric.replace(settings.FULL_NAMESPACE, '', 1) else: base_name = metric # metric_timestamp = int(timeseries[-1][0]) metric_timestamp = int_metric_timestamp # Get the anomaly breakdown - who returned True? # @modified 20230419 - Feature #2580: illuminance # Moved out of the if anomalous block. 
Determine # triggered_algorithms for all to record illuminance # triggered_algorithms = [] # for index, boolean_value in enumerate(ensemble): # if boolean_value: # # @modified 20200607 - Feature #3566: custom_algorithms # # algorithm = settings.MIRAGE_ALGORITHMS[index] # algorithm = algorithms_run[index] # anomaly_breakdown[algorithm] += 1 # triggered_algorithms.append(algorithm) # if test_alert: # triggered_algorithms = ['testing'] # @modified 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json # Branch #3068: SNAB # Added second_order_resolution_seconds, triggered_algorithms and algorithms_run # anomalous_metric = [datapoint, base_name, metric_timestamp] # @modified 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days # Added snab_algorithms_to_run snab_algorithms_to_run = [] if SNAB_ENABLED: if irregular_unstable_timeseries: snab_algorithms_to_run = ['irregular_unstable'] anomalous_metric = [datapoint, base_name, metric_timestamp, second_order_resolution_seconds, triggered_algorithms, algorithms_run, snab_algorithms_to_run] if not ionosphere_unique_metrics: try: ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics')) except: ionosphere_unique_metrics = [] # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.anomalous_metrics.append(anomalous_metric) if metric not in ionosphere_unique_metrics: redis_set = 'mirage.anomalous_metrics' data = str(anomalous_metric) try: self.redis_conn.sadd(redis_set, data) logger.info('mirage_labelled_metrics :: add %s to mirage.anomalous_metrics Redis set' % ( str(data))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to mirage.anomalous_metrics Redis set' % ( str(data))) redis_set = 'mirage_labelled_metrics.anomalous_metrics' data = str(anomalous_metric) try: self.redis_conn.sadd(redis_set, data) except: logger.error(traceback.format_exc()) 
logger.error('error :: mirage_labelled_metrics :: failed to add %s to mirage.anomalous_metrics Redis set' % ( str(data))) # @added 20220504 - Feature #2580: illuminance # @modified 20230419 - Feature #2580: illuminance # Moved out of the if anomalous block to above. Determine # triggered_algorithms and record illuminance for all that # triggered # illuminance_dict = {} # use_key = str(base_name) # if is_labelled_metric: # use_key = str(metric_id) # illuminance_dict[use_key] = { # 'timestamp': int(metric_timestamp), # 'value': float(datapoint), # 'triggered_algorithms_count': len(triggered_algorithms)} # logger.info('mirage_labelled_metrics :: calling add_illuminance_entries with %s entries to add' % ( # str(len(illuminance_dict)))) # current_illuminance_dict = {} # try: # current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(run_timestamp), illuminance_dict) # except Exception as err: # logger.error('error :: mirage_labelled_metrics :: add_illuminance_entries failed - %s' % ( # err)) # logger.info('mirage_labelled_metrics :: illuminance Redis hash now has %s entries' % ( # str(len(current_illuminance_dict)))) logger.info('mirage_labelled_metrics :: anomaly detected :: %s with %s (at SECOND_ORDER_RESOLUTION_HOURS), %s (at FULL_DURATION)' % ( metric, str(datapoint), str(value))) # It runs so fast, this allows us to process 30 anomalies/min # @modified 20200903 - Task #3730: Validate Mirage running multiple processes # Removed limit # sleep(2) # @added 20170206 - Bug #1904: Handle non filesystem friendly metric names in check files sane_metricname = filesafe_metricname(str(base_name)) # @added 20200425 - Feature #3508: ionosphere.untrainable_metrics # Determine if any metrcs have negatives values some they can be # added to the ionosphere.untrainable_metrics Redis set if run_negatives_present and negatives_found: redis_set = 'ionosphere.untrainable_metrics' try: last_negative_timestamp = int(negatives_found[-1][0]) last_negative_value = 
negatives_found[-1][1] remove_after_timestamp = int(last_negative_timestamp + second_order_resolution_seconds) data = str([base_name, metric_timestamp, datapoint, last_negative_timestamp, last_negative_value, second_order_resolution_seconds, remove_after_timestamp]) self.redis_conn.sadd(redis_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) # If Crucible or Panorama are enabled determine details determine_anomaly_details = False if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED: determine_anomaly_details = True if settings.PANORAMA_ENABLED: determine_anomaly_details = True # If Ionosphere is enabled determine details try: ionosphere_enabled = settings.IONOSPHERE_ENABLED if settings.IONOSPHERE_ENABLED: determine_anomaly_details = True except: ionosphere_enabled = False if determine_anomaly_details: # metric_timestamp = str(int(timeseries[-1][0])) from_timestamp = str(int(timeseries[1][0])) timeseries_dir = base_name.replace('.', '/') cache_key = 'mirage.last_alert.smtp.%s' % (base_name) last_alert = False try: # @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value # Feature #3486: analyzer_batch # Feature #3480: batch_processing # Changed the last_alert cache key to hold the last # anomaly timestamp # last_alert = self.redis_conn.get(cache_key) last_alert = self.redis_conn_decoded.get(cache_key) except Exception as e: logger.error('error :: mirage_labelled_metrics :: could not query Redis for cache_key: %s' % str(e)) # @added 20170308 - Feature #1960: ionosphere_layers # Allow Ionosphere to send Panorama checks, it is an ionosphere_metric if not ionosphere_unique_metrics: try: # @modified 20191022 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics')) ionosphere_unique_metrics = 
list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics')) except: ionosphere_unique_metrics = [] added_at = str(int(time())) # If Panorama is enabled - create a Panorama check # @modified 20170308 - Feature #1960: ionosphere_layers # Allow Ionosphere to send Panorama checks for ionosphere_metrics # if settings.PANORAMA_ENABLED: send_to_panorama = False redis_metric_name = '%s%s' % (str(settings.FULL_NAMESPACE), str(base_name)) # TODO - testing if is_labelled_metric: redis_metric_name = 'labelled_metrics.%s' % str(metric_id) if settings.PANORAMA_ENABLED: send_to_panorama = True if redis_metric_name in ionosphere_unique_metrics: send_to_panorama = False # @added 20220315 - Feature #4482: Test alerts # Allow for full testing with the injection of an anomaly on a # metric if test_alert or test_alert_and_trigger: logger.info('mirage_labelled_metrics :: test_alert sending triggered anomaly on %s to Panorama' % ( metric)) send_to_panorama = True # Panorama must have at least one triggered algorithm original_triggered_algorithms = list(triggered_algorithms) if len(triggered_algorithms) == 0: triggered_algorithms = [algorithms_run[0]] if send_to_panorama: if not os.path.exists(settings.PANORAMA_CHECK_PATH): mkdir_p(settings.PANORAMA_CHECK_PATH) # Note: # The values are enclosed is single quoted intentionally # as the imp.load_source used results in a shift in the # decimal position when double quoted, e.g. 
# value = "5622.0" gets imported as # 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2 # single quoting results in the desired, # 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0 source = 'graphite' if base_name.startswith('labelled_metrics.'): source = 'victoriametrics' panaroma_anomaly_data = 'metric = \'%s\'\n' \ 'value = \'%s\'\n' \ 'from_timestamp = \'%s\'\n' \ 'metric_timestamp = \'%s\'\n' \ 'algorithms = %s\n' \ 'triggered_algorithms = %s\n' \ 'app = \'%s\'\n' \ 'source = \'%s\'\n' \ 'added_by = \'%s\'\n' \ 'added_at = \'%s\'\n' \ % (base_name, str(datapoint), from_timestamp, # @modified 20200607 - Feature #3566: custom_algorithms # str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS), str(int_metric_timestamp), str(algorithms_run), triggered_algorithms, skyline_app, source, this_host, added_at) # Create an anomaly file with details about the anomaly panaroma_anomaly_file = '%s/%s.%s.txt' % ( settings.PANORAMA_CHECK_PATH, added_at, sane_metricname) try: write_data_to_file( skyline_app, panaroma_anomaly_file, 'w', panaroma_anomaly_data) logger.info('mirage_labelled_metrics :: added panorama anomaly file :: %s' % (panaroma_anomaly_file)) # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # Move to Redis set block below # self.sent_to_panorama.append(base_name) except: logger.error('error :: mirage_labelled_metrics :: failed to add panorama anomaly file :: %s' % (panaroma_anomaly_file)) logger.error(traceback.format_exc()) # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage # Moved from the above self.sent_to_panorama redis_set = 'mirage_labelled_metrics.sent_to_panorama' data = str(base_name) try: self.redis_conn.sadd(redis_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) # @added 20210323 - Feature #3642: Anomaly type classification if 
LUMINOSITY_CLASSIFY_ANOMALIES: redis_set = 'luminosity.classify_anomalies' data_dict = { 'metric': metric, 'timestamp': int_metric_timestamp, 'value': datapoint, 'algorithms': algorithms_run, 'triggered_algorithms': triggered_algorithms, 'app': skyline_app, 'added_at': int(added_at), } data = [metric, int_metric_timestamp, int(added_at), data_dict] try: self.redis_conn.sadd(redis_set, str(data)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) # @added 20220315 - Feature #4482: Test alerts # Allow for full testing with the injection of an anomaly on a # metric if test_alert or test_alert_and_trigger: triggered_algorithms = list(original_triggered_algorithms) # @added 20200904 - Feature #3734: waterfall alerts # Remove the metric from the waterfall_alerts Redis set # [metric, timestamp, value, added_to_waterfall_timestamp] # waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data] redis_set = 'analyzer.waterfall_alerts.sent_to_mirage' for waterfall_alert in analyzer_waterfall_alerts: if waterfall_alert[0] == base_name: if int(waterfall_alert[1]) == metric_timestamp: try: self.redis_conn.srem(redis_set, str(waterfall_alert)) logger.info('mirage_labelled_metrics :: removed waterfall alert item from Redis set %s - %s' % ( redis_set, str(waterfall_alert))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to remove waterfall alert item for %s at %s from Redis set %s' % ( base_name, str(metric_timestamp), redis_set)) # If crucible is enabled - save timeseries and create a # crucible check if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED: from_timestamp = str(int(timeseries[1][0])) timeseries_dir = base_name.replace('.', '/') crucible_anomaly_dir = str(settings.CRUCIBLE_DATA_FOLDER) + '/' + timeseries_dir + '/' + metric_timestamp if not 
os.path.exists(crucible_anomaly_dir): mkdir_p(crucible_anomaly_dir) # Note: # The value is enclosed is single quoted intentionally # as the imp.load_source used in crucible results in a # shift in the decimal position when double quoted, e.g. # value = "5622.0" gets imported as # 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2 # single quoting results in the desired, # 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0 crucible_anomaly_data = 'metric = \'%s\'\n' \ 'value = \'%s\'\n' \ 'from_timestamp = \'%s\'\n' \ 'metric_timestamp = \'%s\'\n' \ 'algorithms = %s\n' \ 'triggered_algorithms = %s\n' \ 'anomaly_dir = \'%s\'\n' \ 'graphite_metric = True\n' \ 'run_crucible_tests = False\n' \ 'added_by = \'%s\'\n' \ 'added_at = \'%s\'\n' \ % (base_name, str(datapoint), from_timestamp, # @modified 20200607 - Feature #3566: custom_algorithms # str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS), str(int_metric_timestamp), str(algorithms_run), triggered_algorithms, crucible_anomaly_dir, skyline_app, added_at) # Create an anomaly file with details about the anomaly crucible_anomaly_file = '%s/%s.txt' % (crucible_anomaly_dir, sane_metricname) try: write_data_to_file( skyline_app, crucible_anomaly_file, 'w', crucible_anomaly_data) logger.info('mirage_labelled_metrics :: added crucible anomaly file :: %s' % (crucible_anomaly_file)) # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.sent_to_crucible.append(base_name) except: logger.error('error :: mirage_labelled_metrics :: failed to add crucible anomaly file :: %s' % (crucible_anomaly_file)) logger.error(traceback.format_exc()) # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage # Moved from the above self.sent_to_crucible redis_set = 'mirage.sent_to_crucible' data = str(base_name) try: self.redis_conn.sadd(redis_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to 
Redis set %s' % ( str(data), str(redis_set))) # Create timeseries json file with the timeseries json_file = '%s/%s.json' % (crucible_anomaly_dir, base_name) timeseries_json = str(timeseries).replace('[', '(').replace(']', ')') try: write_data_to_file(skyline_app, json_file, 'w', timeseries_json) logger.info('mirage_labelled_metrics :: added crucible timeseries file :: %s' % (json_file)) except: logger.error('error :: mirage_labelled_metrics :: failed to add crucible timeseries file :: %s' % (json_file)) logger.error(traceback.format_exc()) # Create a crucible check file crucible_check_file = '%s/%s.%s.txt' % (settings.CRUCIBLE_CHECK_PATH, metric_timestamp, sane_metricname) try: write_data_to_file( skyline_app, crucible_check_file, 'w', crucible_anomaly_data) logger.info('mirage_labelled_metrics :: added crucible check :: %s,%s' % (base_name, metric_timestamp)) except: logger.error('error :: mirage_labelled_metrics :: failed to add crucible check file :: %s' % (crucible_check_file)) logger.error(traceback.format_exc()) # @added 20230510 - Feature #4902: Prevent training on metrics newer than 7 days new_metric_added_at = False if ionosphere_enabled and not last_alert: try: new_metric_added_at = self.redis_conn_decoded.hget('metrics_manager.untrainable_new_metrics', labelled_metric_base_name) except Exception as err: logger.error('error :: failed to hget from metrics_manager.untrainable_new_metrics - %s' % ( err)) if new_metric_added_at: try: new_until = int(float(new_metric_added_at)) + (86400 * 7) new_until_date = datetime.datetime.fromtimestamp(new_until).strftime('%Y-%m-%d %H:%M:%S') logger.info('not sending %s to Ionosphere as still a new metric until %s' % ( labelled_metric_base_name, new_until_date)) except Exception as err: logger.error('error :: failed to determine when %s matures - %s' % ( labelled_metric_base_name, err)) ionosphere_enabled = False # @added 20160922 - Branch #922: Ionosphere # Also added the send_anomalous_metric_to skyline_functions.py # 
function if ionosphere_enabled: if not last_alert: # @modified 20161228 Feature #1830: Ionosphere alerts # Added full_duration which needs to be recorded to allow Mirage metrics # to be profiled on Redis timeseries data at FULL_DURATION # e.g. mirage.redis.24h.json full_duration = str(second_order_resolution_seconds) # @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity # Added ionosphere_parent_id, always zero from Analyzer and Mirage ionosphere_parent_id = 0 send_anomalous_metric_to( skyline_app, 'ionosphere', timeseries_dir, str(int_metric_timestamp), base_name, str(datapoint), from_timestamp, triggered_algorithms, timeseries, full_duration, str(ionosphere_parent_id), # @added 20201001 - Task #3748: POC SNAB # Added algorithms_run required to determine the anomalyScore # so this needs to be sent to Ionosphere so Ionosphere # can send it back on an alert algorithms_run) # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # Moved to the mirage.sent_to_ionosphere Redis set Redis set # block below # self.sent_to_ionosphere.append(base_name) # @added 20200804 - Feature #3462: Add IONOSPHERE_MANAGE_PURGE # Feature #3472: ionosphere.training_data Redis set # Feature #3474: webapp api - training_data # Add training data to the ionosphere.training_data so that # the ionosphere purge_old_data_dirs can happen less # frequently for reduced I/O redis_set = 'ionosphere.training_data' data = [base_name, int(int_metric_timestamp), second_order_resolution_seconds] try: logger.info('mirage_labelled_metrics :: adding to Redis set %s - %s' % ( redis_set, str(data))) self.redis_conn.sadd(redis_set, str(data)) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to %s Redis set' % (str(data), redis_set)) else: logger.info('mirage_labelled_metrics :: alert expiry key exists not sending to Ionosphere :: %s' % base_name) # @added 20200904 - Feature #3734: 
waterfall alerts # Remove the metric from the waterfall_alerts Redis set # [metric, timestamp, value, added_to_waterfall_timestamp] # waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data] # Do not remove if this is only for training_data creation if redis_metric_name in ionosphere_unique_metrics: redis_set = 'analyzer.waterfall_alerts.sent_to_mirage' mirage_waterfall_data = [] for waterfall_alert in analyzer_waterfall_alerts: if waterfall_alert[0] == base_name: if int(waterfall_alert[1]) == metric_timestamp: mirage_waterfall_data = waterfall_alert try: self.redis_conn.srem(redis_set, str(waterfall_alert)) logger.info('mirage_labelled_metrics :: removed waterfall alert item from Redis set %s - %s' % ( redis_set, str(waterfall_alert))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to remove waterfall alert item for %s at %s from Redis set %s' % ( base_name, str(metric_timestamp), redis_set)) # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage # Moved from the above self.sent_to_ionosphere if not last_alert: redis_set = 'mirage_labelled_metrics.sent_to_ionosphere' data = str(base_name) try: self.redis_conn.sadd(redis_set, data) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(data), str(redis_set))) # @added 20220315 - Feature #4482: Test alerts # Allow for full testing with the injection of an anomaly on a # metric if test_alert or test_alert_and_trigger: logger.info('mirage_labelled_metrics :: test_alert not sending anomaly on %s to ionosphere' % ( metric)) ionosphere_unique_metrics = [] # @added 20200904 - Feature #3734: waterfall alerts # Add mirage waterfall alert # Only add if this is an ionosphere_enabled metric_check_file if redis_metric_name in ionosphere_unique_metrics: if mirage_waterfall_data: # waterfall_data = [metric[1], metric[2], 
metric[0], added_to_waterfall_timestamp, waterfall_panorama_data] waterfall_data = mirage_waterfall_data redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere' try: self.redis_conn.sadd(redis_set, str(waterfall_data)) logger.info('mirage_labelled_metrics :: added to Redis set %s - %s' % (redis_set, str(waterfall_data))) except: logger.error(traceback.format_exc()) logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % ( str(waterfall_data), str(redis_set))) metric_var_files = [] timeseries = [] if os.path.isfile(metric_check_file): # Remove metric check file try: os.remove(metric_check_file) logger.info('mirage_labelled_metrics :: removed check file - %s' % metric_check_file) except OSError: logger.error('error :: mirage_labelled_metrics :: failed to remove check file - %s' % metric_check_file) # Remove the metric directory if os.path.exists(metric_data_dir): try: rmtree(metric_data_dir) logger.info('mirage_labelled_metrics :: removed data dir - %s' % metric_data_dir) except: logger.error('error :: mirage_labelled_metrics :: failed to rmtree %s' % metric_data_dir) # Add values to the queue so the parent process can collate try: for key, ab_value in anomaly_breakdown.items(): self.mirage_labelled_metrics_anomaly_breakdown_q.put((key, ab_value)) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed iterate to self.mirage_labelled_metrics_anomaly_breakdown_q - %s' % err) try: for key, e_value in exceptions.items(): self.mirage_labelled_metrics_exceptions_q.put((key, e_value)) except Exception as err: logger.error('error :: mirage_labelled_metrics :: failed iterate to self.mirage_labelled_metrics_exceptions_q - %s' % err)
def run(self):
    """
    Called when the process initializes.

    Runs the mirage_labelled_metrics main loop, which never returns:

    - Waits up to 5 seconds for the log lock file to clear and removes it if
      it is still present, then ensures MIRAGE_LABELLED_CHECK_PATH exists.
    - On every outer iteration pings Redis and tries to reconnect on failure.
    - In the inner loop refreshes the ``mirage_labelled_metrics`` Redis key,
      sleeps, removes stale check files and reads the
      ``analyzer_labelled_metrics.mirage_check`` Redis hash.  The inner loop
      is left when there are checks to process or when the once a minute
      metrics are due.
    - Spawns spin_process processes for the checks, waits up to
      settings.MAX_ANALYZER_PROCESS_RUNTIME seconds for them, collates the
      exceptions and anomaly_breakdown queues, reports algorithm error files
      and removes the processed check files and data directories.
    - Once every 60 seconds sends the app metrics with send_graphite_metric
      and deletes the Redis sets/keys that back those counts.
    """

    # Log management to prevent overwriting
    # Allow the bin/<skyline_app>.d to manage the log
    if os.path.isfile(skyline_app_logwait):
        try:
            os.remove(skyline_app_logwait)
        except OSError:
            logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)

    # Wait for at most 5 seconds for the log lock file to disappear
    now = time()
    log_wait_for = now + 5
    while now < log_wait_for:
        if os.path.isfile(skyline_app_loglock):
            sleep(.1)
            now = time()
        else:
            now = log_wait_for + 1

    logger.info('mirage_labelled_metrics :: starting %s run' % skyline_app)
    if os.path.isfile(skyline_app_loglock):
        logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
        try:
            os.remove(skyline_app_loglock)
            logger.info('mirage_labelled_metrics :: log lock file removed')
        except OSError:
            logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
    else:
        logger.info('mirage_labelled_metrics :: bin/%s.d log management done' % skyline_app)

    if not os.path.exists(MIRAGE_LABELLED_CHECK_PATH):
        mkdir_p(MIRAGE_LABELLED_CHECK_PATH)

    # @added 20200903 - Task #3730: Validate Mirage running multiple processes
    last_sent_to_graphite = int(time())

    # @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
    filesafe_names_dict = {}

    last_redis_self_key_update = 0

    while 1:
        now = time()

        # Make sure Redis is up
        try:
            self.redis_conn.ping()
        except:
            logger.error('error :: mirage_labelled_metrics :: skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
            sleep(10)
            logger.info('mirage_labelled_metrics :: attempting to connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
            try:
                self.redis_conn = get_redis_conn(skyline_app)
                self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
            except Exception as err:
                logger.error('error :: mirage_labelled_metrics :: failed to connect to Redis - %s' % err)
            try:
                self.redis_conn.ping()
                logger.info('mirage_labelled_metrics :: connected to redis')
            except Exception as err:
                logger.error('error :: mirage_labelled_metrics :: failed to ping Redis - %s' % err)

        # Determine if any metric to analyze or Ionosphere alerts to be sent
        while True:
            now = time()

            # Report app up, at most once every 20 seconds
            update_redis_self_key = False
            if not last_redis_self_key_update:
                update_redis_self_key = True
            if last_redis_self_key_update and now >= (last_redis_self_key_update + 20):
                update_redis_self_key = True
            if update_redis_self_key:
                try:
                    redis_is_up = self.redis_conn.setex('mirage_labelled_metrics', 120, now)
                    last_redis_self_key_update = int(now)
                    if redis_is_up:
                        try:
                            self.redis_conn.setex('redis', 120, now)
                        except Exception as err:
                            logger.error(traceback.format_exc())
                            logger.error('error :: mirage_labelled_metrics :: could not update the Redis redis key - %s' % (
                                err))
                except Exception as err:
                    logger.error('error :: mirage_labelled_metrics :: failed to update Redis key for mirage_labelled_metrics up - %s' % err)

            # @added 20161228 - Feature #1828: ionosphere - mirage Redis data features
            # @added 20220315 - Feature #4482: Test alerts
            # These are only initialised here, nothing below reads them.
            ionosphere_alerts_returned = False
            test_alerts = {}
            test_alert_metrics = []

            metric_var_files = []

            # @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
            #                      Feature #2484: FULL_DURATION feature profiles
            # Apply the appropriate sleep.  metric_var_files was just reset
            # so this is always entered, the sleep is shortened when the once
            # a minute metrics are nearly due and the loop is left when they
            # are due.
            if len(metric_var_files) == 0:
                sleep_for = 10
                next_send_to_graphite = last_sent_to_graphite + 60
                seconds_to_next_send_to_graphite = next_send_to_graphite - int(time())
                if seconds_to_next_send_to_graphite < 10:
                    if seconds_to_next_send_to_graphite > 1:
                        sleep_for = seconds_to_next_send_to_graphite
                    else:
                        break
                logger.info('mirage_labelled_metrics :: sleeping no metrics...')
                # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
                sleep(sleep_for)

            batch_processing_metrics = []

            # Clean up old files
            now_timestamp = time()
            stale_age = now_timestamp - settings.MIRAGE_STALE_SECONDS
            for current_file in os.listdir(MIRAGE_LABELLED_CHECK_PATH):
                check_file_path = MIRAGE_LABELLED_CHECK_PATH + "/" + current_file
                if not os.path.isfile(check_file_path):
                    continue
                # @added 20220113 - Feature #3486: analyzer_batch
                #                   Feature #3480: batch_processing
                # Do not remove batch_processing checks.  The skip has to
                # apply to the file loop, a continue inside a loop over the
                # batch metrics would not skip anything.
                if any(b_metric in current_file for b_metric in batch_processing_metrics):
                    continue
                # The check file can be removed by another process between
                # listing and stat/remove, do not let that end the run loop.
                try:
                    c = os.stat(check_file_path).st_ctime
                except OSError as err:
                    logger.error('error :: mirage_labelled_metrics :: failed to stat check file %s - %s' % (
                        current_file, err))
                    continue
                # Only delete the file if it is older than MIRAGE_STALE_SECONDS
                if c >= stale_age:
                    continue
                try:
                    os.remove(check_file_path)
                    logger.info('mirage_labelled_metrics :: removed stale check - %s' % (current_file))
                except OSError as err:
                    logger.error('error :: mirage_labelled_metrics :: failed to remove stale check %s - %s' % (
                        current_file, err))
                    continue
                # @added 20200903 - Task #3730: Validate Mirage running multiple processes
                redis_set = 'mirage_labelled_metrics.stale_check_discarded'
                try:
                    self.redis_conn.sadd(redis_set, str(current_file))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: mirage_labelled_metrics :: failed to add %s to Redis set %s' % (
                        str(current_file), str(redis_set)))

            # @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
            # Handle filesafe names
            filesafe_names_list = []
            if filesafe_names_dict:
                filesafe_names_list = list(filesafe_names_dict.keys())

            # Discover metric to analyze
            metric_var_files = []
            labelled_metrics_to_check_dict = {}
            try:
                labelled_metrics_to_check_dict = self.redis_conn_decoded.hgetall('analyzer_labelled_metrics.mirage_check')
            except Exception as err:
                # Report the operation that actually failed
                logger.error(traceback.format_exc())
                logger.error('error :: mirage_labelled_metrics :: failed to hgetall analyzer_labelled_metrics.mirage_check - %s' % err)
            if labelled_metrics_to_check_dict:
                for key in list(labelled_metrics_to_check_dict.keys()):
                    check_key = 'analyzer_labelled_metrics.mirage_check.%s' % key
                    metric_var_files.append(check_key)

            if len(metric_var_files) > 0:
                break

        process_metric_check_files = False
        metric_var_files_sorted = sorted(metric_var_files)
        if metric_var_files_sorted:
            process_metric_check_files = True

        # @added 20221014 - Bug #4696: analyzer - anomalous metrics sets not flushing
        #                   Task #4614: Support labelled metrics
        # Set the default dicts before the if as it was not being set inside
        # at times causing the log entries to hang.
        exceptions = {}
        anomaly_breakdown = {}

        if process_metric_check_files:

            # @added 20200903 - Task #3730: Validate Mirage running multiple processes
            check_files_to_process = len(metric_var_files_sorted)
            logger.info('mirage_labelled_metrics :: %s checks to process' % str(check_files_to_process))

            # Remove any existing algorithm.error files from any previous runs
            # that did not cleanup for any reason
            pattern = '%s.*.algorithm.error' % skyline_app
            try:
                for f in os.listdir(settings.SKYLINE_TMP_DIR):
                    if re.search(pattern, f):
                        try:
                            os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
                            logger.info('mirage_labelled_metrics :: cleaning up old error file - %s' % (str(f)))
                        except OSError:
                            pass
            except:
                logger.error('failed to cleanup mirage_algorithm.error files - %s' % (traceback.format_exc()))

            # Spawn processes
            pids = []
            spawned_pids = []
            pid_count = 0

            # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
            # Never use more processes than there are checks
            if len(metric_var_files) > 1:
                try:
                    MIRAGE_PROCESSES = int(settings.MIRAGE_PROCESSES)
                    if len(metric_var_files) < MIRAGE_PROCESSES:
                        MIRAGE_PROCESSES = len(metric_var_files)
                except:
                    MIRAGE_PROCESSES = 1
            else:
                MIRAGE_PROCESSES = 1

            # @modified 20230120 - Task #4786: Switch from matrixprofile to stumpy
            #                      Task #4778: v4.0.0 - update dependencies
            # mirage_labelled_metrics can be assigned 100s of checks so it is
            # not limited to 1 process in general.  However the
            # initialisation of stumpy.stump, even with jit caching, takes
            # between 1 and 3 seconds per process and thereafter further
            # metrics analysed in the same run take between 0.02 and 0.6
            # seconds.  Therefore for a small number of checks (ballpark
            # figure, depends on load) it is more efficient and quicker to
            # just use 1 process rather than more.
            if len(metric_var_files) <= 6:
                MIRAGE_PROCESSES = 1

            run_timestamp = int(time())
            for i in range(1, MIRAGE_PROCESSES + 1):
                # Assign a contiguous slice of the sorted checks to each
                # process, the last process takes whatever remains
                checks_per_processor = int(ceil(float(len(metric_var_files_sorted)) / float(MIRAGE_PROCESSES)))
                if i == MIRAGE_PROCESSES:
                    assigned_max = len(metric_var_files_sorted)
                else:
                    assigned_max = min(len(metric_var_files_sorted), i * checks_per_processor)
                assigned_min = (i - 1) * checks_per_processor
                assigned_keys = range(assigned_min, assigned_max)
                # Compile assigned metrics
                assigned_checks = [metric_var_files_sorted[index] for index in assigned_keys]
                logger.info('mirage_labelled_metrics :: processing %s checks' % str(len(assigned_checks)))

                p = Process(target=self.spin_process, args=(i, run_timestamp, assigned_checks))
                pids.append(p)
                pid_count += 1
                logger.info('mirage_labelled_metrics :: starting %s of %s spin_process/es' % (str(pid_count), str(MIRAGE_PROCESSES)))
                p.start()
                # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
                # Record the process number with the pid
                spawned_pids.append([p.pid, i])
                logger.info('mirage_labelled_metrics :: started spin_process %s with pid %s' % (str(pid_count), str(p.pid)))

            # Self monitor processes and terminate if any spin_process has run
            # for longer than settings.MAX_ANALYZER_PROCESS_RUNTIME seconds
            p_starts = time()
            while time() - p_starts <= settings.MAX_ANALYZER_PROCESS_RUNTIME:
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                else:
                    # All the processes are done, break now.
                    time_to_run = time() - p_starts
                    logger.info('mirage_labelled_metrics :: %s :: %s spin_process/es completed in %.2f seconds' % (
                        skyline_app, str(MIRAGE_PROCESSES), time_to_run))
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('mirage_labelled_metrics :: %s :: timed out, killing all spin_process processes' % (skyline_app))
                for p in pids:
                    try:
                        # @modified 20230410 - moved p.join before p.terminate
                        # NOTE(review): join() without a timeout waits for the
                        # process to exit by itself, so a hung spin_process
                        # would block here and terminate() would not be
                        # reached - confirm this ordering is intended.
                        p.join()
                        p.terminate()
                    except Exception as err:
                        logger.error('error :: mirage_labelled_metrics :: %s :: error terminating pid - %s' % (
                            skyline_app, err))

            for p in pids:
                try:
                    if p.is_alive():
                        logger.info('mirage_labelled_metrics :: %s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
                        p.join()
                except Exception as err:
                    logger.error('error :: mirage_labelled_metrics :: %s :: error joining pid - %s' % (
                        skyline_app, err))

            # @added 20200607 - Feature #3508: ionosphere.untrainable_metrics
            # Check to non 3sigma algorithm errors too
            check_algorithm_errors = ['negatives_present']
            for algorithm in list(settings.MIRAGE_ALGORITHMS):
                check_algorithm_errors.append(algorithm)
            # @added 20200607 - Feature #3566: custom_algorithms
            if CUSTOM_ALGORITHMS:
                for custom_algorithm in settings.CUSTOM_ALGORITHMS:
                    check_algorithm_errors.append(custom_algorithm)

            # Grab data from the queue and populate dictionaries
            exceptions = {}
            anomaly_breakdown = {}
            while 1:
                try:
                    key, value = self.mirage_labelled_metrics_anomaly_breakdown_q.get_nowait()
                    if key not in anomaly_breakdown:
                        anomaly_breakdown[key] = value
                    else:
                        anomaly_breakdown[key] += value
                except Empty:
                    # @added 20191113 - Branch #3262: py3
                    # Log
                    logger.info('mirage_labelled_metrics :: anomaly_breakdown.keys are empty')
                    break
                except Exception as err:
                    logger.error('error :: mirage_labelled_metrics :: grabbing data from the queue and populating anomaly_breakdown - %s' % err)
                    break

            while 1:
                try:
                    key, value = self.mirage_labelled_metrics_exceptions_q.get_nowait()
                    if key not in exceptions:
                        exceptions[key] = value
                    else:
                        exceptions[key] += value
                except Empty:
                    # @added 20191113 - Branch #3262: py3
                    # Log
                    logger.info('mirage_labelled_metrics :: exceptions.keys are empty')
                    break
                except Exception as err:
                    logger.error('error :: mirage_labelled_metrics :: grabbing data from the queue and populating exceptions - %s' % err)
                    break

            # @added 20191021 - Bug #3288: Always send anomaly_breakdown and exception metrics
            #                   Branch #3262: py3
            # Default every known exception and algorithm to 0
            exceptions_metrics = ['Boring', 'Stale', 'TooShort', 'Other']
            for i_exception in exceptions_metrics:
                if i_exception not in exceptions:
                    exceptions[i_exception] = 0

            # @modified 20200607 - Feature #3566: custom_algorithms
            for i_anomaly_breakdown in check_algorithm_errors:
                if i_anomaly_breakdown not in anomaly_breakdown:
                    anomaly_breakdown[i_anomaly_breakdown] = 0

            # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
            # Collate the results of each process
            for completed_pid, mirage_process in spawned_pids:
                logger.info('mirage_labelled_metrics :: spin_process with pid %s completed' % (str(completed_pid)))

                # @modified 20200607 - Feature #3566: custom_algorithms
                #                      Feature #3508: ionosphere.untrainable_metrics
                # Check to non 3sigma algorithm errors too and wrapped in try
                try:
                    for algorithm in check_algorithm_errors:
                        algorithm_error_file = '%s/%s.%s.%s.algorithm.error' % (
                            settings.SKYLINE_TMP_DIR, skyline_app,
                            str(completed_pid), algorithm)
                        if os.path.isfile(algorithm_error_file):
                            logger.info(
                                'error :: spin_process with pid %s has reported an error with the %s algorithm' % (
                                    str(completed_pid), algorithm))
                            try:
                                with open(algorithm_error_file, 'r') as f:
                                    error_string = f.read()
                                logger.error('%s' % str(error_string))
                            except:
                                logger.error('error :: mirage_labelled_metrics :: failed to read %s error file' % algorithm)
                            try:
                                os.remove(algorithm_error_file)
                            except OSError:
                                pass
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: mirage_labelled_metrics :: failed to check algorithm errors')

                # The checks this process recorded as processed
                redis_metrics_processed_key = 'mirage_labelled_metrics.%s.metrics_processed' % str(mirage_process)
                redis_metrics_processed = {}
                try:
                    redis_metrics_processed = self.redis_conn_decoded.hgetall(redis_metrics_processed_key)
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: mirage_labelled_metrics :: %s Redis hash operation failed - %s' % (redis_metrics_processed_key, err))

                # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
                # Use Redis set and not self.metric_variables
                # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
                # Handle per process
                metric_variables = []
                metric_variable_redis_set = 'mirage_labelled_metrics.%s.metric_variables' % str(mirage_process)
                try:
                    literal_metric_variables = list(self.redis_conn_decoded.smembers(metric_variable_redis_set))
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: mirage_labelled_metrics :: smembers failed on Redis %s - %s' % (
                        metric_variable_redis_set, err))
                    literal_metric_variables = []
                for item_list_string in literal_metric_variables:
                    list_item = literal_eval(item_list_string)
                    metric_variables.append(list_item)

                # @added 20200903 - Task #3730: Validate Mirage running multiple processes
                # Handle per process
                try:
                    self.redis_conn.delete(metric_variable_redis_set)
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: mirage_labelled_metrics :: failed to delete Redis set - %s' % metric_variable_redis_set)

                # @added 20191113 - Branch #3262: py3
                # Set default values
                metric_name = None
                metric_value = None
                hours_to_resolve = 0

                # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
                for metric_variable in metric_variables:
                    if metric_variable[0] == 'metric_name':
                        metric_name = metric_variable[1]
                    if metric_variable[0] == 'metric_value':
                        metric_value = metric_variable[1]
                    if metric_variable[0] == 'hours_to_resolve':
                        hours_to_resolve = metric_variable[1]

                logger.info('mirage_labelled_metrics :: process %s checked %s metrics' % (
                    str(mirage_process), str(len(redis_metrics_processed))))

                # Calculate hours second order resolution to seconds
                # @modified 20191113 - Branch #3262: py3
                # Only if set
                if hours_to_resolve:
                    logger.info('mirage_labelled_metrics :: analyzed at %s hours resolution' % hours_to_resolve)
                    second_order_resolution_seconds = int(hours_to_resolve) * 3600
                    logger.info('mirage_labelled_metrics :: analyzed at %s seconds resolution' % str(second_order_resolution_seconds))

                # Remove metric check files
                for check_item in list(redis_metrics_processed.keys()):
                    metric_check_file = 'None'
                    try:
                        metric_data = literal_eval(redis_metrics_processed[check_item])
                        metric_id = metric_data['metric_id']
                        metric_name = 'labelled_metrics.%s' % str(metric_id)
                        processing_check_file = '%s.txt' % str(metric_name)
                        metric_check_file = '%s/%s' % (MIRAGE_LABELLED_CHECK_PATH, processing_check_file)
                    except Exception as err:
                        logger.error('error :: mirage_labelled_metrics :: failed to interpolate metric_check_file - %s' % err)

                    if os.path.isfile(metric_check_file):
                        try:
                            os.remove(metric_check_file)
                            logger.info('mirage_labelled_metrics :: removed check file - %s' % metric_check_file)
                        except OSError:
                            pass
                        except:
                            logger.error(traceback.format_exc())
                            logger.error('error :: mirage_labelled_metrics :: failed to remove metric_check_file - %s' % metric_check_file)

                    # Remove the metric directory
                    # @modified 20191113 - Branch #3262: py3
                    # Convert None to str
                    metric_data_dir = 'None'
                    try:
                        metric_name_str = str(metric_name)
                        timeseries_dir = metric_name_str.replace('.', '/')
                        metric_data_dir = '%s/%s' % (MIRAGE_LABELLED_CHECK_PATH, timeseries_dir)
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: mirage_labelled_metrics :: failed to interpolate metric_data_dir')
                        metric_data_dir = 'None'
                    if os.path.exists(metric_data_dir):
                        try:
                            rmtree(metric_data_dir)
                            logger.info('mirage_labelled_metrics :: removed - %s' % metric_data_dir)
                        except:
                            logger.error('error :: mirage_labelled_metrics :: failed to rmtree %s' % metric_data_dir)

        # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
        # @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
        #                      Branch #3262: py3
        mirage_anomalous_metrics = []
        try:
            literal_mirage_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage_labelled_metrics.anomalous_metrics'))
            for metric_list_string in literal_mirage_anomalous_metrics:
                metric = literal_eval(metric_list_string)
                mirage_anomalous_metrics.append(metric)
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: mirage_labelled_metrics :: failed to determine list from mirage_labelled_metrics.anomalous_metrics Redis set')
            mirage_anomalous_metrics = []

        # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
        mirage_not_anomalous_metrics = []
        try:
            literal_mirage_not_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage_labelled_metrics.not_anomalous_metrics'))
            for metric_list_string in literal_mirage_not_anomalous_metrics:
                metric = literal_eval(metric_list_string)
                mirage_not_anomalous_metrics.append(metric)
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: mirage_labelled_metrics :: failed to determine list from mirage_labelled_metrics.not_anomalous_metrics Redis set')
            mirage_not_anomalous_metrics = []

        # Log progress
        # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
        if len(mirage_anomalous_metrics) > 0:
            logger.info('mirage_labelled_metrics :: seconds since last anomaly :: %.2f' % (time() - now))
            logger.info('mirage_labelled_metrics :: total anomalies :: %d' % len(mirage_anomalous_metrics))
            logger.info('mirage_labelled_metrics :: exception stats :: %s' % str(exceptions))
            logger.info('mirage_labelled_metrics :: anomaly breakdown :: %s' % str(anomaly_breakdown))

        # Record the run time of this run, the run times are summed and sent
        # to Graphite once a minute below
        if process_metric_check_files:
            n_time = time()
            run_time = n_time - run_timestamp
            logger.info('mirage_labelled_metrics :: process took %.2f seconds to run' % run_time)
            try:
                self.redis_conn_decoded.hset('mirage_labelled_metrics.run_times', str(n_time), str(run_time))
            except Exception as err:
                logger.error('error :: mirage_labelled_metrics :: failed to add run_time to mirage_labelled_metrics.run_times Redis hash - %s' % err)

        # @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
        # Use Redis sets instead of Manager().list()
        # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
        # The sent_to_* sets are handled once per minute below
        delete_redis_sets = [
            'mirage_labelled_metrics.anomalous_metrics',
            'mirage_labelled_metrics.not_anomalous_metrics',
            'mirage_labelled_metrics.metric_variables',
        ]
        for i_redis_set in delete_redis_sets:
            redis_set_to_delete = i_redis_set
            try:
                self.redis_conn.delete(redis_set_to_delete)
                logger.info('mirage_labelled_metrics :: deleted Redis set - %s' % redis_set_to_delete)
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: mirage_labelled_metrics :: failed to delete Redis set - %s' % redis_set_to_delete)

        # @added 20200903 - Task #3730: Validate Mirage running multiple processes
        # Send checks.stale_discarded and checks.pending metrics
        if int(time()) >= (last_sent_to_graphite + 60):
            stale_check_discarded = []
            try:
                stale_check_discarded = list(self.redis_conn_decoded.smembers('mirage_labelled_metrics.stale_check_discarded'))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: mirage_labelled_metrics :: failed to get mirage_labelled_metrics.stale_check_discarded set from Redis')
                stale_check_discarded = []
            stale_check_discarded_count = len(stale_check_discarded)
            logger.info('mirage_labelled_metrics :: checks.stale_discarded :: %s' % str(stale_check_discarded_count))
            send_metric_name = '%s.checks.stale_discarded' % skyline_app_graphite_namespace
            send_graphite_metric(self, skyline_app, send_metric_name, str(stale_check_discarded_count))

            checks_pending = [f_pending for f_pending in os.listdir(MIRAGE_LABELLED_CHECK_PATH) if os.path.isfile(os.path.join(MIRAGE_LABELLED_CHECK_PATH, f_pending))]
            checks_pending_count = len(checks_pending)
            logger.info('mirage_labelled_metrics :: checks.pending :: %s' % str(checks_pending_count))
            send_metric_name = '%s.checks.pending' % skyline_app_graphite_namespace
            send_graphite_metric(self, skyline_app, send_metric_name, str(checks_pending_count))

            # Sum the run times recorded since the last send
            run_times = []
            try:
                run_times_dict = self.redis_conn_decoded.hgetall('mirage_labelled_metrics.run_times')
                self.redis_conn_decoded.delete('mirage_labelled_metrics.run_times')
                if run_times_dict:
                    for n_time, run_time_str in run_times_dict.items():
                        run_times.append(float(run_time_str))
            except Exception as err:
                logger.error('error :: mirage_labelled_metrics :: failed to determine run_times from mirage_labelled_metrics.run_times Redis hash - %s' % err)
            if run_times:
                run_time = sum(run_times)
            else:
                run_time = 0
            logger.info('mirage_labelled_metrics :: seconds to run :: %.2f' % run_time)
            graphite_run_time = '%.2f' % run_time
            send_metric_name = skyline_app_graphite_namespace + '.run_time'
            send_graphite_metric(self, skyline_app, send_metric_name, graphite_run_time)
            try:
                self.redis_conn_decoded.hset('mirage_labelled_metrics.run_time', 'timestamp', str(time()))
                self.redis_conn_decoded.hset('mirage_labelled_metrics.run_time', 'value', str(run_time))
            except Exception as err:
                logger.error('error :: mirage_labelled_metrics :: failed to add keys to mirage_labelled_metrics.run_time Redis hash - %s' % err)

            # @modified 20210309 - Task #3730: Validate Mirage running multiple processes
            # Reimplement mirage.checks.done count as increment key
            checks_done = 0
            try:
                # @modified 20230205 - Task #4844: Replace Redis getset with set with get
                # As of Redis version 6.2.0 GETSET is regarded as deprecated
                # and is replaced by SET with the GET argument, which reads
                # the current count and resets it to 0 in one command.
                checks_done_str = self.redis_conn_decoded.set('mirage_labelled_metrics.checks.done', 0, get=True)
                if checks_done_str:
                    checks_done = int(checks_done_str)
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: mirage_labelled_metrics :: failed to get mirage_labelled_metrics.checks.done key from Redis')
                checks_done = 0
            logger.info('mirage_labelled_metrics :: checks.done :: %s' % str(checks_done))
            send_metric_name = '%s.checks.done' % skyline_app_graphite_namespace
            send_graphite_metric(self, skyline_app, send_metric_name, str(checks_done))

            # @modified 20200903 - Task #3730: Validate Mirage running multiple processes
            # Only send panorama, ionosphere and crucible metrics once a minute
            if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
                try:
                    # NOTE(review): this reads mirage.sent_to_crucible whereas
                    # the set deleted below is
                    # mirage_labelled_metrics.sent_to_crucible - confirm which
                    # set is intended.
                    sent_to_crucible = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_crucible'))))
                except:
                    sent_to_crucible = '0'
                logger.info('mirage_labelled_metrics :: sent_to_crucible :: %s' % sent_to_crucible)
                send_metric_name = '%s.sent_to_crucible' % skyline_app_graphite_namespace
                send_graphite_metric(self, skyline_app, send_metric_name, sent_to_crucible)

            if settings.PANORAMA_ENABLED:
                try:
                    sent_to_panorama = str(len(list(self.redis_conn_decoded.smembers('mirage_labelled_metrics.sent_to_panorama'))))
                except:
                    sent_to_panorama = '0'
                logger.info('mirage_labelled_metrics :: sent_to_panorama :: %s' % sent_to_panorama)
                send_metric_name = '%s.sent_to_panorama' % skyline_app_graphite_namespace
                send_graphite_metric(self, skyline_app, send_metric_name, sent_to_panorama)

            if settings.IONOSPHERE_ENABLED:
                try:
                    sent_to_ionosphere = str(len(list(self.redis_conn_decoded.smembers('mirage_labelled_metrics.sent_to_ionosphere'))))
                except Exception as e:
                    logger.error('error :: mirage_labelled_metrics :: could not determine sent_to_ionosphere: %s' % e)
                    sent_to_ionosphere = '0'
                logger.info('mirage_labelled_metrics :: sent_to_ionosphere :: %s' % sent_to_ionosphere)
                send_metric_name = '%s.sent_to_ionosphere' % skyline_app_graphite_namespace
                send_graphite_metric(self, skyline_app, send_metric_name, sent_to_ionosphere)

            last_sent_to_graphite = int(time())

            # Reset the sets that back the once a minute counts
            # @added 20200916 - Branch #3068: SNAB
            #                   Task #3744: POC matrixprofile
            # The main mirage process deletes the snab only checks set, not
            # mirage_labelled_metrics
            delete_redis_sets = [
                'mirage_labelled_metrics.sent_to_crucible',
                'mirage_labelled_metrics.sent_to_panorama',
                'mirage_labelled_metrics.sent_to_ionosphere',
                'mirage_labelled_metrics.stale_check_discarded',
            ]
            for i_redis_set in delete_redis_sets:
                redis_set_to_delete = i_redis_set
                try:
                    self.redis_conn.delete(redis_set_to_delete)
                    logger.info('mirage_labelled_metrics :: deleted Redis set - %s' % redis_set_to_delete)
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: mirage_labelled_metrics :: failed to delete Redis set - %s' % redis_set_to_delete)

            # @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
            # Refresh
            try:
                filesafe_names_dict = self.redis_conn_decoded.hgetall('metrics_manager.filesafe_base_names')
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: mirage_labelled_metrics :: hgetall metrics_manager.filesafe_base_names failed - %s' % err)

        # Sleep if it went too fast
        if time() - now < 59:
            logger.info('mirage_labelled_metrics :: sleeping due to low run time...')
            sleep(1)