"""
mirage.py
"""
import logging
try:
from Queue import Empty
except:
from queue import Empty
# from redis import StrictRedis
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list() to reduce memory and number of
# processes
# from multiprocessing import Process, Manager, Queue
from multiprocessing import Process, Queue
# from os import kill, getpid
import traceback
import re
# imports required for surfacing graphite JSON formatted timeseries for use in
# Mirage
import json
import sys
import os
# import errno
# import imp
from os import listdir
import datetime
# import os.path
import resource
from shutil import rmtree
from ast import literal_eval
from os.path import join, isfile
# @added 20220722 - Task #4624: Change all dict copy to deepcopy
import copy
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Added Unpacker
from msgpack import Unpacker, packb
import requests
try:
import urlparse
except ImportError:
# @modified 20191113 - Branch #3262: py3
# import urllib.parse
import urllib.parse as urlparse
import settings
# @modified 20160922 - Branch #922: Ionosphere
# Added the send_anomalous_metric_to skyline_functions.py
from skyline_functions import (
write_data_to_file, fail_check, send_anomalous_metric_to,
# @modified 20220726 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
# Moved send_graphite_metric
# mkdir_p, send_graphite_metric, filesafe_metricname,
mkdir_p, filesafe_metricname,
# @added 20170603 - Feature #2034: analyse_derivatives
nonNegativeDerivative, in_list,
# @added 20191030 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
get_redis_conn, get_redis_conn_decoded,
# @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
# Bug #3778: Handle single encoded forward slash requests to Graphite
# sanitise_graphite_url,
# @added 20201013 - Feature #3780: skyline_functions - sanitise_graphite_url
encode_graphite_metric_name,
# @added 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
get_graphite_metric,
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
sort_timeseries,
)
# @added 20200425 - Feature #3512: matched_or_regexed_in_list function
# Feature #3508: ionosphere.untrainable_metrics
# Feature #3486: analyzer_batch
from matched_or_regexed_in_list import matched_or_regexed_in_list
from mirage_alerters import trigger_alert
from negaters import trigger_negater
from mirage_algorithms import run_selected_algorithm
from algorithm_exceptions import TooShort, Stale, Boring
# @added 20220315 - Feature #4482: Test alerts
from functions.redis.get_test_alerts import get_test_alerts
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.mirage.downsample_full_duration_and_merge_graphite import downsample_full_duration_and_merge_graphite
# @modified 20230430 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Not required as downsample_full_duration_and_merge_graphite uses it
# from functions.timeseries.downsample import downsample_timeseries
# @added 20220504 - Feature #2580: illuminance
from functions.illuminance.add_illuminance_entries import add_illuminance_entries
# @added 20220726 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
from functions.graphite.send_graphite_metric import send_graphite_metric
# @added 20220805 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
from functions.metrics.get_base_name_from_labelled_metrics_name import get_base_name_from_labelled_metrics_name
from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
# @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak
from custom_algorithms import run_custom_algorithm_on_timeseries
# @added 20230418 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
from functions.timeseries.normalized_variance import normalized_variance
LOCAL_DEBUG = False

# ENABLE_MEMORY_PROFILING - DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Added all the memory profiling blocks - mem_top, pympler, objgraph, gc
# Garbage collection et al, should not be run in anything but development mode,
# therefore these variables are hard coded and not accessible via settings.py,
# if you are in here reading this then knock yourself out. gc and dump_garbage
# can be useful for getting an idea about what all the objects in play are, but
# garbage collection will just take longer and longer to run.
ENABLE_MEMORY_PROFILING = False
garbage_collection_enabled = False

if ENABLE_MEMORY_PROFILING:
    # @added 20160806 - Bug #1558: Memory leak in Analyzer
    # As per http://stackoverflow.com/a/1641280
    # This got useable understandable data
    if garbage_collection_enabled:
        from gc import get_objects
        # Debug with garbage collection - http://code.activestate.com/recipes/65333/
        import gc

skyline_app = 'mirage'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

python_version = int(sys.version_info[0])
this_host = str(os.uname()[1])


def _get_setting(name, default, cast=None):
    """
    Return ``settings.<name>``, optionally passed through ``cast``.

    If the option is not defined in settings.py, or ``cast`` rejects the
    configured value (e.g. ``list(None)`` raises TypeError), ``default`` is
    returned.  ``Exception`` is caught rather than using a bare ``except:``
    so that SystemExit/KeyboardInterrupt raised during import are not
    swallowed.

    :param name: the settings.py variable name
    :type name: str
    :param default: the value to use when the setting is unavailable
    :param cast: an optional callable applied to the configured value
    :return: the (cast) setting value or ``default``
    """
    try:
        value = getattr(settings, name)
        if cast is not None:
            value = cast(value)
    except Exception:
        return default
    return value


try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    SERVER_METRIC_PATH = ''

MIRAGE_PERIODIC_CHECK = _get_setting('MIRAGE_PERIODIC_CHECK', False)

# @added 20200413 - Feature #3486: analyzer_batch
#                   Feature #3480: batch_processing
BATCH_PROCESSING = _get_setting('BATCH_PROCESSING', None)
# @modified 20200606 - Bug #3572: Apply list to settings import
BATCH_PROCESSING_NAMESPACES = _get_setting('BATCH_PROCESSING_NAMESPACES', [], list)

# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrics have negative values so they can be
# added to the ionosphere.untrainable_metrics Redis set
KNOWN_NEGATIVE_METRICS = _get_setting('KNOWN_NEGATIVE_METRICS', [], list)

# @added 20200604 - Mirage - populate_redis
MIRAGE_AUTOFILL_TOOSHORT = _get_setting('MIRAGE_AUTOFILL_TOOSHORT', False)

# @added 20200607 - Feature #3566: custom_algorithms
CUSTOM_ALGORITHMS = _get_setting('CUSTOM_ALGORITHMS', None)
DEBUG_CUSTOM_ALGORITHMS = _get_setting('DEBUG_CUSTOM_ALGORITHMS', False)

# @added 20200723 - Feature #3472: ionosphere.training_data Redis set
#                   Feature #3566: custom_algorithms
MIRAGE_ALWAYS_METRICS = _get_setting('MIRAGE_ALWAYS_METRICS', [], list)

# @added 20200610 - Feature #3560: External alert config
EXTERNAL_ALERTS = _get_setting('EXTERNAL_ALERTS', {})
if EXTERNAL_ALERTS:
    from external_alert_configs import get_external_alert_configs

# @added 20200913 - Branch #3068: SNAB
#                   Task #3744: POC matrixprofile
#                   Info #1792: Shapelet extraction
SNAB_ENABLED = _get_setting('SNAB_ENABLED', False)
# @modified 20220722 - Task #4624: Change all dict copy to deepcopy
SNAB_CHECKS = _get_setting('SNAB_CHECKS', {}, copy.deepcopy)

# @added 20200916 - Branch #3068: SNAB
#                   Task #3744: POC matrixprofile
mirage_snab_only_checks_redis_set = 'mirage.snab_only_checks'

# @added 20201026 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle feedback metrics in a similar style to Ionosphere.  When the
# namespaces are not configured, guess them from SERVER_METRICS_NAME and
# the first label of GRAPHITE_HOST, falling back to this host's name.
try:
    SKYLINE_FEEDBACK_NAMESPACES = list(settings.SKYLINE_FEEDBACK_NAMESPACES)
except Exception:
    try:
        graphite_host = str(settings.GRAPHITE_HOST)
        graphite_hostname = graphite_host.split('.', -1)[0]
        SKYLINE_FEEDBACK_NAMESPACES = [settings.SERVER_METRICS_NAME, graphite_hostname]
    except Exception:
        SKYLINE_FEEDBACK_NAMESPACES = [this_host]

# @added 20210701 - Feature #4152: DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES
DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES = _get_setting(
    'DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES', [], list)

# @added 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
#                   Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# @modified 20220414 - Not introduced as a setting, this is the default
# behaviour
MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS = True

# @added 20210323 - Feature #3642: Anomaly type classification
LUMINOSITY_CLASSIFY_ANOMALIES = _get_setting('LUMINOSITY_CLASSIFY_ANOMALIES', False)

# @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak
MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS = _get_setting('MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS', True)

# @added 20221206 - Feature #4734: mirage_vortex
#                   Feature #4732: flux vortex
VORTEX_ENABLED = _get_setting('VORTEX_ENABLED', False)

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
failed_checks_dir = '%s_failed' % settings.MIRAGE_CHECK_PATH
# @added 20191107 - Branch #3262: py3
alert_test_file = '%s/%s_alert_test.txt' % (settings.SKYLINE_TMP_DIR, skyline_app)
[docs]class Mirage(Thread):
"""
The Mirage thread
"""
def __init__(self, parent_pid):
    """
    Initialise the Mirage thread.

    The thread is marked as a daemon.  The parent pid passed in and this
    process' own pid are stored (both are probed by
    check_if_parent_is_alive).  Two multiprocessing queues are created,
    ``mirage_exceptions_q`` and ``mirage_anomaly_breakdown_q``.  Two Redis
    connections are opened: ``redis_conn`` and ``redis_conn_decoded``.  The
    decoded one is for use on Redis sets where the bytes responses need to
    be decoded to utf-8 str (py3).

    :param parent_pid: the pid of the parent process
    :type parent_pid: int
    """
    # @modified 20221014 - Feature #4576: mirage - process multiple metrics
    super().__init__()
    self.daemon = True
    self.parent_pid, self.current_pid = parent_pid, os.getpid()

    # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
    # Each Manager() instance results in an extra python process using as
    # much memory as the parent (OK on a server, not so much in a
    # container), so the former Manager().list() objects were replaced with
    # Redis sets.  Only these two queues remain.
    self.mirage_exceptions_q, self.mirage_anomaly_breakdown_q = Queue(), Queue()

    # @added 20191030 - Bug #3266: py3 Redis binary objects not strings
    #                   Branch #3262: py3
    self.redis_conn = get_redis_conn(skyline_app)
    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
os.kill(self.current_pid, 0)
os.kill(self.parent_pid, 0)
except:
# @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
# Log warning
logger.warning('warning :: parent or current process dead')
sys.exit(0)
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
def spawn_alerter_process(self, alert, metric, second_order_resolution_seconds, context, triggered_algorithms):
    """
    Trigger an alert for a metric via mirage_alerters.trigger_alert.

    Despite its name, this method does not start a process itself.  It only
    calls trigger_alert with its arguments unchanged.  It exists so that the
    alert work can be done in a separate process.  plt.savefig keeps
    matplotlib objects in memory until the process terminates, so running
    the smtp alerter in its own process stops it from leaking memory in the
    parent.
    NOTE(review): presumably the caller runs this as the target of a
    multiprocessing Process - confirm at the call site.

    # @added 20160814 - Bug #1558: Memory leak in Analyzer
    # Issue #21 Memory leak in Analyzer
    # https://github.com/earthgecko/skyline/issues/21
    # @modified 20210304 - Feature #3642: Anomaly type classification
    # Feature #3970: custom_algorithm - adtk_level_shift
    # Added triggered_algorithms

    :param alert: the alert, passed through to trigger_alert
    :param metric: the metric, passed through to trigger_alert
    :param second_order_resolution_seconds: passed through to trigger_alert
    :param context: passed through to trigger_alert
    :param triggered_algorithms: passed through to trigger_alert
    :return: None
    """
    alert_arguments = (alert, metric, second_order_resolution_seconds, context, triggered_algorithms)
    trigger_alert(*alert_arguments)
# @modified 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until):
# @modified 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Deprecating self.surface_graphite_metric_data in all mirage functions
# (EXCEPT for in populate_redis) so that the same function can be used
# for all graphite requests. get_graphite_metric does the derivative,
# zero_fill and last_known_value functions so no longer required here.
# @modified 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Removed high_res parameter and will no longer be used not that the
# FULL_DURATION data is being backwards resampled
# def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until, high_res=False):
def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until):
    """
    Fetch a metric time series from the Graphite render API and save it as
    json in the metric's directory in settings.MIRAGE_DATA_FOLDER.

    Graphite datapoints are converted from ``[value, timestamp]`` to
    ``[timestamp, value]`` floats.  Datapoints that cannot be converted to
    floats (such as None values) are dropped.  The data is written to
    ``<MIRAGE_DATA_FOLDER>/<target>/<target>.json``, where ``<target>`` is
    the decoded target parameter of the request URL.

    The request uses a (connect, read) timeout so that an unresponsive
    Graphite cannot block Mirage indefinitely.  The timeout is taken from
    settings.GRAPHITE_CONNECT_TIMEOUT and settings.GRAPHITE_READ_TIMEOUT if
    they are defined, otherwise 5 and 30 seconds are used.

    Deprecated in Mirage (Feature #3866) in favour of get_graphite_metric,
    which also applies the derivative, zero_fill and last_known_value
    handling.

    :param metric_name: the metric name
    :type metric_name: str
    :param graphite_from: the Graphite from parameter (populate_redis used
        the '%H:%M_%Y%m%d' format)
    :type graphite_from: str
    :param graphite_until: the Graphite until parameter
    :type graphite_until: str
    :return: True if the data was saved, otherwise False
    :rtype: bool
    """
    # @added 20160803 - Unescaped Graphite target - https://github.com/earthgecko/skyline/issues/20
    # bug1546: Unescaped Graphite target
    escaped_metric_name = metric_name.replace('(', '\\(').replace(')', '\\)')
    # @added 20201013 - Feature #3780: skyline_functions - sanitise_graphite_url
    encoded_graphite_metric_name = encode_graphite_metric_name(skyline_app, escaped_metric_name)

    try:
        # Absolute from/until times are used so that if there is a lag in
        # mirage the correct timeseries data is still surfaced relevant to
        # the anomalous datapoint timestamp
        if settings.GRAPHITE_PORT != '':
            graphite_address = '%s:%s' % (settings.GRAPHITE_HOST, str(settings.GRAPHITE_PORT))
        else:
            graphite_address = settings.GRAPHITE_HOST
        # @modified 20190520 - Branch #3002: docker - Use GRAPHITE_RENDER_URI
        url = '%s://%s/%s/?from=%s&until=%s&target=%s&format=json' % (
            settings.GRAPHITE_PROTOCOL, graphite_address,
            settings.GRAPHITE_RENDER_URI, graphite_from, graphite_until,
            encoded_graphite_metric_name)
        # Without a timeout requests waits forever on an unresponsive server
        timeout = (
            float(getattr(settings, 'GRAPHITE_CONNECT_TIMEOUT', 5)),
            float(getattr(settings, 'GRAPHITE_READ_TIMEOUT', 30)))
        r = requests.get(url, timeout=timeout)
        js = r.json()
        datapoints = js[0]['datapoints']
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: surface_graphite_metric_data :: failed to get data from Graphite')
        return False

    try:
        converted = []
        for datapoint in datapoints:
            try:
                converted.append([float(datapoint[1]), float(datapoint[0])])
            except (TypeError, ValueError, IndexError, KeyError):
                # e.g. a None value for an interval with no data
                continue
        parsed = urlparse.urlparse(url)
        target = urlparse.parse_qs(parsed.query)['target'][0]
        metric_data_folder = str(settings.MIRAGE_DATA_FOLDER) + '/' + target
        mkdir_p(metric_data_folder)
        with open(metric_data_folder + '/' + target + '.json', 'w') as f:
            f.write(json.dumps(converted))
        return True
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: surface_graphite_metric_data :: failed to convert Graphite data and write to file - %s' % err)
        return False
# @added 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Bug #1460: panorama check file fails
# Panorama check file fails #24
# Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
def mirage_load_metric_vars(self, metric_vars_file):
    """
    Load the metric variables for a check from a metric check variables file

    A check file normally holds one ``key = value`` per line.  If that yields
    fewer than two entries, the file is retried as a python literal list of
    ``[key, value]`` lists on a single line.  This is the format logged for
    failed checks, and it lets an operator recreate a check from the log
    (Feature #3486).

    Only known keys are returned, with their values coerced:

    - ``metric`` - str, quotes stripped
    - ``value`` - float
    - ``hours_to_resolve``, ``metric_timestamp`` - int
    - ``snab_only_check`` - bool, True only when the raw value is the string
      ``"True"`` including the double quotes
    - ``triggered_algorithms`` - list, via literal_eval (``[]`` on failure)

    Unknown keys are ignored.  They no longer cause the whole file to be
    rejected when they appear on the first line.

    :param metric_vars_file: the path and filename to the metric variables files
    :type metric_vars_file: str
    :return: the metric_vars list of ``[key, value]`` lists or ``False`` if the
        file does not exist, a value could not be coerced or no known keys
        were found
    :rtype: list
    """
    if not os.path.isfile(metric_vars_file):
        logger.error(
            'error :: loading metric variables from metric_check_file - file not found - %s' % (
                str(metric_vars_file)))
        return False
    logger.info(
        'loading metric variables from metric_check_file - %s' % (
            str(metric_vars_file)))

    # Default format, one "key = value" per line, each line becoming a
    # [key, raw_value] list of str
    metric_vars = []
    with open(metric_vars_file) as f:
        for line in f:
            no_new_line = line.replace('\n', '')
            no_equal_line = no_new_line.replace(' = ', ',')
            metric_vars.append(no_equal_line.split(',', 1))

    # @added 20200429 - Feature #3486: analyzer_batch
    #                   Feature #3480: batch_processing
    # Allow the check file to already hold a valid python list on one line
    # so that a check can be added by simply echoing the metric_vars line
    # from the log for any failed check into a new Mirage check file.  The
    # above pattern is still the default, this is for check files added by
    # the operator or for debugging.
    try_literal_eval = False
    if not metric_vars:
        try_literal_eval = True
        logger.info('metric_vars is not defined, set to try_literal_eval')
    elif len(metric_vars) < 2:
        try_literal_eval = True
        logger.info('metric_vars is not a list of lists, set to try_literal_eval')
    if try_literal_eval:
        try:
            with open(metric_vars_file) as f:
                for line in f:
                    metric_vars = literal_eval(line)
                    if metric_vars:
                        break
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('metric_vars not loaded with literal_eval')
            metric_vars = []
        if not isinstance(metric_vars, (list, tuple)):
            logger.error('error :: metric_vars loaded with literal_eval is not a list - %s' % str(metric_vars_file))
            metric_vars = []

    def _unquote(raw_value):
        # Check file values may be wrapped in single or double quotes
        return str(raw_value).replace("'", '').replace('"', '')

    string_keys = ['metric']
    float_keys = ['value']
    int_keys = ['hours_to_resolve', 'metric_timestamp']
    # @added 20200916 - Branch #3068: SNAB
    boolean_keys = ['snab_only_check']
    # @added 20210304 - Feature #3642: Anomaly type classification
    list_keys = ['triggered_algorithms']

    # Initialised so the summary log below cannot hit an unbound name
    metric = None
    metric_vars_array = []
    for var_array in metric_vars:
        # @modified 20181023 - Feature #2618: alert_slack
        # Wrapped in try except for debugging issue where the
        # hours_to_resolve was interpolating to hours_to_resolve = "t"
        try:
            key = None
            value = None
            var_name = var_array[0]
            if var_name in string_keys:
                key = var_name
                value = _unquote(var_array[1])
                if var_name == 'metric':
                    metric = value
            elif var_name in float_keys:
                key = var_name
                value = float(_unquote(var_array[1]))
            elif var_name in int_keys:
                key = var_name
                value = int(float(_unquote(var_array[1])))
            elif var_name in boolean_keys:
                key = var_name
                logger.debug(
                    'debug :: boolean key - key: %s, value: %s' % (
                        str(var_array[0]), str(var_array[1])))
                # NOTE(review): only the double quoted "True" reads as True,
                # 'True' or True read as False - confirm against the writer
                value = str(var_array[1]) == '"True"'
            elif var_name in list_keys:
                key = var_name
                logger.debug(
                    'debug :: list key - key: %s, value: %s' % (
                        str(var_array[0]), str(var_array[1])))
                raw_value = var_array[1]
                if isinstance(raw_value, list):
                    # Already a list when loaded from a literal list check file
                    value = raw_value
                else:
                    try:
                        value = literal_eval(raw_value)
                    except Exception as e:
                        logger.error(
                            'error :: loading metric variables - failed to literal_eval list for %s, %s - %s' % (
                                str(key), str(var_array[1]), e))
                        value = []
            if key:
                metric_vars_array.append([key, value])
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to load metric variables from check file - %s' % (metric_vars_file))
            return False

    # Checked once all lines are processed, not per line
    if not metric_vars_array:
        logger.error(
            'error :: loading metric variables - none found from %s' % (
                str(metric_vars_file)))
        return False

    logger.info('debug :: metric_vars for %s' % str(metric))
    logger.info('debug :: %s' % str(metric_vars_array))
    return metric_vars_array
def dump_garbage(self):
    """
    DEVELOPMENT ONLY - show us what's the garbage about.

    Does nothing unless both of the hard coded ENABLE_MEMORY_PROFILING and
    garbage_collection_enabled module flags are True.  When they are, it
    runs gc.collect().  If settings.ENABLE_DEBUG or LOCAL_DEBUG is also set,
    it then logs the type and the first 80 characters of each object in
    gc.garbage.  The previous version logged a placeholder string instead of
    the objects.

    # @added 20160806 - Bug #1558: Memory leak in Analyzer
    # Debug with garbage collection - http://code.activestate.com/recipes/65333/

    :return: None
    """
    if not (ENABLE_MEMORY_PROFILING and garbage_collection_enabled):
        return None

    debug_logging = settings.ENABLE_DEBUG or LOCAL_DEBUG
    if debug_logging:
        logger.info('debug :: GARBAGE')
    # force collection
    try:
        gc.collect()
    except Exception:
        logger.error('error :: gc.collect failed')
        logger.error(traceback.format_exc())
        return None

    if debug_logging:
        logger.info('debug :: GARBAGE OBJECTS')
    for x in gc.garbage:
        try:
            # Truncated so that large objects do not flood the log
            log_string = 'debug :: garbage object :: %s :: %s' % (type(x), str(x)[:80])
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: print x and s')
            continue
        if debug_logging:
            logger.info(log_string)
    return None
# @added 20200604 - Mirage - populate_redis
def populate_redis(self, i, metric):
    """
    Get FULL_DURATION data from Graphite for a metric and populate Redis

    The last settings.FULL_DURATION seconds of the metric are fetched with
    get_graphite_metric into a temporary json file.  Each data point is
    msgpack packed and appended (in a single Redis pipeline) to the Redis
    key settings.FULL_NAMESPACE + metric.  On success the metric is added to
    the mirage.filled Redis set, so that Analyzer sorts and deduplicates the
    data on its next run.

    Batch processing metrics are not handled.  Unless the
    mirage.redis_populated.<metric> key already existed, that key is set
    (expiring after FULL_DURATION) whatever the outcome, so the metric is
    not attempted again in that period.  The metric is always removed from
    the mirage.populate_redis Redis set.

    :param i: the process number (not used)
    :param metric: the metric name
    :type metric: str
    :return: None
    """
    # Check if it has been done via the mirage.redis_populated key
    redis_populated_key = 'mirage.redis_populated.%s' % metric
    try:
        redis_populated = self.redis_conn_decoded.get(redis_populated_key)
    except Exception as e:
        logger.error(
            'error :: populate_redis :: could not query cache_key - %s - %s' % (
                redis_populated_key, e))
        redis_populated = False

    # Do not handle batch processing metrics
    # @modified 20220113 - Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
    # Use aet.analyzer.batch_processing_metrics
    try:
        batch_processing_metrics = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: populate_redis :: failed to get analyzer.batch_processing_metrics from Redis')
        batch_processing_metrics = []
    if metric in batch_processing_metrics:
        redis_populated = True
        logger.info('populate_redis :: %s is a batch processing metric, not handling, creating Redis key %s' % (
            metric, redis_populated_key))
        self._populate_redis_set_done_key(redis_populated_key, int(time()))

    if redis_populated:
        logger.info('populate_redis :: the Redis key %s already exists, it has been done' % (redis_populated_key))
        self._populate_redis_dequeue(metric)
        return

    time_now = int(time())
    time_from = int(time_now - settings.FULL_DURATION)
    # Calculate graphite from and until parameters
    graphite_until = datetime.datetime.fromtimestamp(time_now).strftime('%H:%M_%Y%m%d')
    graphite_from = datetime.datetime.fromtimestamp(time_from).strftime('%H:%M_%Y%m%d')

    # Remove any old json file related to the metric
    metric_data_folder = '%s/%s' % (settings.MIRAGE_DATA_FOLDER, metric)
    metric_json_file = '%s/%s.json' % (metric_data_folder, str(metric))
    try:
        os.remove(metric_json_file)
    except OSError:
        pass

    # Get data from graphite
    logger.info('populate_redis :: surfacing %s time series from Graphite' % (metric))
    try:
        # @modified 20230616 - get_graphite_metric is used for all Graphite
        # requests in mirage
        get_graphite_metric(
            skyline_app, metric, graphite_from,
            graphite_until, 'json', metric_json_file)
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: populate_redis :: get_graphite_metric failed to surface_graphite_metric_data to populate %s' % (
            str(metric_json_file)))

    # Check there is a json timeseries file to use
    if not os.path.isfile(metric_json_file):
        logger.error(
            'error :: populate_redis :: retrieve failed - failed to surface %s time series from graphite' % (
                metric))
        self._populate_redis_set_done_key(redis_populated_key, time_now)
        self._populate_redis_dequeue(metric)
        return
    logger.info('populate_redis :: retrieved data :: for %s' % (
        metric))
    self.check_if_parent_is_alive()

    timeseries = []
    try:
        with open(metric_json_file, 'r') as f:
            timeseries = json.loads(f.read())
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: populate_redis :: failed to get timeseries from json - %s' % metric_json_file)
        timeseries = []
    # The json file is only a transfer file, remove it whatever it held
    try:
        os.remove(metric_json_file)
    except OSError:
        pass

    if not timeseries:
        logger.info('populate_redis :: no timeseries data for %s, setting redis_populated_key and removing from mirage.populate_redis' % metric)
        self._populate_redis_set_done_key(redis_populated_key, time_now)
        self._populate_redis_dequeue(metric)
        return

    redis_key = ''.join((settings.FULL_NAMESPACE, metric))
    logger.info('populate_redis :: time series data for %s, populating Redis with %s data points' % (
        metric, str(len(timeseries))))
    try:
        pipe = self.redis_conn.pipeline()
    except Exception as e:
        logger.error('error :: populate_redis :: error on Redis pipe: %s' % (str(e)))
        pipe = None

    redis_populated = False
    # Without a pipeline every append would fail, so skip the loop entirely
    if pipe is not None:
        try:
            for metric_data in timeseries:
                try:
                    pipe.append(str(redis_key), packb(metric_data))
                except Exception as e:
                    logger.error('error :: populate_redis :: error on pipe.append: %s' % (str(e)))
            pipe.execute()
            redis_populated = True
        except Exception as e:
            logger.error('error :: populate_redis :: error on pipe.execute: %s' % (str(e)))

    # Set once, whatever the outcome, as on the failure paths above
    self._populate_redis_set_done_key(redis_populated_key, time_now)
    if redis_populated:
        # Add to Redis set so that Analyzer sorts and deduplicates the data
        # on the next run
        try:
            self.redis_conn.sadd('mirage.filled', metric)
            logger.info('populate_redis :: add %s to Redis set mirage.filled for Analyzer to sort and deduplicate the Redis data' % metric)
        except Exception as e:
            logger.error('error :: populate_redis :: failed add metric to Redis set mirage.filled: %s' % e)
    self._populate_redis_dequeue(metric)
    return

def _populate_redis_set_done_key(self, redis_populated_key, timestamp):
    """
    populate_redis helper - set the mirage.redis_populated.<metric> key to
    timestamp with an expiry of settings.FULL_DURATION seconds.  Errors are
    logged, not raised.
    """
    try:
        self.redis_conn.setex(redis_populated_key, settings.FULL_DURATION, timestamp)
        logger.info('populate_redis :: created Redis key %s' % (redis_populated_key))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: populate_redis :: failed to create Redis key %s' % redis_populated_key)

def _populate_redis_dequeue(self, metric):
    """
    populate_redis helper - remove metric from the mirage.populate_redis
    Redis set.  Errors are logged, not raised.
    """
    try:
        self.redis_conn.srem('mirage.populate_redis', metric)
        logger.info('populate_redis :: removed item - %s - from Redis set mirage.populate_redis' % (metric))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: populate_redis :: failed to remove item %s from Redis set mirage.populate_redis' % metric)
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes
# def spin_process(self, i, run_timestamp):
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# def spin_process(self, i, run_timestamp, metric_check_filename):
[docs] def spin_process(self, i, run_timestamp, processing_check_files):
"""
Assign a metric for a process to analyze.
"""
# if int(i) > 1:
# i_less_one = int(i) - 1
# sleep_for_str = '0.%s' % str(i_less_one)
# logger.info('process %s sleeping for %s' % (str(i), sleep_for_str))
# sleep(float(sleep_for_str))
# Discover metric to analyze
# metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
# Check if this process is unnecessary
# if len(metric_var_files) == 0:
# logger.info('no check files found, nothing to do')
# return
# metric_var_files_sorted = sorted(metric_var_files)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
# Ensure the process locks the check
# metric_check_filename = None
# for i_metric_check_file in metric_var_files_sorted:
# check_assigned = False
# cache_key = 'mirage.check.lock.%s' % str(i_metric_check_file)
# try:
# check_assigned = self.redis_conn.get(cache_key)
# if not check_assigned:
# try:
# self.redis_conn.setex(cache_key, 120, int(time()))
# metric_check_filename = str(i_metric_check_file)
# logger.info('assigned self check file and set Redis key - %s' % (cache_key))
# self.redis_conn.sadd('mirage.checks.done', metric_check_filename)
# break
# except:
# logger.error(traceback.format_exc())
# logger.error('error :: failed to set Redis key - %s' % cache_key)
# else:
# logger.info('already assigned, Redis key exists - %s' % (cache_key))
#
# except:
# logger.error(traceback.format_exc())
# logger.error('error :: failed to check if Redis key exists - %s' % cache_key)
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# if not metric_check_filename:
if not processing_check_files:
logger.info('no check to assign to process, nothing to do')
return
logger.info('assigned %s checks to process' % str(len(processing_check_files)))
start_time = time()
mirage_periodic_check_metrics = []
if MIRAGE_PERIODIC_CHECK:
try:
mirage_periodic_check_metrics = list(self.redis_conn_decoded.smembers('mirage.periodic_check.metrics'))
except:
logger.error('error :: failed to get mirage_periodic_check_metrics from Redis')
mirage_periodic_check_metrics = []
try:
derivative_metrics = list(self.redis_conn_decoded.smembers('aet.metrics_manager.derivative_metrics'))
except:
derivative_metrics = []
try:
non_derivative_monotonic_metrics = list(settings.NON_DERIVATIVE_MONOTONIC_METRICS)
except:
non_derivative_monotonic_metrics = []
try:
ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics'))
except:
ionosphere_unique_metrics = []
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
for metric_check_filename in processing_check_files:
metric_check_file = '%s/%s' % (
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# settings.MIRAGE_CHECK_PATH, str(metric_var_files_sorted[0]))
settings.MIRAGE_CHECK_PATH, metric_check_filename)
check_file_name = os.path.basename(str(metric_check_file))
check_file_timestamp = check_file_name.split('.', 1)[0]
check_file_metricname_txt = check_file_name.split('.', 1)[1]
check_file_metricname = check_file_metricname_txt.replace('.txt', '')
check_file_metricname_dir = check_file_metricname.replace('.', '/')
metric_failed_check_dir = '%s/%s/%s' % (failed_checks_dir, check_file_metricname_dir, check_file_timestamp)
# Load metric variables
# @modified 20160822 - Bug #1460: panorama check file fails
# Changed to panorama style skyline_functions load_metric_vars
# self.load_metric_vars(metric_check_file)
# Load and validate metric variables
try:
# @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Bug #1460: panorama check file fails
# Panorama check file fails #24
# Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
# metric_vars = load_metric_vars(skyline_app, str(metric_check_file))
metric_vars_array = self.mirage_load_metric_vars(str(metric_check_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to load metric variables from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
metric = None
# @added 20200106 - Branch #3262: py3
# Task #3034: Reduce multiprocessing Manager list usage
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# redis_set_to_delete = 'mirage.metric_variables'
redis_metric_variables_set = 'mirage.%s.metric_variables' % str(i)
redis_set_to_delete = redis_metric_variables_set
try:
self.redis_conn.delete(redis_set_to_delete)
logger.info('deleted Redis set - %s' % redis_set_to_delete)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % redis_set_to_delete)
try:
key = 'metric'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
metric = str(value_list[0])
metric_name = ['metric_name', metric]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(metric_name)
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
redis_set = 'mirage.metric_variables'
data = str(metric_name)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
logger.info('debug :: added metric_name %s from check file - %s' % (str(metric_name), metric_check_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
if not metric:
logger.error('error :: failed to load metric variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
value = None
try:
key = 'value'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
value = float(value_list[0])
metric_value = ['metric_value', value]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(metric_value)
redis_set = 'mirage.metric_variables'
data = str(metric_value)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
if not value:
# @modified 20181119 - Bug #2708: Failing to load metric vars
if value == 0.0:
pass
else:
logger.error('error :: failed to load value variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
hours_to_resolve = None
try:
key = 'hours_to_resolve'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
hours_to_resolve = int(value_list[0])
hours_to_resolve_list = ['hours_to_resolve', hours_to_resolve]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(hours_to_resolve_list)
redis_set = 'mirage.metric_variables'
data = str(hours_to_resolve_list)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error('error :: failed to read hours_to_resolve variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
if not hours_to_resolve:
logger.error('error :: failed to load hours_to_resolve variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
metric_timestamp = None
try:
key = 'metric_timestamp'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
metric_timestamp = int(value_list[0])
metric_timestamp_list = ['metric_timestamp', metric_timestamp]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(metric_timestamp_list)
redis_set = 'mirage.metric_variables'
data = str(metric_timestamp_list)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
if not metric_timestamp:
logger.error('error :: failed to load metric_timestamp variable from check file - %s' % (metric_check_file))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
snab_only_check = None
try:
key = 'snab_only_check'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
snab_only_check = value_list[0]
except:
snab_only_check = None
snab_only_check_list = ['snab_only_check', snab_only_check]
redis_set = 'mirage.metric_variables'
data = str(snab_only_check_list)
try:
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms to mirage_check_file
try:
key = 'triggered_algorithms'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
triggered_algorithms = value_list[0]
except:
triggered_algorithms = []
metric_data_dir = '%s/%s' % (settings.MIRAGE_DATA_FOLDER, str(metric))
# Ignore any metric check with a timestamp greater than MIRAGE_STALE_SECONDS
int_metric_timestamp = int(metric_timestamp)
int_run_timestamp = int(run_timestamp)
metric_timestamp_age = int_run_timestamp - int_metric_timestamp
periodic_mirage_check = False
if MIRAGE_PERIODIC_CHECK:
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# Only call once
# try:
# mirage_periodic_check_metrics = list(self.redis_conn_decoded.smembers('mirage.periodic_check.metrics'))
# except:
# logger.error('error :: failed to get mirage_periodic_check_metrics from Redis')
# mirage_periodic_check_metrics = []
redis_metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric))
if redis_metric_name in mirage_periodic_check_metrics:
logger.info('this is a periodic Mirage check for %s' % metric)
periodic_mirage_check = True
# @added 20200413 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Do not evaluate batch metrics against MIRAGE_STALE_SECONDS
if BATCH_PROCESSING:
# Is this a analyzer_batch related anomaly
analyzer_batch_anomaly = None
analyzer_batch_metric_anomaly_key = 'analyzer_batch.anomaly.%s.%s' % (
str(metric_timestamp), metric)
try:
# analyzer_batch_anomaly = self.redis_conn.get(analyzer_batch_metric_anomaly_key)
analyzer_batch_anomaly = self.redis_conn_decoded.get(analyzer_batch_metric_anomaly_key)
except Exception as e:
logger.error(
'error :: could not query cache_key - %s - %s' % (
analyzer_batch_metric_anomaly_key, e))
analyzer_batch_anomaly = None
if analyzer_batch_anomaly:
logger.info('batch processing - identified as an analyzer_batch triggered anomaly from the presence of the Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
logger.info('batch processing - not identified as an analyzer_batch triggered anomaly as no Redis key found - %s' % analyzer_batch_metric_anomaly_key)
if analyzer_batch_anomaly:
logger.info('batch processing - setting metric_timestamp_age from %s to 1 so that will not be discarded as stale on %s' % (
str(metric_timestamp_age), metric))
metric_timestamp_age = 1
if metric_timestamp_age > settings.MIRAGE_STALE_SECONDS:
logger.info('stale check :: %s check request is %s seconds old - discarding' % (metric, str(metric_timestamp_age)))
# Remove metric check file
if os.path.isfile(metric_check_file):
os.remove(metric_check_file)
logger.info('removed check file - %s' % (metric_check_file))
else:
logger.info('could not remove check file - %s' % (metric_check_file))
# Remove the metric directory
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree - %s' % metric_data_dir)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
redis_set = 'mirage.stale_check_discarded'
try:
self.redis_conn.sadd(redis_set, str(metric))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(metric), str(redis_set)))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
# Calculate hours second order resolution to seconds
second_order_resolution_seconds = int(hours_to_resolve) * 3600
# Calculate graphite from and until parameters from the metric timestamp
graphite_until = datetime.datetime.fromtimestamp(int(float(metric_timestamp))).strftime('%H:%M_%Y%m%d')
int_second_order_resolution_seconds = int(float(second_order_resolution_seconds))
second_resolution_timestamp = int_metric_timestamp - int_second_order_resolution_seconds
graphite_from = datetime.datetime.fromtimestamp(int(second_resolution_timestamp)).strftime('%H:%M_%Y%m%d')
# Remove any old json file related to the metric
metric_json_file = '%s/%s.json' % (metric_data_dir, str(metric))
try:
os.remove(metric_json_file)
except OSError:
pass
# Get data from graphite
logger.info(
'retrieve data :: surfacing %s time series from graphite for %s seconds' % (
metric, str(second_order_resolution_seconds)))
# @modified 20191113 - Branch #3262: py3
# Wrapped in try
try:
# @modified 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
# Deprecate self.surface_graphite_metric_data in mirage so that the
# same function can be used for all graphite requests
# self.surface_graphite_metric_data(metric, graphite_from, graphite_until)
metric_json_file_saved = get_graphite_metric(
skyline_app, metric, second_resolution_timestamp,
metric_timestamp, 'json', metric_json_file)
if metric_json_file_saved:
logger.info('%s time series data saved' % metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: get_graphite_metric failed to surface_graphite_metric_data to populate %s' % (
str(metric_json_file)))
# Check there is a json timeseries file to test
if not os.path.isfile(metric_json_file):
logger.error(
'error :: retrieve failed - failed to surface %s time series from graphite' % (
metric))
# @added 20200905 - Feature #3734: waterfall alerts
# Try a metric 3 times before removing the check file
remove_check_file = True
check_failed_key = 'mirage.check.data_retrieval_failed.%s.%s' % (
str(int_metric_timestamp), metric)
fail_count = 0
try:
fail_count = self.redis_conn.get(check_failed_key)
except:
fail_count = 0
if not fail_count:
fail_count = 0
fail_count += 1
if fail_count < 3:
remove_check_file = False
try:
self.redis_conn.setex(check_failed_key, 300, fail_count)
logger.info('updated fail_count to %s in %s' % (str(fail_count), check_failed_key))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to set Redis key %s with %s' % (
str(check_failed_key), str(fail_count)))
else:
logger.error('error :: fail_count is %s in %s, removing check file' % (str(fail_count), check_failed_key))
if remove_check_file:
# Remove metric check file
try:
os.remove(metric_check_file)
except OSError:
pass
# Remove the metric directory
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree %s' % metric_data_dir)
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
logger.info('retrieved data :: for %s at %s seconds' % (
metric, str(second_order_resolution_seconds)))
# Make process-specific dicts
exceptions = defaultdict(int)
anomaly_breakdown = defaultdict(int)
self.check_if_parent_is_alive()
timeseries = []
try:
with open((metric_json_file), 'r') as f:
timeseries = json.loads(f.read())
logger.info('data points surfaced :: %s' % (str(len(timeseries))))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to create timeseries from %s - %s' % (
str(metric_json_file), err))
timeseries = []
# @added 20170212 - Feature #1886: Ionosphere learn
# Only process if the metric has sufficient data
first_timestamp = None
try:
first_timestamp = int(timeseries[0][0])
except Exception as err:
logger.error('error :: could not determine first timestamp - %s' % err)
timestamp_now = int(time())
valid_if_before_timestamp = timestamp_now - int(settings.FULL_DURATION)
valid_mirage_timeseries = True
if first_timestamp:
if first_timestamp > valid_if_before_timestamp:
valid_mirage_timeseries = False
else:
valid_mirage_timeseries = False
logger.warning('warning :: no first_timestamp, valid_mirage_timeseries: %s' % str(valid_mirage_timeseries))
# @added 20170603 - Feature #2034: analyse_derivatives
# Convert the values of metrics strictly increasing monotonically
# to their deriative products
known_derivative_metric = False
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# Only call once
# try:
# # @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# # Branch #3262: py3
# # derivative_metrics = list(self.redis_conn.smembers('derivative_metrics'))
# # @modified 20211012 - Feature #4280: aet.metrics_manager.derivative_metrics Redis hash
# # derivative_metrics = list(self.redis_conn_decoded.smembers('derivative_metrics'))
# derivative_metrics = list(self.redis_conn_decoded.smembers('aet.metrics_manager.derivative_metrics'))
# except:
# derivative_metrics = []
redis_metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric))
if redis_metric_name in derivative_metrics:
known_derivative_metric = True
if known_derivative_metric:
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# Only call once
# try:
# # @modified 20200606 - Bug #3572: Apply list to settings import
# non_derivative_monotonic_metrics = list(settings.NON_DERIVATIVE_MONOTONIC_METRICS)
# except:
# non_derivative_monotonic_metrics = []
skip_derivative = in_list(redis_metric_name, non_derivative_monotonic_metrics)
if skip_derivative:
known_derivative_metric = False
# @modified 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
# Deprecate self.surface_graphite_metric_data in mirage so that the
# same function can be used for all graphite requests.
# get_graphite_metric does the derivative, zero_fill and last_known_value
# functions so no longer required here.
check_for_derivative = False
if check_for_derivative:
if known_derivative_metric and valid_mirage_timeseries:
try:
derivative_timeseries = nonNegativeDerivative(timeseries)
timeseries = derivative_timeseries
except:
logger.error('error :: nonNegativeDerivative failed')
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
if snab_only_check:
snab_recheck_key = 'snab.recheck.%s' % metric
snab_recheck_key_exists = False
try:
snab_recheck_key_exists = self.redis_conn_decoded.get(snab_recheck_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s' % (
snab_recheck_key))
original_added_at = 0
if snab_recheck_key_exists:
logger.info('snab recheck key exists - %s' % snab_recheck_key)
try:
original_added_at = int(snab_recheck_key_exists)
except:
# The key expired
original_added_at = int(time()) - 300
else:
logger.info('snab recheck key does not exists - %s' % snab_recheck_key)
snab_recheck_original_anomaly_timestamp_key = 'snab.recheck.anomaly_timestamp.%s' % metric
snab_recheck_original_anomaly_timestamp_key_exists = False
try:
snab_recheck_original_anomaly_timestamp_key_exists = self.redis_conn_decoded.get(snab_recheck_original_anomaly_timestamp_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s' % (
snab_recheck_key))
original_anomaly_timestamp = int(timeseries[-1][0])
if snab_recheck_original_anomaly_timestamp_key_exists:
logger.info('snab.recheck.anomaly_timestamp key exists - %s' % snab_recheck_original_anomaly_timestamp_key)
try:
original_anomaly_timestamp = int(snab_recheck_original_anomaly_timestamp_key_exists)
except:
# The key expired
pass
else:
logger.info('snab.recheck.anomaly_timestamp key does not exists - %s' % snab_recheck_original_anomaly_timestamp_key)
snab_json_file_created = False
if snab_recheck_key_exists and snab_recheck_original_anomaly_timestamp_key_exists:
# Create timeseries json file with the timeseries
use_snab_timestamp = metric_timestamp
try:
use_snab_timestamp = int(timeseries[-1][0])
except:
pass
snab_json_file = '%s/%s.%s.json' % (
# settings.SNAB_DATA_DIR, str(int(metric_timestamp)), str(metric))
settings.SNAB_DATA_DIR, str(int(use_snab_timestamp)), str(metric))
timeseries_json = str(timeseries).replace('[', '(').replace(']', ')')
try:
snab_data_last_datapoint = [timeseries[-1][0], timeseries[-1][1]]
except:
snab_data_last_datapoint = [None, None, 'there was no timeseries data']
if timeseries_json:
try:
write_data_to_file(skyline_app, snab_json_file, 'w', timeseries_json)
logger.info('added snab timeseries file with last entry - %s :: %s' % (
str(snab_data_last_datapoint), snab_json_file))
except:
logger.error(traceback.format_exc())
logger.error(
'error :: failed to add snab timeseries file :: %s' %
snab_json_file)
if not os.path.isfile(snab_json_file):
logger.error('error - the snab_json_file was not created - %s' % (
str(snab_json_file)))
else:
logger.info('snab_json_file exists - %s' % snab_json_file)
snab_json_file_created = True
else:
logger.error(
'error :: no timeseries_json to add snab timeseries file :: %s' %
snab_json_file)
else:
logger.info('not adding snab_json_file as snab recheck keys no longer not exist')
if snab_json_file_created:
data = {
'metric': metric,
'anomaly_data': snab_json_file,
'timestamp': int(timeseries[-1][0]),
'original_anomaly_timestamp': original_anomaly_timestamp,
'value': timeseries[-1][1],
'original_added_at': original_added_at,
}
try:
self.redis_conn.sadd(mirage_snab_only_checks_redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to %s Redis set' % (
metric, str(mirage_snab_only_checks_redis_set)))
# Remove metric check file
try:
os.remove(metric_check_file)
except OSError:
pass
# Remove the metric directory
try:
rmtree(metric_data_dir)
logger.info('removed data dir for snab_check_only - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree for snab_check_only - %s' % metric_data_dir)
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# return
continue
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrcs have negatives values some they can be
# added to the ionosphere.untrainable_metrics Redis set
run_negatives_present = False
if settings.IONOSPHERE_ENABLED and valid_mirage_timeseries:
run_negatives_present = True
known_negative_metric, known_negative_metric_matched_by = matched_or_regexed_in_list(skyline_app, metric, KNOWN_NEGATIVE_METRICS)
if known_negative_metric:
run_negatives_present = False
logger.info('will not check %s for negative values' % (metric))
else:
logger.info('will check %s for negative values' % (metric))
del known_negative_metric_matched_by
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Add timings
snab_check_namespace = False
if SNAB_ENABLED and SNAB_CHECKS:
for app in SNAB_CHECKS:
if app == skyline_app:
for snab_context in SNAB_CHECKS[app]:
if snab_check_namespace:
break
for algorithm in SNAB_CHECKS[app][snab_context]:
if snab_check_namespace:
break
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
if namespace in redis_metric_name:
snab_check_namespace = True
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check if %s is a snab_check_metric' % redis_metric_name)
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
test_alerts = {}
test_alert_and_trigger = False
try:
test_alerts = get_test_alerts(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: get_test_alerts failed - %s' % err)
if test_alerts:
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_metric = test_alerts[test_alert_timestamp]['metric']
if test_metric != metric:
continue
try:
trigger_anomaly = test_alerts[test_alert_timestamp]['trigger_anomaly']
except KeyError:
trigger_anomaly = False
if not trigger_anomaly:
continue
logger.info('test_alert found for %s with trigger_anomaly: %s' % (
str(test_metric), str(trigger_anomaly)))
test_alert_and_trigger = True
break
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed determine test_alert details from test_alerts: %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
if test_alert_and_trigger:
alert_tested_key = 'mirage.test_alerts.done.%s' % metric
try:
self.redis_conn_decoded.setex(alert_tested_key, 300, int(time()))
logger.info('test_alert created Redis key %s' % alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to create Redis key %s - %s' % (
alert_tested_key, err))
# @added 20200607 - Feature #3566: custom_algorithms
algorithms_run = list(settings.MIRAGE_ALGORITHMS)
# @added 20200904 - Feature #3734: waterfall alerts
anomalous = None
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Add timings
analysis_start_time = time()
# @added 20230608 -
MIRAGE_NAMESPACE_MINIMUM_RESOLUTIONS = {}
downsample_namespace = False
original_timeseries = []
namespace_minimum_resolution = 0
if MIRAGE_NAMESPACE_MINIMUM_RESOLUTIONS and valid_mirage_timeseries:
for namespace in list(MIRAGE_NAMESPACE_MINIMUM_RESOLUTIONS.keys()):
try:
pattern_match = False
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, [namespace])
del metric_matched_by
except Exception as err:
logger.error('error :: matched_or_regexed_in_list failed checking to downsample_namespace for %s in %s - %s' % (
base_name, namespace, err))
pattern_match = False
if pattern_match:
downsample_namespace = True
original_timeseries = list(timeseries)
namespace_minimum_resolution = MIRAGE_NAMESPACE_MINIMUM_RESOLUTIONS[namespace]
logger.info('downsampling %s to namespace_minimum_resolution: %s' % (
base_name, str(namespace_minimum_resolution)))
except Exception as err:
logger.error('error :: failed checking to downsample_namespace for %s in %s - %s' % (
base_name, namespace, err))
if downsample_namespace and namespace_minimum_resolution:
logger.info('would downsample %s to namespace_minimum_resolution: %s' % (
base_name, str(namespace_minimum_resolution)))
# Determine resolution
# downsample_timeseries(skyline_app, timeseries, resolution, namespace_minimum_resolution, method='mean', origin='end')
try:
if valid_mirage_timeseries:
logger.info('analyzing :: %s at %s seconds' % (metric, second_order_resolution_seconds))
# @modified 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Added run_negatives_present and negatives_found
# anomalous, ensemble, datapoint = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds)
# @modified 20200607 - Feature #3566: custom_algorithms
# Added algorithms_run
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
# anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms)
# @modified 20230118 - Task #4786: Switch from matrixprofile to stumpy
# Task #4778: v4.0.0 - update dependencies
# Added current_func
# anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms)
anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms, current_func='mirage')
else:
logger.info('not analyzing :: %s at %s seconds as there is not sufficiently older datapoints in the timeseries - not valid_mirage_timeseries' % (metric, second_order_resolution_seconds))
anomalous = False
if timeseries:
datapoint = timeseries[-1][1]
else:
datapoint = 0
ensemble = []
# @added 20220315 - Feature #4482: Test alerts
# Allow to test on sparse metrics
if test_alert_and_trigger:
ensemble = [True]
triggered_algorithms = ['histogram_bins']
algorithms_run = ['histogram_bins']
negatives_found = False
# It could have been deleted by the Roomba
except TypeError:
# @added 20200430 - Feature #3480: batch_processing
# Added logging here as the DeletedByRoomba exception is
# generally not related to that but related to some other fail
# in the processing of the run algorithms phase.
# It could have been deleted by the Roomba, but Mirage does not use
# Redis data so probably, definitely was not :)
logger.error(traceback.format_exc())
logger.error('error :: added as DeletedByRoomba but possibly not see traceback above')
exceptions['DeletedByRoomba'] += 1
logger.info('exceptions :: DeletedByRoomba')
except TooShort:
exceptions['TooShort'] += 1
logger.info('exceptions :: TooShort')
except Stale:
exceptions['Stale'] += 1
logger.info('exceptions :: Stale')
except Boring:
exceptions['Boring'] += 1
logger.info('exceptions :: Boring')
except Exception as err:
exceptions['Other'] += 1
logger.info('exceptions :: Other')
logger.error(traceback.format_exc())
logger.error('error :: unhandled error - %s' % err)
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# To overcome the Graphite unfilled last bucket problem, the FULL_DURATION
# data is downsampled using the Pandas origin='end' backwards resample
# method and then aligning to the resolution and this data replaces the
# FULL_DURATION period in the Graphite timeseries data for analysis only.
# This solves the problem of false positive alerts being generated by
# unfilled buckets where the data in the bucket would 'normalise' over
# the duration of the bucket. However often times occur where the
# bucket data only has one or two very high or very low values and then
# the average for that unfilled bucket is skewed, causing an outlier.
# When the the rest of the bucket is filled the outlier is flattened.
# This preprocessing method resolves that issue and removes any
# requirement for extending the Graphite first retention period to a
# value > 7days, which resolves this issue but creates an issue in the
# Ionosphere context where features extraction takes a long time and is
# not suitable for high production workload. Thanks to Pandas for
# introducing backswards resampling in version 1.3.
downsampled_timeseries = None
if anomalous:
try:
logger.info('checking if Graphite data and FULL_DURATION data are different if so will check against realigned downsampled data - anomalous - %s' % (metric))
downsampled_timeseries = downsample_full_duration_and_merge_graphite(self, metric, timeseries, known_derivative_metric)
except Exception as err:
logger.error('error :: downsample_full_duration_and_merge_graphite for %s - %s' % (
metric, err))
if downsampled_timeseries:
try:
logger.info('checking realigned downsampled data for - %s' % (metric))
# @modified 20230118 - Task #4786: Switch from matrixprofile to stumpy
# Task #4778: v4.0.0 - update dependencies
# Added current_func
anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(downsampled_timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms, current_func='mirage')
# Replace the datapoint variable from the preprocessed
# downsampled_timeseries with the actual data point from the
# Graphite data
datapoint = timeseries[-1][1]
except Exception as err:
exceptions['Other'] += 1
logger.info('exceptions :: Other')
logger.error(traceback.format_exc())
logger.error('error :: unhandled error - %s' % err)
# @added 20230418 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
low_variance = 0.009
irregular_unstable_timeseries = False
if anomalous:
start_normalized_var = time()
normalized_var = None
try:
normalized_var = normalized_variance(timeseries)
except Exception as err:
logger.error('error :: normalized_variance failed on timeseries for %s - %s' % (
metric, err))
if isinstance(normalized_var, dict):
err = normalized_var['error']
logger.error('error :: normalized_variance reported an error with timeseries for %s - %s' % (
metric, err))
normalized_var = None
if isinstance(normalized_var, float):
if normalized_var <= low_variance:
irregular_unstable_timeseries = True
logger.info('mirage :: normalized_variance ran with result: %s (took %.6f seconds), for %s' % (
str(normalized_var), (time() - start_normalized_var), metric))
# @added 20221105 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Determine if an anomaly is a normal peak value of normal magnitude
# that occurs daily in a 7 day period
if anomalous and downsampled_timeseries and MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS:
logger.info('mirage :: checking anomalous_daily_peak %s' % (
metric))
result = True
start_anomalous_daily_peak = time()
try:
custom_algorithm = 'anomalous_daily_peak'
custom_algorithms_to_run = {}
custom_algorithms_to_run[custom_algorithm] = {
'namespaces': ['labelled_metrics'],
'algorithm_source': '/opt/skyline/github/skyline/skyline/custom_algorithms/anomalous_daily_peak.py',
# @modified 20230411 - Feature #4724: custom_algorithms - anomalous_daily_peak
# 'algorithm_parameters': {},
'algorithm_parameters': {
'number_of_daily_peaks': MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS,
'within_percent_of_normal_peaks': 20.0,
'debug_logging': True,
},
# 'max_execution_time': 1.0,
'max_execution_time': 2.0,
'consensus': 1,
'algorithms_allowed_in_consensus': ['anomalous_daily_peak'],
# 'debug_logging': False,
'debug_logging': True,
'run_3sigma_algorithms': False,
'run_before_3sigma': False,
'run_only_if_consensus': False,
'trigger_history_override': False,
'use_with': ['mirage'],
}
# use_debug_logging_here = False
use_debug_logging_here = True
result, anomalyScore = run_custom_algorithm_on_timeseries(skyline_app, os.getpid(), metric, downsampled_timeseries, 'anomalous_daily_peak', custom_algorithms_to_run[custom_algorithm], use_debug_logging_here)
algorithms_run.append(custom_algorithm)
ensemble.append(result)
if DEBUG_CUSTOM_ALGORITHMS or use_debug_logging_here:
logger.debug('debug :: mirage :: run_custom_algorithm_on_timeseries run anomalous_daily_peak with result - %s, anomalyScore - %s' % (
str(result), str(anomalyScore)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage :: run_custom_algorithm_on_timeseries anomalous_daily_peak failed on %s - %s' % (
str(metric), err))
result = None
logger.info('mirage :: anomalous_daily_peak ran with result: %s (took %.6f seconds), for %s' % (
str(result), (time() - start_anomalous_daily_peak), metric))
# Although fine in a notebook does not have the desired effect
# in the runtime so convert to a str and check
# if result is False:
if str(result) == 'False':
logger.info('mirage :: anomalous_daily_peak is overrriding anomalous result for %s' % (
metric))
anomalous = False
# @added 20220506 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# If the downsampled data is not anomalous, remove the entry from
# the mirage.trigger_history
if not anomalous:
logger.info('realigned downsampled data not anomalous removing entry from mirage.trigger_history for - %s' % (metric))
trigger_history = {}
try:
raw_trigger_history = self.redis_conn_decoded.hget('mirage.trigger_history', metric)
if raw_trigger_history:
trigger_history = literal_eval(raw_trigger_history)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_algorithms :: failed to evaluate data from mirage.trigger_history Redis hash key - %s' % (
str(err)))
new_trigger_history = {}
last_timestamp = int(timeseries[-1][0])
history_removed = False
for history_timestamp in list(trigger_history.keys()):
if history_timestamp == int(timeseries[-1][0]):
history_removed = True
logger.info('removing entry for %s from mirage.trigger_history - %s' % (
metric, str(trigger_history[history_timestamp])))
continue
new_trigger_history[history_timestamp] = trigger_history[history_timestamp]
if history_removed:
try:
self.redis_conn_decoded.hset('mirage.trigger_history', metric, str(new_trigger_history))
logger.info('updated mirage.trigger_history for %s' % metric)
except Exception as err:
logger.error('error :: failed to set key in mirage.trigger_history Redis hash key - %s' % (
str(err)))
# @added 20230616
# Remove the metric directory
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree - %s' % metric_data_dir)
# @added 20220420 - Feature #4530: namespace.analysed_events
parent_namespace = metric.split('.', maxsplit=1)[0]
date_string = str(strftime('%Y-%m-%d', gmtime()))
namespace_analysed_events_hash = 'namespace.analysed_events.%s.%s' % (skyline_app, date_string)
try:
self.redis_conn.hincrby(namespace_analysed_events_hash, parent_namespace, 1)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to increment %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
try:
self.redis_conn.expire(namespace_analysed_events_hash, (86400 * 15))
logger.info('updated %s Redis hash' % namespace_analysed_events_hash)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to set expire %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Add timings
analysis_run_time = time() - analysis_start_time
logger.info('algorithms analysis completed in %.2f seconds' % (
analysis_run_time))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
logger.info('test_alert triggering anomaly on %s with %s' % (
metric, str(timeseries[-1])))
anomalous = True
datapoint = timeseries[-1][1]
metric_timestamp = timeseries[-1][0]
# @added 20210309 - Task #3730: Validate Mirage running multiple processes
# Reimplement mirage.checks.done count
if not snab_only_check:
try:
self.redis_conn.incr('mirage.checks.done')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to increment mirage.checks.done Redis key - %s' % str(err))
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
if metric.startswith(settings.FULL_NAMESPACE):
base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric
# @added 20210505 - Bug #4048: Mirage - removing feedback metrics to be processed
feedback_cache_key_exists = False
feedback_cache_key = 'mirage.feedback_metric.checked.%s' % (base_name)
try:
feedback_cache_key_exists = self.redis_conn_decoded.get(feedback_cache_key)
except Exception as e:
logger.error('error :: failed to get %s key from Redis - %s' % (
str(feedback_cache_key), e))
if feedback_cache_key_exists:
feedback_processed_cache_key = 'mirage.feedback_metric.processed.%s' % (base_name)
logger.info('feedback metric processed adding Redis key with 600 TTL - %s' % feedback_processed_cache_key)
try:
self.redis_conn.setex(feedback_processed_cache_key, 600, int(analysis_start_time))
except Exception as e:
logger.error('error :: failed to add %s key to Redis - %s' % (
str(feedback_processed_cache_key), e))
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp. waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
literal_analyzer_waterfall_alerts = []
try:
literal_analyzer_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_analyzer_waterfall_alerts = []
analyzer_waterfall_alerts = []
for literal_waterfall_alert in literal_analyzer_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
analyzer_waterfall_alerts.append(waterfall_alert)
# @added 20220504 - Feature #2580: illuminance
# @modified 20230419 - Feature #2580: illuminance
# Moved out of the if anomalous block. Record illuminance for all
# Get the anomaly breakdown - who returned True?
triggered_algorithms = []
for index, boolean_value in enumerate(ensemble):
if boolean_value:
# @modified 20200607 - Feature #3566: custom_algorithms
# algorithm = settings.MIRAGE_ALGORITHMS[index]
algorithm = algorithms_run[index]
anomaly_breakdown[algorithm] += 1
triggered_algorithms.append(algorithm)
if triggered_algorithms:
illuminance_dict = {}
illuminance_dict[base_name] = {
'timestamp': int(metric_timestamp),
'value': float(datapoint),
'triggered_algorithms_count': len(triggered_algorithms)}
logger.info('calling add_illuminance_entries with %s entries to add' % (
str(len(illuminance_dict))))
current_illuminance_dict = {}
try:
current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(run_timestamp), illuminance_dict)
except Exception as err:
logger.error('error :: add_illuminance_entries failed - %s' % (
err))
logger.info('illuminance Redis hash now has %s entries' % (
str(len(current_illuminance_dict))))
if not anomalous:
not_anomalous_metric = [datapoint, base_name]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.not_anomalous_metrics.append(not_anomalous_metric)
redis_set = 'mirage.not_anomalous_metrics'
data = str(not_anomalous_metric)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
for waterfall_alert in analyzer_waterfall_alerts:
if waterfall_alert[0] == base_name:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# @added 20201128 - Feature #3734: waterfall alerts
# If the check just done is new than an existing analyzer
# waterfall alert metric timestamp remove those keys as well
if int(waterfall_alert[1]) < metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item with older timestamp from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# @added 20210330 - Feature #3994: Panorama - mirage not anomalous
# A hash is added to the mirage.panorama.not_anomalous_metrics for
# every metric that is found to be not anomalous. This provides
# data for /panorama?not_anomalous and /panorama?not_anomalous_metric
# method which are used for plots in the webapp and json response.
# The mirage.panorama.not_anomalous_metrics Redis hash is managed in
# analyzer/metrics_manager
not_anomalous_timestamp = None
try:
not_anomalous_timestamp = int(timeseries[-1][0])
except:
not_anomalous_timestamp = int(metric_timestamp)
redis_hash = 'mirage.panorama.not_anomalous_metrics'
try:
data = {
base_name: {
'timestamp': not_anomalous_timestamp,
'value': datapoint,
'hours_to_resolve': int(hours_to_resolve),
}
}
self.redis_conn.hset(redis_hash, time(), str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis hash %s' % (
str(data), str(redis_hash)))
logger.info('not anomalous :: %s with %s (at full duration), %s (at SECOND_ORDER_RESOLUTION_HOURS)' % (
metric, value, str(datapoint)))
# If it's anomalous, add it to list
if anomalous:
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
if metric.startswith(settings.FULL_NAMESPACE):
base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric
# metric_timestamp = int(timeseries[-1][0])
metric_timestamp = int_metric_timestamp
# Get the anomaly breakdown - who returned True?
# @modified 20230419 - Feature #2580: illuminance
# Moved out of the if anomalous block. Determine
# triggered_algorithms for all to record illuminance
# triggered_algorithms = []
# for index, boolean_value in enumerate(ensemble):
# if boolean_value:
# # @modified 20200607 - Feature #3566: custom_algorithms
# # algorithm = settings.MIRAGE_ALGORITHMS[index]
# algorithm = algorithms_run[index]
# anomaly_breakdown[algorithm] += 1
# triggered_algorithms.append(algorithm)
# @modified 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added second_order_resolution_seconds, triggered_algorithms and algorithms_run
# anomalous_metric = [datapoint, base_name, metric_timestamp]
anomalous_metric = [datapoint, base_name, metric_timestamp, second_order_resolution_seconds, triggered_algorithms, algorithms_run]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.anomalous_metrics.append(anomalous_metric)
redis_set = 'mirage.anomalous_metrics'
data = str(anomalous_metric)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to mirage.anomalous_metrics Redis set' % (
str(data)))
# @added 20220504 - Feature #2580: illuminance
# @modified 20230419 - Feature #2580: illuminance
# Moved out of the if anomalous block to above. Determine
# triggered_algorithms and record illuminance for all that
# triggered
# illuminance_dict = {}
# illuminance_dict[base_name] = {
# 'timestamp': int(metric_timestamp),
# 'value': float(datapoint),
# 'triggered_algorithms_count': len(triggered_algorithms)}
# logger.info('calling add_illuminance_entries with %s entries to add' % (
# str(len(illuminance_dict))))
# current_illuminance_dict = {}
# try:
# current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(run_timestamp), illuminance_dict)
# except Exception as err:
# logger.error('error :: add_illuminance_entries failed - %s' % (
# err))
# logger.info('illuminance Redis hash now has %s entries' % (
# str(len(current_illuminance_dict))))
# @modified 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Added analysis_run_time
if snab_check_namespace:
redis_key = 'mirage.analysis_run_time.%s.%s' % (base_name, str(metric_timestamp))
try:
self.redis_conn.setex(redis_key, 120, str(analysis_run_time))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add snab analysis_run_time Redis key - %s' % (
redis_key))
# @added 20230427 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Use downsampled_timeseries data
if downsampled_timeseries:
timeseries_dir = metric.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(metric_timestamp),
str(timeseries_dir))
# snab_json_file = '%s/%s.%s.downsampled.json' % (
# settings.IONOSPHERE_DATA_FOLDER, str(int(metric_timestamp)), str(metric))
snab_json_file = '%s/%s.downsampled.json' % (training_dir, metric)
timeseries_json = str(downsampled_timeseries).replace('[', '(').replace(']', ')')
if timeseries_json:
try:
write_data_to_file(skyline_app, snab_json_file, 'w', timeseries_json)
logger.info('added snab downsampled timeseries file - %s' % (
snab_json_file))
except Exception as err:
logger.error(traceback.format_exc())
logger.error(
'error :: failed to add snab timeseries file :: %s' %
snab_json_file)
if not os.path.isfile(snab_json_file):
logger.error('error - the snab_json_file was not created - %s' % (
str(snab_json_file)))
else:
logger.info('snab_json_file exists - %s' % snab_json_file)
else:
logger.error(
'error :: no timeseries_json to add snab timeseries file :: %s' %
snab_json_file)
logger.info('anomaly detected :: %s with %s (at SECOND_ORDER_RESOLUTION_HOURS), %s (at FULL_DURATION)' % (
metric, str(datapoint), str(value)))
# It runs so fast, this allows us to process 30 anomalies/min
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Removed limit
# sleep(2)
# @added 20170206 - Bug #1904: Handle non filesystem friendly metric names in check files
sane_metricname = filesafe_metricname(str(base_name))
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrcs have negatives values some they can be
# added to the ionosphere.untrainable_metrics Redis set
if run_negatives_present and negatives_found:
redis_set = 'ionosphere.untrainable_metrics'
try:
last_negative_timestamp = int(negatives_found[-1][0])
last_negative_value = negatives_found[-1][1]
remove_after_timestamp = int(last_negative_timestamp + second_order_resolution_seconds)
data = str([base_name, metric_timestamp, datapoint, last_negative_timestamp, last_negative_value, second_order_resolution_seconds, remove_after_timestamp])
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# If Crucible or Panorama are enabled determine details
determine_anomaly_details = False
if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
determine_anomaly_details = True
if settings.PANORAMA_ENABLED:
determine_anomaly_details = True
# If Ionosphere is enabled determine details
try:
ionosphere_enabled = settings.IONOSPHERE_ENABLED
if settings.IONOSPHERE_ENABLED:
determine_anomaly_details = True
except:
ionosphere_enabled = False
if determine_anomaly_details:
# metric_timestamp = str(int(timeseries[-1][0]))
from_timestamp = str(int(timeseries[1][0]))
timeseries_dir = base_name.replace('.', '/')
cache_key = 'mirage.last_alert.smtp.%s' % (base_name)
last_alert = False
try:
# @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Changed the last_alert cache key to hold the last
# anomaly timestamp
# last_alert = self.redis_conn.get(cache_key)
last_alert = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % str(e))
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Evaluate the reported anomaly timestamp to determine whether
# EXPIRATION_TIME should be applied to a batch metric
analyzer_batch_anomaly = None
if last_alert:
# Is this a analyzer_batch related anomaly
analyzer_batch_metric_anomaly_key = 'analyzer_batch.anomaly.%s.%s' % (
str(int_metric_timestamp), base_name)
try:
analyzer_batch_anomaly = self.redis_conn_decoded.get(analyzer_batch_metric_anomaly_key)
except Exception as e:
logger.error(
'error :: could not query cache_key - %s - %s' % (
analyzer_batch_metric_anomaly_key, e))
analyzer_batch_anomaly = None
if analyzer_batch_anomaly:
logger.info('identified as an analyzer_batch triggered anomaly from the presence of the Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
logger.info('not identified as an analyzer_batch triggered anomaly as no Redis key found - %s' % analyzer_batch_metric_anomaly_key)
if last_alert and analyzer_batch_anomaly:
# @modified 20201107 - Feature #3830: metrics_manager
# Optimise to use metrics_manager HGETALL rather than
# iterating the list of lists
# mirage_metrics_expiration_times = []
# try:
# mirage_metrics_expiration_times = list(self.redis_conn_decoded.smembers('mirage.metrics_expiration_times'))
# if LOCAL_DEBUG:
# logger.info('debug :: fetched the mirage.metrics_expiration_times Redis set')
# except:
# logger.info('failed to fetch the mirage.metrics_expiration_times Redis set')
# mirage_metrics_expiration_times = []
# metric_expiration_time = 3600
# try:
# for item_list_string in mirage_metrics_expiration_times:
# mirage_alert_expiration_data = literal_eval(item_list_string)
# if mirage_alert_expiration_data[0] == base_name:
# metric_expiration_time = int(mirage_alert_expiration_data[1])
# break
# except:
# if LOCAL_DEBUG:
# logger.error('error :: failed to determine mirage_alert_expiration_data for %s from the mirage.metrics_expiration_times Redis set' % str(base_name))
# metric_expiration_time = 3600
mirage_metrics_expiration_times = {}
try:
mirage_metrics_expiration_times = self.redis_conn_decoded.hgetall('mirage.hash_key.metrics_expiration_times')
logger.info('%s entries in mirage.hash_key.metrics_expiration_times Redis hash key' % str(len(mirage_metrics_expiration_times)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis hash key mirage.hash_key.metrics_expiration_times')
mirage_metrics_expiration_times = {}
try:
logger.info('%s entries in mirage.hash_key.metrics_expiration_times Redis hash key' % str(len(mirage_metrics_expiration_times)))
metric_expiration_time = int(mirage_metrics_expiration_times[base_name])
logger.info('%s has expiration time of %s' % (base_name, str(metric_expiration_time)))
except:
if LOCAL_DEBUG:
logger.error('error :: failed to determine mirage_alert_expiration_data for %s from the mirage.hash_key.metrics_expiration_times Redis hash key' % str(base_name))
metric_expiration_time = 3600
last_timestamp = None
try:
last_timestamp = int(last_alert)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine last_timestamp from the last Mirage alert key - %s' % cache_key)
last_timestamp = None
seconds_between_batch_anomalies = None
if last_timestamp:
try:
seconds_between_batch_anomalies = int(int_metric_timestamp) - int(last_timestamp)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine seconds_between_batch_anomalies for batch metric Panorama key- %s' % cache_key)
last_timestamp = None
if seconds_between_batch_anomalies:
if seconds_between_batch_anomalies >= int(metric_expiration_time):
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is greater than the metric EXPIRATION_TIME of %s' % (
str(last_timestamp), str(int_metric_timestamp), base_name,
str(metric_expiration_time)))
logger.info('alerting on anomaly for batch metric %s, so setting last_alert to None' % (
metric))
last_alert = None
else:
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is less than the metric EXPIRATION_TIME of %s, not alerting' % (
str(last_timestamp), str(int_metric_timestamp), base_name,
str(metric_expiration_time)))
if int(int_metric_timestamp) < last_timestamp:
logger.info('batch anomaly timestamp (%s) less than the last_check timestamp (%s), alerting on anomaly for batch metric %s, so setting last_alert to None' % (
str(int_metric_timestamp), str(last_timestamp), base_name))
last_alert = None
# @added 20170308 - Feature #1960: ionosphere_layers
# Allow Ionosphere to send Panorama checks, it is an ionosphere_metric
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# Only call once
# try:
# # @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# # Branch #3262: py3
# # ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
# ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics'))
# except:
# ionosphere_unique_metrics = []
added_at = str(int(time()))
# If Panorama is enabled - create a Panorama check
# @modified 20170308 - Feature #1960: ionosphere_layers
# Allow Ionosphere to send Panorama checks for ionosphere_metrics
# if settings.PANORAMA_ENABLED:
send_to_panorama = False
redis_metric_name = '%s%s' % (str(settings.FULL_NAMESPACE), str(base_name))
if settings.PANORAMA_ENABLED:
send_to_panorama = True
if redis_metric_name in ionosphere_unique_metrics:
send_to_panorama = False
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
logger.info('test_alert sending triggered anomaly on %s to Panorama' % (
metric))
send_to_panorama = True
# Panorama must have at least one triggered algorithm
original_triggered_algorithms = list(triggered_algorithms)
if len(triggered_algorithms) == 0:
triggered_algorithms = [algorithms_run[0]]
if send_to_panorama:
if not os.path.exists(settings.PANORAMA_CHECK_PATH):
mkdir_p(settings.PANORAMA_CHECK_PATH)
if analyzer_batch_anomaly:
from_timestamp = int_metric_timestamp - int(second_order_resolution_seconds)
from_timestamp = str(from_timestamp)
# Note:
# The values are enclosed is single quoted intentionally
# as the imp.load_source used results in a shift in the
# decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
source = 'graphite'
panoroma_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'app = \'%s\'\n' \
'source = \'%s\'\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(datapoint), from_timestamp,
# @modified 20200607 - Feature #3566: custom_algorithms
# str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS),
str(int_metric_timestamp), str(algorithms_run),
triggered_algorithms, skyline_app, source,
this_host, added_at)
# Create an anomaly file with details about the anomaly
panoroma_anomaly_file = '%s/%s.%s.txt' % (
settings.PANORAMA_CHECK_PATH, added_at, sane_metricname)
try:
write_data_to_file(
skyline_app, panoroma_anomaly_file, 'w',
panoroma_anomaly_data)
logger.info('added panorama anomaly file :: %s' % (panoroma_anomaly_file))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Move to Redis set block below
# self.sent_to_panorama.append(base_name)
except:
logger.error('error :: failed to add panorama anomaly file :: %s' % (panoroma_anomaly_file))
logger.error(traceback.format_exc())
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved from the above self.sent_to_panorama
redis_set = 'mirage.sent_to_panorama'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20210323 - Feature #3642: Anomaly type classification
if LUMINOSITY_CLASSIFY_ANOMALIES:
redis_set = 'luminosity.classify_anomalies'
data_dict = {
'metric': metric,
'timestamp': int_metric_timestamp,
'value': datapoint,
'algorithms': algorithms_run,
'triggered_algorithms': triggered_algorithms,
'app': skyline_app,
'added_at': int(added_at),
}
data = [metric, int_metric_timestamp, int(added_at), data_dict]
try:
self.redis_conn.sadd(redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
triggered_algorithms = list(original_triggered_algorithms)
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
for waterfall_alert in analyzer_waterfall_alerts:
if waterfall_alert[0] == base_name:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# If crucible is enabled - save timeseries and create a
# crucible check
if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
from_timestamp = str(int(timeseries[1][0]))
timeseries_dir = base_name.replace('.', '/')
crucible_anomaly_dir = str(settings.CRUCIBLE_DATA_FOLDER) + '/' + timeseries_dir + '/' + metric_timestamp
if not os.path.exists(crucible_anomaly_dir):
mkdir_p(crucible_anomaly_dir)
# Note:
# The value is enclosed is single quoted intentionally
# as the imp.load_source used in crucible results in a
# shift in the decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
crucible_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'anomaly_dir = \'%s\'\n' \
'graphite_metric = True\n' \
'run_crucible_tests = False\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(datapoint), from_timestamp,
# @modified 20200607 - Feature #3566: custom_algorithms
# str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS),
str(int_metric_timestamp), str(algorithms_run),
triggered_algorithms, crucible_anomaly_dir,
skyline_app, added_at)
# Create an anomaly file with details about the anomaly
crucible_anomaly_file = '%s/%s.txt' % (crucible_anomaly_dir, sane_metricname)
try:
write_data_to_file(
skyline_app, crucible_anomaly_file, 'w',
crucible_anomaly_data)
logger.info('added crucible anomaly file :: %s' % (crucible_anomaly_file))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.sent_to_crucible.append(base_name)
except:
logger.error('error :: failed to add crucible anomaly file :: %s' % (crucible_anomaly_file))
logger.error(traceback.format_exc())
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved from the above self.sent_to_crucible
redis_set = 'mirage.sent_to_crucible'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# Create timeseries json file with the timeseries
json_file = '%s/%s.json' % (crucible_anomaly_dir, base_name)
timeseries_json = str(timeseries).replace('[', '(').replace(']', ')')
try:
write_data_to_file(skyline_app, json_file, 'w', timeseries_json)
logger.info('added crucible timeseries file :: %s' % (json_file))
except:
logger.error('error :: failed to add crucible timeseries file :: %s' % (json_file))
logger.error(traceback.format_exc())
# Create a crucible check file
crucible_check_file = '%s/%s.%s.txt' % (settings.CRUCIBLE_CHECK_PATH, metric_timestamp, sane_metricname)
try:
write_data_to_file(
skyline_app, crucible_check_file, 'w',
crucible_anomaly_data)
logger.info('added crucible check :: %s,%s' % (base_name, metric_timestamp))
except:
logger.error('error :: failed to add crucible check file :: %s' % (crucible_check_file))
logger.error(traceback.format_exc())
# @added 20230510 - Feature #4902: Prevent training on metrics newer than 7 days
new_metric_added_at = False
if ionosphere_enabled and not last_alert:
try:
new_metric_added_at = self.redis_conn_decoded.hget('metrics_manager.untrainable_new_metrics', base_name)
except Exception as err:
logger.error('error :: failed to hget on metrics_manager.untrainable_new_metrics - %s' % (
err))
if new_metric_added_at:
try:
new_until = int(float(new_metric_added_at)) + (86400 * 7)
new_until_date = datetime.datetime.fromtimestamp(new_until).strftime('%Y-%m-%d %H:%M:%S')
logger.info('not sending %s to Ionosphere as still a new metric until %s' % (
base_name, new_until_date))
except Exception as err:
logger.error('error :: failed to determine when %s matures - %s' % (
base_name, err))
ionosphere_enabled = False
# @added 20160922 - Branch #922: Ionosphere
# Also added the send_anomalous_metric_to skyline_functions.py
# function
if ionosphere_enabled:
if not last_alert:
# @modified 20161228 Feature #1830: Ionosphere alerts
# Added full_duration which needs to be recorded to allow Mirage metrics
# to be profiled on Redis timeseries data at FULL_DURATION
# e.g. mirage.redis.24h.json
full_duration = str(second_order_resolution_seconds)
# @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Added ionosphere_parent_id, always zero from Analyzer and Mirage
ionosphere_parent_id = 0
send_anomalous_metric_to(
skyline_app, 'ionosphere', timeseries_dir,
str(int_metric_timestamp), base_name, str(datapoint),
from_timestamp, triggered_algorithms, timeseries,
full_duration, str(ionosphere_parent_id),
# @added 20201001 - Task #3748: POC SNAB
# Added algorithms_run required to determine the anomalyScore
# so this needs to be sent to Ionosphere so Ionosphere
# can send it back on an alert
algorithms_run)
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved to the mirage.sent_to_ionosphere Redis set
# block below
# self.sent_to_ionosphere.append(base_name)
# @added 20200804 - Feature #3462: Add IONOSPHERE_MANAGE_PURGE
# Feature #3472: ionosphere.training_data Redis set
# Feature #3474: webapp api - training_data
# Add training data to the ionosphere.training_data so that
# the ionosphere purge_old_data_dirs can happen less
# frequently for reduced I/O
redis_set = 'ionosphere.training_data'
data = [base_name, int(int_metric_timestamp), second_order_resolution_seconds]
try:
logger.info('adding to Redis set %s - %s' % (
redis_set, str(data)))
self.redis_conn.sadd(redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to %s Redis set' % (str(data), redis_set))
else:
logger.info('alert expiry key exists not sending to Ionosphere :: %s' % base_name)
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
# Do not remove if this is only for training_data creation
if redis_metric_name in ionosphere_unique_metrics:
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
mirage_waterfall_data = []
for waterfall_alert in analyzer_waterfall_alerts:
if waterfall_alert[0] == base_name:
if int(waterfall_alert[1]) == metric_timestamp:
mirage_waterfall_data = waterfall_alert
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved from the above self.sent_to_ionosphere
if not last_alert:
redis_set = 'mirage.sent_to_ionosphere'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
logger.info('test_alert not sending anomaly on %s to ionosphere' % (
metric))
ionosphere_unique_metrics = []
# @added 20200904 - Feature #3734: waterfall alerts
# Add mirage waterfall alert
# Only add if this is an ionosphere_enabled metric_check_file
if redis_metric_name in ionosphere_unique_metrics:
if mirage_waterfall_data:
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
waterfall_data = mirage_waterfall_data
# @added 20201008 - Bug #3776: waterfall_alert - no analyzer triggered_algorithms in waterfall_panorama_data on MIRAGE_ALWAYS_METRICS
# When a MIRAGE_ALWAYS_METRICS metric is sent
# through to Mirage from Analyzer it sometimes has
# no triggered_algorithms, because the metric is
# sent every run; this is expected.
# However if the Mirage three-sigma check does
# trigger algorithms they need to be added here so
# that when metric and event are sent to Panorama
# the triggered_algorithms is populated
if base_name in MIRAGE_ALWAYS_METRICS:
from_timestamp = str(int(timeseries[1][0]))
waterfall_panorama_data = [
base_name, datapoint, int(timeseries[1][0]),
int(timeseries[-1][0]), algorithms_run,
triggered_algorithms, skyline_app,
skyline_app, this_host,
waterfall_alert[4][9]
]
# Use the original added_to_waterfall_timestamp
added_to_waterfall_timestamp = waterfall_data[3]
# @modified 20201009 - Bug #3776: waterfall_alert - no analyzer triggered_algorithms in waterfall_panorama_data on MIRAGE_ALWAYS_METRICS
# corrected datapoint, timestamp order
waterfall_data = [
base_name, int(timeseries[-1][0]), datapoint,
added_to_waterfall_timestamp, waterfall_panorama_data
]
redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere'
try:
self.redis_conn.sadd(redis_set, str(waterfall_data))
logger.info('added to Redis set %s - %s' % (redis_set, str(waterfall_data)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(waterfall_data), str(redis_set)))
# Add values to the queue so the parent process can collate
for key, ab_value in anomaly_breakdown.items():
self.mirage_anomaly_breakdown_q.put((key, ab_value))
for key, e_value in exceptions.items():
self.mirage_exceptions_q.put((key, e_value))
metric_var_files = []
timeseries = []
if os.path.isfile(metric_check_file):
# Remove metric check file
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
logger.error('error :: failed to remove check file - %s' % metric_check_file)
# Remove the metric directory
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree %s' % metric_data_dir)
# @added 20200723 - Feature #3472: ionosphere.training_data Redis set
# Feature #3566: custom_algorithms
# Optimize for MIRAGE_ALWAYS_METRICS which can create a lot
# of training_data dirs as Analyzer always hands them off to
# mirage.
remove_ionosphere_data_dir = False
if not anomalous:
if base_name in MIRAGE_ALWAYS_METRICS:
remove_ionosphere_data_dir = True
if not anomalous and periodic_mirage_check:
remove_ionosphere_data_dir = True
# @added 20190408 - Feature #2882: Mirage - periodic_check
# Remove the training_dir for mirage_periodic_check_metrics if not
# anomalous
# @modified 20200723 - Feature #3472: ionosphere.training_data Redis set
# Feature #3566: custom_algorithms
# if not anomalous and periodic_mirage_check:
if remove_ionosphere_data_dir:
timeseries_dir = base_name.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(metric_timestamp),
str(timeseries_dir))
if os.path.exists(training_dir):
try:
rmtree(training_dir)
logger.info('removed Mirage always or periodic check training_data dir - %s' % training_dir)
except:
logger.error('error :: failed to rmtree Mirage always or periodic check training_dir - %s' % training_dir)
if not anomalous and periodic_mirage_check:
del mirage_periodic_check_metrics
logger.info('processed %s checks in %.2f seconds' % (
str(len(processing_check_files)), (time() - start_time)))
[docs] def run(self):
"""
Called when the process intializes.
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
if os.path.isfile(skyline_app_logwait):
try:
os.remove(skyline_app_logwait)
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('starting %s run' % skyline_app)
if os.path.isfile(skyline_app_loglock):
logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
try:
os.remove(skyline_app_loglock)
logger.info('log lock file removed')
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
else:
logger.info('bin/%s.d log management done' % skyline_app)
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
def smtp_trigger_alert(alert, metric, second_order_resolution_seconds, context, triggered_algorithms):
    """
    Run ``self.spawn_alerter_process`` in a child Process and wait for it.

    The child is given up to 15 seconds to finish. If it is still running
    after that it is terminated, and any child still alive is then joined.
    A failure to create or start the child is logged, not raised.

    :param alert: the alert passed through to spawn_alerter_process
    :param metric: the metric passed through to spawn_alerter_process
    :param second_order_resolution_seconds: passed through to
        spawn_alerter_process
    :param context: passed through to spawn_alerter_process (the visible
        caller in run passes skyline_app)
    :param triggered_algorithms: passed through to spawn_alerter_process
    :return: None
    """
    # Only processes that actually started are tracked, so terminate() and
    # join() below are never called on an unstarted Process.
    pids = []
    try:
        # @modified 20210304 - Feature #3642: Anomaly type classification
        # Feature #3970: custom_algorithm - adtk_level_shift
        # Added triggered_algorithms
        p = Process(
            target=self.spawn_alerter_process,
            args=(alert, metric, second_order_resolution_seconds, context, triggered_algorithms))
        p.start()
        pids.append(p)
    except Exception:
        logger.error('error :: failed to spawn_alerter_process')
        logger.error(traceback.format_exc())

    p_starts = time()
    while time() - p_starts <= 15:
        if any(p.is_alive() for p in pids):
            # Just to avoid hogging the CPU
            sleep(.1)
        else:
            # All the processes are done, break now.
            break
    else:
        # Only reached when the loop timed out rather than hitting 'break'
        logger.info('%s :: timed out, killing the spawn_trigger_alert process' % (skyline_app))
        for p in pids:
            p.terminate()

    for p in pids:
        if p.is_alive():
            logger.info('%s :: stopping spawn_alerter_process - %s' % (skyline_app, str(p.is_alive())))
            p.join()
# DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
gc.enable()
gc.set_debug(gc.DEBUG_LEAK)
# As per http://stackoverflow.com/a/1641280
# This got useable understandable data with gc
before = defaultdict(int)
after = defaultdict(int)
for i in get_objects():
before[type(i)] += 1
if LOCAL_DEBUG:
logger.info('debug :: Memory usage in run at start: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
last_sent_to_graphite = int(time())
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
filesafe_names_dict = {}
# @added 20230609 - Task #4806: Manage NUMBA_CACHE_DIR
# Feature #4702: numba optimisations
# Use start up key and allow numba cache files to be created
starting = True
start_key = '%s.starting' % skyline_app
try:
self.redis_conn.setex(start_key, 180, int(now))
except Exception as err:
logger.error('error :: Analyzer could not create Redis %s key - %s' % (
start_key, err))
while 1:
now = time()
# DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
# As per http://stackoverflow.com/a/1641280
# This got useable understandable data with gc
before = defaultdict(int)
after = defaultdict(int)
for i in get_objects():
before[type(i)] += 1
if LOCAL_DEBUG:
logger.info('debug :: Memory usage before looking for checks: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# Make sure Redis is up
try:
self.redis_conn.ping()
except:
logger.error('error :: skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
logger.info('attempting to connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
# @modified 20191113 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# if settings.REDIS_PASSWORD:
# self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
# else:
# self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
try:
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as err:
logger.error('error :: failed to connect to Redis - %s' % err)
# @modified 20220429 - Feature #4536: Handle Redis failure
# if self.redis_conn.ping():
# logger.info('connected to redis')
# continue
try:
self.redis_conn.ping()
logger.info('connected to redis')
except Exception as err:
logger.error('error :: failed to ping Redis - %s' % err)
# Determine if any metric to analyze or Ionosphere alerts to be sent
while True:
# Report app up
# @modified 20210524 - Branch #1444: thunder
# Report app AND Redis as up
# self.redis_conn.setex(skyline_app, 120, now)
try:
redis_is_up = self.redis_conn.setex(skyline_app, 120, now)
if redis_is_up:
try:
self.redis_conn.setex('redis', 120, now)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: could not update the Redis redis key - %s' % (
e))
except Exception as e:
logger.error('error :: failed to update Redis key for %s up - %s' % (skyline_app, e))
# @added 20200604 - Mirage - populate_redis
# This functionality enables Mirage to populate the Skyline
# Redis instance with FULL_DURATION data from Graphite if
# Analyzer flags the time series as TooShort and adds it to the
# mirage.populate_redis Redis set. Or possibly if there are
# airgaps in the Redis data due to a network partition. It will
# fill a metric about every 10 seconds or so, unless there are
# Mirage checks or ionosphere_alerts to send
populate_redis_with_metrics = []
if MIRAGE_AUTOFILL_TOOSHORT:
try:
populate_redis_with_metrics = list(self.redis_conn_decoded.smembers('mirage.populate_redis'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis set mirage.populate_redis')
populate_redis_with_metrics = []
metric_to_populate_redis = None
populate_redis_with_metrics_count = 0
if populate_redis_with_metrics:
populate_redis_with_metrics_count = len(populate_redis_with_metrics)
logger.info('%s metrics found in mirage.populate_redis Redis set' % str(populate_redis_with_metrics_count))
try:
metric_to_populate_redis = str(populate_redis_with_metrics[0])
try:
del populate_redis_with_metrics
except:
pass
logger.info('processing %s from mirage.populate_redis Redis set' % metric_to_populate_redis)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine metric to populate_redis')
metric_to_populate_redis = None
if metric_to_populate_redis:
try:
# Spawn a populate_redis processes
pids = []
spawned_pids = []
p = Process(target=self.populate_redis, args=(1, metric_to_populate_redis))
pids.append(p)
logger.info('starting populate_redis process')
p.start()
spawned_pids.append(p.pid)
p_starts = time()
while time() - p_starts <= 10:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('populate_redis process completed in %.2f seconds' % (
time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('timed out, killing populate_redis process')
for p in pids:
p.terminate()
# p.join()
for p in pids:
if p.is_alive():
logger.info('stopping populate_redis process - %s' % (str(p.is_alive())))
p.join()
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to spawn populate_redis process')
# @added 20161228 - Feature #1828: ionosphere - mirage Redis data features
# If Ionosphere is going to pass alerts back to the app
# here we are going to have break out and force a alerting
# only run.
ionosphere_alerts = None
ionosphere_alerts_returned = False
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
test_alerts = {}
test_alert_metrics = []
try:
test_alerts = get_test_alerts(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: get_test_alerts failed - %s' % err)
if test_alerts:
for test_alert_timestamp in list(test_alerts.keys()):
use_timestamp = int(float(test_alert_timestamp))
logger.info('test_alert found: %s' % (
str(test_alerts[test_alert_timestamp])))
labelled_metric_name = None
anomaly_check_file = None
try:
metric = test_alerts[test_alert_timestamp]['metric']
use_metric = None
alert_tested_key = 'mirage.test_alerts.done.%s' % metric
alert_tested = None
try:
alert_tested = self.redis_conn_decoded.get(alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s - %s' % (
alert_tested_key, err))
if not alert_tested and '_tenant_id="' in metric:
try:
metric_id = get_metric_id_from_base_name(skyline_app, metric)
if metric_id:
use_metric = 'labelled_metrics.%s' % str(metric_id)
except Exception as err:
logger.error('error :: get_metric_id_from_base_name failed for test alert metric %s - %s' % (
metric, err))
if use_metric:
alert_tested_key = 'mirage.test_alerts.done.%s' % use_metric
try:
alert_tested = self.redis_conn_decoded.get(alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s - %s' % (
alert_tested_key, err))
if alert_tested:
test_alert_redis_key = '%s.test_alerts' % skyline_app
self.redis_conn_decoded.hdel(test_alert_redis_key, test_alert_timestamp)
logger.info('removed test_alert for %s from Redis key %s' % (
str(metric), str(test_alert_redis_key)))
test_alert_metrics.append(metric)
if use_metric:
test_alert_metrics.append(use_metric)
continue
use_metric = str(metric)
if '_tenant_id="' in metric:
metric_id = 0
try:
metric_id = get_metric_id_from_base_name(skyline_app, metric)
except Exception as err:
logger.error('error :: test_alert - get_metric_id_from_base_name failed for %s - %s' % (
str(metric), err))
metric = 'labelled_metrics.%s' % str(metric_id)
labelled_metric_name = str(metric)
if metric.startswith('labelled_metrics.'):
labelled_metric_name = str(metric)
metric_id_str = metric.replace('labelled_metrics.', '', 1)
metric_id = int(metric_id_str)
try:
trigger_anomaly = test_alerts[test_alert_timestamp]['trigger_anomaly']
except KeyError:
trigger_anomaly = False
if trigger_anomaly:
test_alert_metrics.append(metric)
logger.info('test_alert found for %s: %s' % (
str(metric), str(test_alerts[test_alert_timestamp])))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed determine test_alert details from test_alerts: %s - %s' % (
str(test_alerts), err))
continue
if not labelled_metric_name:
try:
sane_metricname = filesafe_metricname(str(metric))
anomaly_check_file = '%s/%s.%s.txt' % (settings.MIRAGE_CHECK_PATH, str(use_timestamp), sane_metricname)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine name of test_alert anomaly_check_file for: %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
anomaly_check_file = None
mirage_anomaly_check_file_created = False
use_hours_to_resolve = 168
triggered_algorithms = []
snab_check_only = False
if anomaly_check_file:
try:
with open(anomaly_check_file, 'w') as fh:
fh.write('metric = "%s"\nvalue = "%s"\nhours_to_resolve = "%s"\nmetric_timestamp = "%s"\nsnab_only_check = "%s"\ntriggered_algorithms = %s\n' % (
metric, 1, str(use_hours_to_resolve),
str(use_timestamp), str(snab_check_only),
str(triggered_algorithms)))
mirage_anomaly_check_file_created = True
logger.info('added test_alert anomaly_check_file: %s' % (
str(anomaly_check_file)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to write test_alert anomaly_check_file for: %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
if mirage_anomaly_check_file_created:
if python_version == 3:
os.chmod(anomaly_check_file, mode=0o644)
if labelled_metric_name:
metric_data = {
'metric': use_metric,
'metric_id': int(metric_id),
'metric_dict': {},
'from_timestamp': (use_timestamp - (use_hours_to_resolve * 3600)),
'timestamp': use_timestamp,
'value': 1,
'triggered_algorithms': ['testing'],
'algorithms_run': triggered_algorithms,
'snab_only_check': False,
'test_alert': True,
'trigger_anomaly': trigger_anomaly,
'processed_by': {
'analyzer_labelled_metrics': {
'timestamp': int(time()),
'triggered_algorithms': ['testing'],
'algorithms_run': triggered_algorithms,
'waterfall_panorama_data': [],
'process_number': 1,
},
},
}
redis_hash = 'analyzer_labelled_metrics.mirage_check'
try:
hash_key = '%s.%s' % (str(metric_id), str(int(time())))
self.redis_conn.hset(redis_hash, hash_key, str(metric_data))
logger.info('added mirage check for %s with metric_data: %s' % (
str(labelled_metric_name), str(metric_data)))
except Exception as err:
logger.error('error :: %s' % str([labelled_metric_name, 'sadd analyzer_labelled_metrics.anomalous_metrics', str(err)]))
logger.info('added test_alert anomaly check key to analyzer_labelled_metrics.anomalous_metrics')
# try:
# self.redis_conn_decoded.hdel('mirage.test_alerts', str(test_alert_timestamp))
# except Exception as err:
# logger.error('error :: failed to delete %s from mirage.test_alerts' % str(test_alert_timestamp))
metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Do not postpone the Ionosphere alerts check based on whether
# there are checks or not
# if len(metric_var_files) == 0:
if not ionosphere_alerts_returned:
# @modified 20161228 - Feature #1830: Ionosphere alerts
try:
# @modified 20200430 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_alerts = list(self.redis_conn.scan_iter(match='ionosphere.mirage.alert.*'))
ionosphere_alerts = list(self.redis_conn_decoded.scan_iter(match='ionosphere.mirage.alert.*'))
ionosphere_alerts_returned = True
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to scan ionosphere.mirage.alert.* from Redis')
ionosphere_alerts = []
if len(ionosphere_alerts) == 0:
ionosphere_alerts_returned = False
else:
logger.info('Ionosphere alert requested :: %s' % str(ionosphere_alerts))
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Do not postpone the Ionosphere alerts check
# if not ionosphere_alerts_returned:
# logger.info('sleeping no metrics...')
# sleep(10)
# @added 20191106 - Branch #3262: py3
if os.path.isfile(alert_test_file):
test_alert = None
try:
with open((alert_test_file), 'r') as fh:
raw_test_alert = fh.read()
test_alert = literal_eval(raw_test_alert)
# [metric, alerter]
# e.g. ['server-1.cpu.user', 'smtp']
# e.g. ['server-1.cpu.user', 'slack']
# e.g. ['skyline_test.alerters.test', 'smtp']
except:
logger.error(traceback.format_exc())
logger.error('error :: could not evaluate test_alert from %s' % alert_test_file)
if test_alert:
try:
logger.info('test alert metric found - alerting on %s' % str((test_alert)))
metric_name = str(test_alert[0])
test_alerter = str(test_alert[1])
alerter_id = None
try:
alerter_id = str(test_alert[2])
except:
pass
metric = (1, metric_name, int(time()))
alert = (metric_name, test_alerter, 10)
if alerter_id:
alert = (metric_name, test_alerter, 10, None, {'type': 'external', 'id': alerter_id})
# @added 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
triggered_algorithms = ['testing']
if settings.SLACK_ENABLED and test_alerter == 'slack':
logger.info('test alert to slack for %s' % (metric_name))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
# @added 20220301 - Feature #4482: Test alerts
# Allow all alert types to be tested
if test_alerter.startswith('http_alerter'):
trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
if test_alerter in ['sms', 'pagerduty']:
trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
if test_alerter == 'smtp':
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
smtp_trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
except:
logger.error('error :: test trigger_alert - %s' % traceback.format_exc())
logger.error('error :: failed to test trigger_alert :: %s' % metric_name)
try:
os.remove(alert_test_file)
except OSError:
logger.error('error - failed to remove %s, continuing' % alert_test_file)
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Move this len(metric_var_files) from above and apply the
# appropriate sleep
if len(metric_var_files) == 0:
if not ionosphere_alerts_returned:
# @modified 20200604 - Mirage - populate_redis
# Do not sleep if there are metrics to populate in Redis
if populate_redis_with_metrics_count == 0:
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
sleep_for = 10
next_send_to_graphite = last_sent_to_graphite + 60
seconds_to_next_send_to_graphite = next_send_to_graphite - int(time())
if seconds_to_next_send_to_graphite < 10:
if seconds_to_next_send_to_graphite > 1:
sleep_for = seconds_to_next_send_to_graphite
else:
break
logger.info('sleeping no metrics...')
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# sleep(10)
sleep(sleep_for)
else:
logger.info('no checks or alerts, continuing to process populate_redis metrics')
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Removed sleep, no delay
# else:
# sleep(1)
# @added 20220113 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Do not remove batch_processing checks
batch_processing_metrics = []
try:
batch_processing_metrics = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
except:
logger.error('error :: could not get aet.analyzer.batch_processing_metrics from Redis')
logger.error(traceback.format_exc())
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
if not filesafe_names_dict:
try:
filesafe_names_dict = self.redis_conn_decoded.hgetall('metrics_manager.filesafe_base_names')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: hgetall metrics_manager.filesafe_base_names failed - %s' % err)
# Clean up old files
now_timestamp = time()
stale_age = now_timestamp - settings.MIRAGE_STALE_SECONDS
for current_file in listdir(settings.MIRAGE_CHECK_PATH):
if os.path.isfile(settings.MIRAGE_CHECK_PATH + "/" + current_file):
t = os.stat(settings.MIRAGE_CHECK_PATH + "/" + current_file)
c = t.st_ctime
# @added 20220113 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Do not remove batch_processing checks
for b_metric in batch_processing_metrics:
if b_metric in current_file:
continue
# delete file if older than settings.MIRAGE_STALE_SECONDS
if c < stale_age:
os.remove(settings.MIRAGE_CHECK_PATH + "/" + current_file)
logger.info('removed stale check - %s' % (current_file))
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
redis_set = 'mirage.stale_check_discarded'
try:
self.redis_conn.sadd(redis_set, str(current_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(current_file), str(redis_set)))
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
filesafe_names_list = []
if filesafe_names_dict:
filesafe_names_list = list(filesafe_names_dict.keys())
# @added 20230605 - Feature #4932: mute_alerts_on
mute_alerts_on_dict = {}
try:
mute_alerts_on_dict = self.redis_conn_decoded.hgetall('metrics_manager.mute_alerts_on')
except Exception as err:
logger.error('error :: failed to hgetall metrics_manager.mute_alerts_on - %s' % (
err))
mute_alerts_on = [i_metric for i_metric in list(mute_alerts_on_dict.keys()) if not i_metric.startswith('labelled_metrics.')]
if mute_alerts_on:
logger.info('adding %s mute_alert_on metrics to SKYLINE_FEEDBACK_NAMESPACES' % str(len(mute_alerts_on)))
CURRENT_SKYLINE_FEEDBACK_NAMESPACES = list(set(SKYLINE_FEEDBACK_NAMESPACES + mute_alerts_on))
# @added 20201026 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle feedback metrics in a similar style to Ionosphere
# Do not run checks if namespace has matched multiple times in
# the last 10 minutes.
if len(metric_var_files) > 3:
analyzer_waterfall_alerts = []
feedback_metric_loop_error_logged = False
for current_file in listdir(settings.MIRAGE_CHECK_PATH):
feedback_metric = False
remove_feedback_metric_check = False
remove_alerted_on_metric_check = False
try:
current_file_no_extension = current_file.replace('.txt', '')
current_file_no_extension_elements = current_file_no_extension.split('.')
base_name = '.'.join(current_file_no_extension_elements[1:])
metric_timestamp = int(current_file_no_extension_elements[0])
except:
pass
# @modified 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
use_base_name = str(base_name)
if base_name in filesafe_names_list:
use_base_name = filesafe_names_dict[base_name]
try:
metric_namespace_elements = base_name.split('.')
# @modified 20230605 - Feature #4932: mute_alerts_on
# for to_skip in SKYLINE_FEEDBACK_NAMESPACES:
for to_skip in CURRENT_SKYLINE_FEEDBACK_NAMESPACES:
if to_skip in base_name:
feedback_metric = True
break
to_skip_namespace_elements = to_skip.split('.')
elements_matched = set(metric_namespace_elements) & set(to_skip_namespace_elements)
if len(elements_matched) == len(to_skip_namespace_elements):
feedback_metric = True
break
feedback_cache_key_exists = False
feedback_cache_key = 'mirage.feedback_metric.checked.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
feedback_cache_key = 'mirage.feedback_metric.checked.%s' % (use_base_name)
feedback_metric_process_time = int(time())
# @added 20210701 - Feature #4152: DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES
if feedback_metric:
pattern_match = False
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES)
del metric_matched_by
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: matched_or_regexed_in_list failed checking %s in DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES - %s' % (
base_name, e))
pattern_match = False
if pattern_match:
feedback_metric = False
logger.info('%s matched DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES, will analyse' % base_name)
if feedback_metric:
try:
feedback_cache_key_exists = self.redis_conn_decoded.get(feedback_cache_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get %s key from Redis' % (
str(feedback_cache_key)))
if feedback_cache_key_exists:
feedback_metric_last_processed_seconds_ago = feedback_metric_process_time - int(feedback_cache_key_exists)
logger.info('feedback metric identified as last processed %s seconds ago via Redis key %s' % (
str(feedback_metric_last_processed_seconds_ago), feedback_cache_key))
remove_feedback_metric_check = True
# @added 20210505 - Bug #4048: Mirage - removing feedback metrics to be processed
feedback_processed_cache_key = 'mirage.feedback_metric.processed.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
feedback_processed_cache_key = 'mirage.feedback_metric.processed.%s' % (use_base_name)
feedback_processed_cache_key_exists = None
try:
feedback_processed_cache_key_exists = self.redis_conn_decoded.get(feedback_processed_cache_key)
except Exception as e:
logger.error('error :: failed to get %s key from Redis - %s' % (
str(feedback_processed_cache_key), e))
if not feedback_processed_cache_key_exists:
remove_feedback_metric_check = False
logger.info('feedback metric Redis key %s does not exist, not removing metric' % feedback_processed_cache_key)
if len(metric_var_files) > 10 and not feedback_cache_key_exists:
logger.info('Mirage is busy removing feedback metric check')
remove_feedback_metric_check = True
# @modified 20201128 - Feature #3734: waterfall alerts
# Only add if does not exist and always add
# else:
# try:
# self.redis_conn.setex(feedback_cache_key, 600, feedback_metric_process_time)
# if not feedback_cache_key_exists:
if not feedback_cache_key_exists and not remove_feedback_metric_check:
logger.info('feedback metric identified as not processed in last 600 seconds adding Redis key with 600 TTL and processing - %s' % feedback_cache_key)
try:
self.redis_conn.setex(feedback_cache_key, 600, feedback_metric_process_time)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s key to Redis' % (
str(feedback_cache_key)))
except:
if not feedback_metric_loop_error_logged:
logger.error(traceback.format_exc())
logger.error('error :: failed to check feedback and alerted on metrics')
feedback_metric_loop_error_logged = True
# Remove checks that have been alerted on by Mirage or
# via an Analyzer waterfall alert
if len(metric_var_files) > 10 and not remove_feedback_metric_check:
cache_key = 'mirage.last_alert.smtp.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
cache_key = 'mirage.last_alert.smtp.%s' % (use_base_name)
alerted_on = False
try:
alerted_on = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % str(e))
if not alerted_on:
# Check for Analyzer alert key from waterfall alert
cache_key = 'last_alert.smtp.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
cache_key = 'last_alert.smtp.%s' % (use_base_name)
try:
alerted_on = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % str(e))
if alerted_on:
remove_alerted_on_metric_check = True
# Unless it is older than PANORAMA_EXPIRY_TIME
try:
alerted_on_at = int(alerted_on)
alerted_on_seconds_ago = int(time()) - alerted_on_at
if alerted_on_seconds_ago >= settings.PANORAMA_EXPIRY_TIME:
remove_alerted_on_metric_check = False
except Exception as e:
logger.error('error :: failed determining if alerted more than PANORAMA_EXPIRY_TIME seconds ago - %s' % str(e))
remove_alerted_on_metric_check = True
if remove_feedback_metric_check or remove_alerted_on_metric_check:
if remove_feedback_metric_check:
log_str = 'feedback metric'
if remove_alerted_on_metric_check:
log_str = 'alerted on metric'
logger.info('removing %s %s check file and from analyzer.waterfall_alerts.sent_to_mirage Redis set' % (
log_str, base_name))
try:
os.remove(settings.MIRAGE_CHECK_PATH + "/" + current_file)
except:
logger.error('error :: failed to remove %s %s check file - %s' % (
log_str, base_name, current_file))
# Remove the metric from the waterfall_alerts Redis set
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
if not analyzer_waterfall_alerts:
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
literal_analyzer_waterfall_alerts = []
try:
literal_analyzer_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_analyzer_waterfall_alerts = []
analyzer_waterfall_alerts = []
for literal_waterfall_alert in literal_analyzer_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
analyzer_waterfall_alerts.append(waterfall_alert)
for waterfall_alert in analyzer_waterfall_alerts:
# @modified 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
# if waterfall_alert[0] == base_name:
if waterfall_alert[0] == use_base_name:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item for %s from Redis set %s - %s' % (
log_str, redis_set, str(waterfall_alert)))
# @modified 20201128 - Feature #3734: waterfall alerts
# Do not break, check and remove
# waterfall_alert items with older
# timestamps as well
# break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s %s at %s from Redis set %s' % (
log_str, base_name, str(metric_timestamp), redis_set))
# @added 20201128 - Feature #3734: waterfall alerts
# If the check just done is newer than an existing analyzer
# waterfall alert metric timestamp, remove those set items as well
if int(waterfall_alert[1]) < metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item with older timestamp for %s from Redis set %s - %s' % (
log_str, redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s %s at %s from Redis set %s' % (
log_str, base_name, str(metric_timestamp), redis_set))
# Discover metric to analyze
metric_var_files = ''
# @added 20161228 - Feature #1830: Ionosphere alerts
# Prioritises Ionosphere alerts
if ionosphere_alerts_returned:
break
metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
if len(metric_var_files) > 0:
break
process_metric_check_files = False
# @modified 20161228 - Feature #1830: Ionosphere alerts
# Only spawn process if this is not an Ionosphere alert
if not ionosphere_alerts_returned:
metric_var_files_sorted = sorted(metric_var_files)
# metric_check_file = settings.MIRAGE_CHECK_PATH + "/" + metric_var_files_sorted[0]
if metric_var_files_sorted:
process_metric_check_files = True
# Grab data from the queue and populate dictionaries
exceptions = {}
anomaly_breakdown = {}
ionosphere_unique_metrics = []
if process_metric_check_files:
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
check_files_to_process = len(metric_var_files_sorted)
logger.info('%s checks to process' % str(check_files_to_process))
# Remove any existing algorithm.error files from any previous runs
# that did not cleanup for any reason
pattern = '%s.*.algorithm.error' % skyline_app
try:
for f in os.listdir(settings.SKYLINE_TMP_DIR):
if re.search(pattern, f):
try:
os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
logger.info('cleaning up old error file - %s' % (str(f)))
except OSError:
pass
except:
logger.error('failed to cleanup mirage_algorithm.error files - %s' % (traceback.format_exc()))
# Spawn processes
pids = []
spawned_pids = []
pid_count = 0
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# MIRAGE_PROCESSES = 1
if len(metric_var_files) > 1:
try:
MIRAGE_PROCESSES = int(settings.MIRAGE_PROCESSES)
if len(metric_var_files) < MIRAGE_PROCESSES:
MIRAGE_PROCESSES = len(metric_var_files)
except:
MIRAGE_PROCESSES = 1
else:
MIRAGE_PROCESSES = 1
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# processing_check_file = metric_var_files_sorted[0]
# logger.info('processing %s' % processing_check_file)
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes
# for i in range(1, MIRAGE_PROCESSES + 1):
# up_to = i - 1
# processing_check_file = metric_var_files_sorted[up_to]
# logger.info('processing %s' % processing_check_file)
# @added 20221014 - Feature #4576: mirage - process multiple metrics
max_metrics = 30 * MIRAGE_PROCESSES
multiple_check_files = metric_var_files_sorted[0:max_metrics]
last_index = 0
process_index_range = 29
if len(multiple_check_files) < 60:
process_index_range = int(len(multiple_check_files) / MIRAGE_PROCESSES) - 1
if len(multiple_check_files) < 30:
process_index_range = 30
MIRAGE_PROCESSES = 1
process_single_check_mode = False
# @modified 20161224 - send mirage metrics to graphite
# run_timestamp = int(now)
run_timestamp = int(time())
for i in range(1, MIRAGE_PROCESSES + 1):
# @added 20200909 - Task #3730: Validate Mirage running multiple processes
up_to = i - 1
# processing_check_file = metric_var_files_sorted[up_to]
# logger.info('processing %s' % processing_check_file)
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes
# p = Process(target=self.spin_process, args=(i, run_timestamp, metric_var_files_sorted))
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
# p = Process(target=self.spin_process, args=(i, run_timestamp, processing_check_file))
end_index = last_index + process_index_range
processing_check_files = multiple_check_files[last_index:end_index]
last_index = int(end_index)
logger.info('process %s - processing %s checks' % (
str(i), str(len(processing_check_files))))
p = Process(target=self.spin_process, args=(i, run_timestamp, processing_check_files))
pids.append(p)
pid_count += 1
logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(MIRAGE_PROCESSES)))
p.start()
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# spawned_pids.append(p.pid)
spawned_pids.append([p.pid, i])
logger.info('started spin_process %s with pid %s' % (str(pid_count), str(p.pid)))
# @added 20230609 - Task #4806: Manage NUMBA_CACHE_DIR
# Feature #4702: numba optimisations
# Use start up key and allow numba cache files to be created
start_key = '%s.starting' % skyline_app
max_analyzer_process_runtime = settings.MAX_ANALYZER_PROCESS_RUNTIME
try:
starting = self.redis_conn.exists(start_key)
if starting:
logger.info('mirage is starting max_analyzer_process_runtime set to 300')
max_analyzer_process_runtime = 300
else:
starting = False
except Exception as err:
logger.error('error :: exists failed on Redis %s key - %s' % (
start_key, err))
# Send wait signal to zombie processes
# for p in pids:
# p.join()
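# Self monitor processes and terminate if any spin_process has run
# for longer than max_analyzer_process_runtime seconds
# (settings.MAX_ANALYZER_PROCESS_RUNTIME, or 300 while mirage is starting)
# - 20160512 @earthgecko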
p_starts = time()
# @modified 20230609 - Task #4806: Manage NUMBA_CACHE_DIR
# Feature #4702: numba optimisations
# Use start up key and allow numba cache files to be created
# while time() - p_starts <= settings.MAX_ANALYZER_PROCESS_RUNTIME:
while time() - p_starts <= max_analyzer_process_runtime:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (
skyline_app, str(MIRAGE_PROCESSES), time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
for p in pids:
p.terminate()
# p.join()
for p in pids:
if p.is_alive():
logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
p.join()
# @added 20200607 - Feature #3508: ionosphere.untrainable_metrics
# Check for non 3sigma algorithm errors too
if LOCAL_DEBUG:
logger.debug('debug :: adding negatives_present to check_algorithm_errors')
check_algorithm_errors = ['negatives_present']
for algorithm in list(settings.MIRAGE_ALGORITHMS):
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: adding %s to check_algorithm_errors' % (algorithm))
check_algorithm_errors.append(algorithm)
# @added 20200607 - Feature #3566: custom_algorithms
if CUSTOM_ALGORITHMS:
for custom_algorithm in settings.CUSTOM_ALGORITHMS:
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: adding custom_algorithm %s to check_algorithm_errors' % (custom_algorithm))
check_algorithm_errors.append(custom_algorithm)
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: checking for algorithm error files')
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# for completed_pid in spawned_pids:
for completed_pid, mirage_process in spawned_pids:
logger.info('spin_process with pid %s completed' % (str(completed_pid)))
# @modified 20200607 - Feature #3566: custom_algorithms
# Feature #3508: ionosphere.untrainable_metrics
# Check for non 3sigma algorithm errors too and wrapped in try
try:
# for algorithm in settings.MIRAGE_ALGORITHMS:
for algorithm in check_algorithm_errors:
algorithm_error_file = '%s/%s.%s.%s.algorithm.error' % (
settings.SKYLINE_TMP_DIR, skyline_app,
str(completed_pid), algorithm)
if os.path.isfile(algorithm_error_file):
logger.info(
'error :: spin_process with pid %s has reported an error with the %s algorithm' % (
str(completed_pid), algorithm))
try:
with open(algorithm_error_file, 'r') as f:
error_string = f.read()
logger.error('%s' % str(error_string))
except:
logger.error('error :: failed to read %s error file' % algorithm)
try:
os.remove(algorithm_error_file)
except OSError:
pass
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check algorithm errors')
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: checked for algorithm error files')
# Grab data from the queue and populate dictionaries
while 1:
try:
key, value = self.mirage_anomaly_breakdown_q.get_nowait()
if key not in list(anomaly_breakdown.keys()):
anomaly_breakdown[key] = value
else:
anomaly_breakdown[key] += value
except Empty:
# @added 20191113 - Branch #3262: py3
# Log
logger.info('anomaly_breakdown.keys are empty')
break
while 1:
try:
key, value = self.mirage_exceptions_q.get_nowait()
if key not in list(exceptions.keys()):
exceptions[key] = value
else:
exceptions[key] += value
except Empty:
# @added 20191113 - Branch #3262: py3
# Log
logger.info('exceptions.keys are empty')
break
# @added 20191021 - Bug #3288: Always send anomaly_breakdown and exception metrics
# Branch #3262: py3
exceptions_metrics = ['Boring', 'Stale', 'TooShort', 'Other']
for i_exception in exceptions_metrics:
if i_exception not in list(exceptions.keys()):
exceptions[i_exception] = 0
# @modified 20200607 - Feature #3566: custom_algorithms
# for i_anomaly_breakdown in settings.MIRAGE_ALGORITHMS:
for i_anomaly_breakdown in check_algorithm_errors:
if i_anomaly_breakdown not in list(anomaly_breakdown.keys()):
anomaly_breakdown[i_anomaly_breakdown] = 0
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis set and not self.metric_variables
# @modified 20221014 - Feature #4576: mirage - process multiple metrics
if process_single_check_mode:
metric_variables = []
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# literal_metric_variables = list(self.redis_conn.smembers('mirage.metric_variables'))
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Handle per process
# literal_metric_variables = list(self.redis_conn_decoded.smembers('mirage.metric_variables'))
metric_variable_redis_set = 'mirage.%s.metric_variables' % str(mirage_process)
literal_metric_variables = list(self.redis_conn_decoded.smembers(metric_variable_redis_set))
for item_list_string in literal_metric_variables:
list_item = literal_eval(item_list_string)
metric_variables.append(list_item)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
# Handle per process
try:
self.redis_conn.delete(metric_variable_redis_set)
logger.info('deleted Redis set - %s' % metric_variable_redis_set)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % metric_variable_redis_set)
# @added 20191113 - Branch #3262: py3
# Set default values
metric_name = None
metric_value = None
hours_to_resolve = 0
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# for metric_variable in self.metric_variables:
for metric_variable in metric_variables:
if metric_variable[0] == 'metric_name':
metric_name = metric_variable[1]
if metric_variable[0] == 'metric_value':
metric_value = metric_variable[1]
if metric_variable[0] == 'hours_to_resolve':
hours_to_resolve = metric_variable[1]
# if metric_variable[0] == 'metric_timestamp':
# metric_timestamp = metric_variable[1]
# @added 20221014 - Feature #4576: mirage - process multiple metrics
if metric_variable[0] == 'metric_timestamp':
metric_check_timestamp = metric_variable[1]
logger.info('analysis done - %s' % str(metric_name))
# Send alerts
# Calculate hours second order resolution to seconds
# @modified 20191113 - Branch #3262: py3
# Only if set
if hours_to_resolve:
logger.info('analyzed at %s hours resolution' % hours_to_resolve)
second_order_resolution_seconds = int(hours_to_resolve) * 3600
logger.info('analyzed at %s seconds resolution' % str(second_order_resolution_seconds))
# @added 20221014 - Feature #4576: mirage - process multiple metrics
processing_check_file = '%s.%s.txt' % (
str(metric_check_timestamp), metric_name)
# Remove metric check file
metric_check_file = 'None'
try:
metric_check_file = '%s/%s' % (settings.MIRAGE_CHECK_PATH, processing_check_file)
if LOCAL_DEBUG:
logger.debug('debug :: interpolated metric_check_file to %s' % metric_check_file)
except:
logger.error('error :: failed to interpolate metric_check_file')
if os.path.isfile(metric_check_file):
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
if LOCAL_DEBUG:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove metric_check_file - %s' % metric_check_file)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove metric_check_file - %s' % metric_check_file)
else:
if LOCAL_DEBUG:
logger.debug('debug :: no metric_check_file to remove OK - %s' % metric_check_file)
# Remove the metric directory
# @modified 20191113 - Branch #3262: py3
# Convert None to str
# timeseries_dir = metric_name.replace('.', '/')
metric_data_dir = 'None'
try:
metric_name_str = str(metric_name)
timeseries_dir = metric_name_str.replace('.', '/')
metric_data_dir = '%s/%s' % (settings.MIRAGE_CHECK_PATH, timeseries_dir)
if LOCAL_DEBUG:
logger.debug('debug :: metric_data_dir interpolated to %s' % str(metric_data_dir))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to interpolate metric_data_dir')
metric_data_dir = 'None'
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree %s' % metric_data_dir)
else:
if LOCAL_DEBUG:
logger.debug('debug :: metric_data_dir does not exist - %s' % str(metric_data_dir))
ionosphere_unique_metrics = []
if settings.MIRAGE_ENABLE_ALERTS:
# @added 20161228 - Feature #1830: Ionosphere alerts
# Branch #922: Ionosphere
# Bringing Ionosphere online - do alert on Ionosphere metrics
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get ionosphere.unique_metrics from Redis')
ionosphere_unique_metrics = []
else:
if LOCAL_DEBUG:
logger.debug('debug :: settings.MIRAGE_ENABLE_ALERTS is not True')
# @added 20161228 - Feature #1830: Ionosphere alerts
# Branch #922: Ionosphere
# Send alerts for Ionosphere
alert_context = 'Mirage'
if ionosphere_alerts_returned:
alert_context = 'Ionosphere'
ionosphere_unique_metrics = []
logger.info('Ionosphere alerts requested emptying ionosphere_unique_metrics so Mirage will alert')
exceptions = {}
# @modified 20190524 - Branch #3002
# Wrapped in try except
try:
run_timestamp = int(time())
# @modified 20200430 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_alert_on = list(self.redis_conn.scan_iter(match='ionosphere.mirage.alert.*'))
ionosphere_alert_on = list(self.redis_conn_decoded.scan_iter(match='ionosphere.mirage.alert.*'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get ionosphere.mirage.alert.* from Redis key scan')
ionosphere_alert_on = []
for cache_key in ionosphere_alert_on:
try:
# @modified 20200322 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# alert_on = self.redis_conn.get(cache_key)
alert_on = self.redis_conn_decoded.get(cache_key)
send_alert_for = literal_eval(alert_on)
value = float(send_alert_for[0])
base_name = str(send_alert_for[1])
metric_timestamp = int(float(send_alert_for[2]))
triggered_algorithms = send_alert_for[3]
# @added 20201001 - Task #3748: POC SNAB
# Added algorithms_run required to determine the anomalyScore
algorithms_run = send_alert_for[5]
second_order_resolution_seconds = int(send_alert_for[4])
# @modified 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added triggered_algorithms and algorithms_run
# anomalous_metric = [value, base_name, metric_timestamp, second_order_resolution_seconds]
anomalous_metric = [value, base_name, metric_timestamp, second_order_resolution_seconds, triggered_algorithms, algorithms_run]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.anomalous_metrics.append(anomalous_metric)
redis_set = 'mirage.anomalous_metrics'
data = str(anomalous_metric)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20210323 - Feature #3642: Anomaly type classification
if LUMINOSITY_CLASSIFY_ANOMALIES:
redis_set = 'luminosity.classify_anomalies'
added_at = int(time())
data_dict = {
'metric': base_name,
'timestamp': int(metric_timestamp),
'value': value,
'algorithms': algorithms_run,
'triggered_algorithms': triggered_algorithms,
'app': skyline_app,
'added_at': added_at,
}
data = [base_name, int(metric_timestamp), added_at, data_dict]
try:
self.redis_conn.sadd(redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
anomaly_breakdown = {}
for algorithm in triggered_algorithms:
anomaly_breakdown[algorithm] = 1
self.redis_conn.delete(cache_key)
except:
logger.error(traceback.format_exc())
# @modified 20200322 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# logger.error('error :: failed to add an Ionosphere anomalous_metric for %s' % base_name)
logger.error('error :: failed to add an Ionosphere anomalous_metric for cache key %s' % cache_key)
else:
if LOCAL_DEBUG:
logger.debug('debug :: no ionosphere_alerts_returned - %s' % str(ionosphere_alerts_returned))
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
# To reduce the amount of I/O used by Mirage in this loop check
# and reduce the number of log entries for 'not alerting - Ionosphere metric'
# a check is made if the metric_name has already been checked, if
# so continue
not_alerting_for_ionosphere = 'none'
# @added 20221015 - Bug #2682: Reduce mirage ionosphere alert loop
not_alerting_logged = []
# @added 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Only check Ionosphere is up once per cycle
ionosphere_up = False
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
mirage_anomalous_metrics = []
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# literal_mirage_anomalous_metrics = list(self.redis_conn.smembers('mirage.anomalous_metrics'))
literal_mirage_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage.anomalous_metrics'))
for metric_list_string in literal_mirage_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_anomalous_metrics.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine list from mirage.anomalous_metrics Redis set')
mirage_anomalous_metrics = []
# @added 20221206 - Feature #4734: mirage_vortex
# Feature #4732: flux vortex
# Add any anomalies discovered by mirage_vortex to be alerted on
mirage_vortex_anomalous_basenames = []
if VORTEX_ENABLED and not ionosphere_alerts_returned:
mirage_vortex_anomalous_metrics = []
try:
literal_mirage_vortex_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage_vortex.anomalous_metrics'))
for metric_list_string in literal_mirage_vortex_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_vortex_anomalous_metrics.append(metric)
mirage_vortex_anomalous_basenames.append(metric[1])
except Exception as err:
logger.error('error :: failed to determine list from mirage_vortex.anomalous_metrics Redis set - %s' % err)
mirage_vortex_anomalous_metrics = []
try:
self.redis_conn_decoded.delete('mirage_vortex.anomalous_metrics')
except Exception as err:
logger.error('error :: failed to delete mirage_vortex.anomalous_metrics Redis set - %s' % err)
if mirage_vortex_anomalous_metrics:
logger.info('adding %s metrics to mirage_anomalous_metrics for mirage_vortex_anomalous_metrics to alert on' % (
str(len(mirage_vortex_anomalous_metrics))))
mirage_anomalous_metrics = mirage_anomalous_metrics + mirage_vortex_anomalous_metrics
# @added 20200907 - Feature #3734: waterfall alerts
# Add alert for expired waterfall_alert items
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
waterfall_alert_check_timestamp = int(time())
waterfall_alerts_to_alert_on = []
# A list to add a metric,timestamp string to in order to override
# the ionosphere_metric in the alerting block
alerting_waterfall_alerts = []
waterfall_redis_sets = [
'mirage.waterfall_alerts.sent_to_ionosphere',
]
for waterfall_redis_set in waterfall_redis_sets:
redis_set = waterfall_redis_set
literal_waterfall_alerts = []
try:
literal_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_waterfall_alerts = []
waterfall_alerts = []
logger.info('checking for expired checks in %s waterfall alerts in Redis set %s' % (
str(len(literal_waterfall_alerts)), redis_set))
for literal_waterfall_alert in literal_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
waterfall_alerts.append(waterfall_alert)
for waterfall_alert in waterfall_alerts:
if waterfall_alert_check_timestamp >= (int(waterfall_alert[3]) + 300):
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
# @modified 20221104 - Bug #4722: Handle alert and waterfall alert overlap
# Added more context to log to enable debugging
# logger.info('removed waterfall alert item to alert on from Redis set %s - %s' % (
logger.info('appending to waterfall_alerts_to_alert_on and removed waterfall alert item to alert on from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
waterfall_alerts_to_alert_on.append(waterfall_alert)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove feedback metric waterfall alert item for %s from Redis set %s' % (
base_name, redis_set))
for waterfall_alert in waterfall_alerts_to_alert_on:
try:
value = float(waterfall_alert[2])
base_name = str(waterfall_alert[0])
metric_timestamp = int(waterfall_alert[1])
# @added 20201008 - Feature #3734: waterfall alerts
# Branch #3068: SNAB
# Added triggered_algorithms and algorithms_run
algorithms_run = waterfall_alert[4][4]
triggered_algorithms = waterfall_alert[4][5]
# @modified 20201008 - Feature #3734: waterfall alerts
# Branch #3068: SNAB
# Added triggered_algorithms and algorithms_run
# anomalous_metric = [value, base_name, metric_timestamp]
# mirage_anomalous_metrics.append([value, base_name, metric_timestamp])
anomalous_metric = [value, base_name, metric_timestamp, triggered_algorithms, algorithms_run]
mirage_anomalous_metrics.append(anomalous_metric)
waterfall_alert_check_string = '%s.%s' % (str(metric_timestamp), base_name)
alerting_waterfall_alerts.append(waterfall_alert_check_string)
logger.info('waterfall alerting on %s' % base_name)
redis_waterfall_alert_key = 'mirage.waterfall.alert.%s' % waterfall_alert_check_string
try:
self.redis_conn.setex(redis_waterfall_alert_key, 300, waterfall_alert_check_timestamp)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add Redis key - %s for waterfall alert' % (
redis_waterfall_alert_key))
# @added 20200929 - Feature #3734: waterfall alerts
# Task #3748: POC SNAB
# Branch #3068: SNAB
# Added Panorama anomaly details for waterfall alerts
# Note:
# The values are enclosed in single quotes intentionally
# as the imp.load_source used results in a shift in the
# decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
logger.info('adding panorama anomaly file for waterfall alert on %s' % base_name)
panorama_data = None
try:
panorama_data = waterfall_alert[4]
from_timestamp = str(int(panorama_data[2]))
int_metric_timestamp = int(panorama_data[3])
algorithms_run = panorama_data[4]
triggered_algorithms = panorama_data[5]
source = panorama_data[7]
added_at = str(int(time()))
panoroma_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'app = \'%s\'\n' \
'source = \'%s\'\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(value), from_timestamp,
str(int_metric_timestamp), str(algorithms_run),
triggered_algorithms, skyline_app, source,
this_host, added_at)
logger.info('panorama anomaly data for waterfall alert - %s' % str(panorama_data))
# Create an anomaly file with details about the anomaly
sane_metricname = filesafe_metricname(str(base_name))
panoroma_anomaly_file = '%s/%s.%s.txt' % (
settings.PANORAMA_CHECK_PATH, added_at, sane_metricname)
try:
write_data_to_file(
skyline_app, panoroma_anomaly_file, 'w',
panoroma_anomaly_data)
logger.info('added panorama anomaly file for waterfall alert :: %s' % (panoroma_anomaly_file))
except:
logger.error('error :: failed to add panorama anomaly file :: %s' % (panoroma_anomaly_file))
logger.error(traceback.format_exc())
redis_set = 'mirage.sent_to_panorama'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add panorama anomaly data file for waterfall alert')
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add waterfall alert to alert on to mirage_anomalous_metrics')
# @added 20200913 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
# Info #1792: Shapelet extraction
snab_checks_sent = []
added_to_snab_at = int(time())
# @added 20200929 - Task #3748: POC SNAB
# Branch #3068: SNAB
# Added three-sigma snab.panorama items
panorama_added_at = int(time())
panorama_three_sigma_snab_added = []
# @added 20200610 - Feature #3560: External alert config
external_alerts = {}
external_from_cache = None
internal_alerts = {}
internal_from_cache = None
all_alerts = list(settings.ALERTS)
all_from_cache = None
if EXTERNAL_ALERTS:
try:
external_alerts, external_from_cache, internal_alerts, internal_from_cache, all_alerts, all_from_cache = get_external_alert_configs(skyline_app)
except:
logger.error(traceback.format_exc())
logger.error('error :: could not determine external alert configs')
logger.info('retrieved %s external_alerts configurations from_cache %s, %s internal_alerts from_cache %s and %s all_alerts from_cache %s' % (
str(len(external_alerts)), str(external_from_cache),
str(len(internal_alerts)), str(internal_from_cache),
str(len(all_alerts)), str(all_from_cache)))
if LOCAL_DEBUG:
logger.debug('debug :: all_alerts :: %s' % str(all_alerts))
if not all_alerts:
logger.error('error :: all_alerts is not set, so creating from settings.ALERTS')
all_alerts = list(settings.ALERTS)
# @added 20220316 - Feature #4482: Test alerts
# Add test_alerts http_alerters to all_alerts because any new
# external_settings alerters may not have been updated/added yet
if test_alerts:
test_all_alerts = []
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_alerter = test_alerts[test_alert_timestamp]['alerter']
if not test_alerter.startswith('http_alerter'):
continue
test_metric = test_alerts[test_alert_timestamp]['metric']
test_id = test_alerts[test_alert_timestamp]['alerter_id']
external_alert_id = 'external-%s' % str(test_id)
test_alert_setting = [
test_metric, test_alerter, 60, 168,
{'id': external_alert_id, 'alerter': test_alerter,
'namespace': test_metric, 'type': 'external',
'expiration': 60, 'second_order_resolution': 604800}
]
test_all_alerts.append(test_alert_setting)
logger.info('adding test_alert setting: %s' % str(test_alert_setting))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to add test_alert to test_all_alerts %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
for item in all_alerts:
test_all_alerts.append(item)
all_alerts = list(test_all_alerts)
not_ionosphere_metrics = []
# @modified 20200610 - Feature #3560: External alert config
# for alert in settings.ALERTS:
for alert in all_alerts:
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
not_an_ionosphere_metric_check_done = 'none'
if LOCAL_DEBUG:
logger.debug('debug :: %s metrics in mirage_anomalous_metrics' % str(len(mirage_anomalous_metrics)))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# for metric in self.anomalous_metrics:
for metric in mirage_anomalous_metrics:
# @added 20161228 - Feature #1830: Ionosphere alerts
# Branch #922: Ionosphere
# Bringing Ionosphere online - do alert on Ionosphere
# metrics if Ionosphere is up
try:
metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric[1]))
# @added 20220805 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
if metric[1].startswith('labelled_metrics.'):
metric_name = str(metric[1])
# if metric_name in ionosphere_unique_metrics:
# ionosphere_unique_metrics.remove(metric_name)
if LOCAL_DEBUG:
logger.debug('debug :: metric_name interpolated to %s' % str(metric_name))
except:
if LOCAL_DEBUG:
logger.error(traceback.format_exc())
logger.debug('debug :: failed to interpolate metric_name')
# @added 20200907 - Feature #3734: waterfall alerts
waterfall_alert_check_string = '%s.%s' % (str(int(metric[2])), metric[1])
if not ionosphere_unique_metrics:
ionosphere_unique_metrics = []
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if metric[1] in test_alert_metrics:
if metric_name in ionosphere_unique_metrics:
ionosphere_unique_metrics.remove(metric_name)
# @added 20221206 - Feature #4734: mirage_vortex
# Feature #4732: flux vortex
# Remove mirage_vortex metrics from ionosphere_unique_metrics
# so that they get alerted on
if metric[1] in mirage_vortex_anomalous_basenames:
if metric_name in ionosphere_unique_metrics:
ionosphere_unique_metrics.remove(metric_name)
if not ionosphere_unique_metrics:
ionosphere_unique_metrics = []
# @modified 20200907 - Feature #3734: waterfall alerts
# if metric_name in ionosphere_unique_metrics:
if metric_name in ionosphere_unique_metrics and waterfall_alert_check_string not in alerting_waterfall_alerts:
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
if not_alerting_for_ionosphere == metric_name:
continue
# @added 20221015 - Bug #2682: Reduce mirage ionosphere alert loop
if metric[1] in not_alerting_logged:
continue
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Only check Ionosphere is up once per cycle
# ionosphere_up = False
if not ionosphere_up:
try:
ionosphere_up = self.redis_conn.get('ionosphere')
except Exception as e:
logger.error('error :: could not query Redis for ionosphere key: %s' % str(e))
if ionosphere_up:
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Wrapped this block up on conditional based on
# ionosphere_alerts_returned
if not ionosphere_alerts_returned:
# @modified 20221015 - Bug #2682: Reduce mirage ionosphere alert loop
# Check if in not_alerting_logged
if metric[1] not in not_alerting_logged:
logger.info('not alerting - Ionosphere metric - %s' % str(metric[1]))
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
not_alerting_for_ionosphere = metric_name
# @added 20221015 - Bug #2682: Reduce mirage ionosphere alert loop
not_alerting_logged.append(str(metric[1]))
continue
else:
logger.error('error :: Ionosphere not reporting up')
logger.info('taking over alerting from Ionosphere if alert is matched on - %s' % str(metric[1]))
else:
# @modified 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
# logger.info('not an Ionosphere metric checking whether to alert - %s' % str(metric[1]))
if not_an_ionosphere_metric_check_done == metric_name:
# Do not log multiple times for this either
not_an_ionosphere_metric_check_done = metric_name
else:
if not ionosphere_alerts_returned:
if metric_name not in not_ionosphere_metrics:
logger.info('not an Ionosphere metric checking whether to alert - %s' % str(metric[1]))
not_an_ionosphere_metric_check_done = metric_name
not_ionosphere_metrics.append(metric_name)
# ALERT_MATCH_PATTERN = alert[0]
# METRIC_PATTERN = metric[1]
# @modified 20200622 - Task #3586: Change all alert pattern checks to matched_or_regexed_in_list
# Feature #3512: matched_or_regexed_in_list function
# Changed original alert matching pattern to use new
# method
base_name = str(metric[1])
# @added 20220805 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
use_base_name = str(base_name)
labelled_metric_name = None
if base_name.startswith('labelled_metrics.'):
use_base_name = get_base_name_from_labelled_metrics_name(skyline_app, base_name)
labelled_metric_name = str(base_name)
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, use_base_name, [alert[0]])
if LOCAL_DEBUG and pattern_match:
logger.debug('debug :: %s matched alert - %s' % (use_base_name, alert[0]))
try:
del metric_matched_by
except:
pass
except:
pattern_match = False
if pattern_match:
# @added 20200610 - Feature #3560: External alert config
# @modified 20200624 - Feature #3560: External alert config
# Set the alert key to the external alerter id
# external_alerter_alerter = None
external_alerter_id = None
try:
if alert[4]['type'] == 'external':
# @modified 20200624 - Feature #3560: External alert config
# Set the alert key to the external alerter id
# external_alerter_alerter = alert[4]['alerter']
external_alerter_id = alert[4]['id'].replace('external-', '')
except:
external_alerter_id = None
# @modified 20200610 - Feature #3560: External alert config
# Use the all_alerts list which includes external alert configs
# cache_key = 'mirage.last_alert.%s.%s' % (alert[1], metric[1])
# @modified 20200624 - Feature #3560: External alert config
# Set the alert key to the external alerter id
# if external_alerter_alerter:
# cache_key = 'mirage.last_alert.%s.%s.%s' % (str(external_alerter_alerter), alert[1], metric[1])
if external_alerter_id:
cache_key = 'mirage.last_alert.%s.%s.%s' % (str(external_alerter_id), alert[1], metric[1])
else:
cache_key = 'mirage.last_alert.%s.%s' % (alert[1], metric[1])
try:
# @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Changed the last_alert cache key to hold the last
# anomaly timestamp
# last_alert = self.redis_conn.get(cache_key)
last_alert = self.redis_conn_decoded.get(cache_key)
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Evaluate the reported anomaly timestamp to determine whether
# EXPIRATION_TIME should be applied to a batch metric
if last_alert:
# Is this a analyzer_batch related anomaly
analyzer_batch_anomaly = None
metric_timestamp = metric[2]
analyzer_batch_metric_anomaly_key = 'analyzer_batch.anomaly.%s.%s' % (
str(metric_timestamp), metric[1])
try:
analyzer_batch_anomaly = self.redis_conn_decoded.get(analyzer_batch_metric_anomaly_key)
except Exception as e:
logger.error(
'error :: could not query cache_key - %s - %s' % (
analyzer_batch_metric_anomaly_key, e))
analyzer_batch_anomaly = None
if analyzer_batch_anomaly:
logger.info('identified as an analyzer_batch triggered anomaly from the presence of the Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
logger.info('not identified as an analyzer_batch triggered anomaly as no Redis key found - %s' % analyzer_batch_metric_anomaly_key)
if last_alert and analyzer_batch_anomaly:
last_timestamp = None
try:
last_timestamp = int(last_alert)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine last_timestamp from the last Mirage alert key - %s' % cache_key)
last_timestamp = None
seconds_between_batch_anomalies = None
if last_timestamp:
try:
seconds_between_batch_anomalies = int(metric_timestamp) - int(last_timestamp)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine seconds_between_batch_anomalies for batch metric Panorama key- %s' % cache_key)
last_timestamp = None
if seconds_between_batch_anomalies:
if seconds_between_batch_anomalies > int(alert[2]):
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is greater than the metric EXPIRATION_TIME of %s' % (
str(last_timestamp), str(metric_timestamp), metric[1],
str(alert[2])))
logger.info('alerting on anomaly for batch metric %s, so setting last_alert to None' % (
metric))
last_alert = None
else:
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is less than the metric EXPIRATION_TIME of %s, not alerting' % (
str(last_timestamp), str(metric_timestamp), metric[1],
str(alert[2])))
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Task #3562: Change panorama.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# If the metric is a batch processing metric and the anomaly
# timestamp is less than the last_check timestamp, insert
# the anomaly
if int(metric_timestamp) < last_timestamp:
logger.info('batch anomaly timestamp (%s) less than the last_check timestamp (%s), alerting on anomaly for batch metric %s, so setting last_alert to None' % (
str(metric_timestamp), str(last_timestamp), metric))
last_alert = None
if not last_alert:
if ionosphere_alerts_returned:
# @modified 20190410 - Feature #2882: Mirage - periodic_check
# Only set if not set
try:
second_order_resolution_seconds + 1
set_second_order_resolution_seconds = False
except:
set_second_order_resolution_seconds = True
if set_second_order_resolution_seconds:
try:
second_order_resolution_seconds = int(metric[3]) * 3600
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine full_duration from the Ionosphere alert for %s' % (metric[1]))
logger.info('using settings.FULL_DURATION - %s' % (str(settings.FULL_DURATION)))
second_order_resolution_seconds = int(settings.FULL_DURATION)
# @modified 20190524 - Branch #3002
# Wrapped in try except
try:
# @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Change the last_alert cache key to hold the
# the anomaly timestamp for which the alert
# was sent, not the packb anomaly value.
# Using the timestamp of the anomaly allows
# it to be used to determine if a batch
# anomaly should be alerted on based on the
# comparison of the timestamps rather than
# just the presence of the last_alert key
# based on it not having reach its TTL as
# analyzer_batch could send multiple
# anomalies in one batch that might be
# EXPIRATION_TIME apart.
# self.redis_conn.setex(cache_key, alert[2], packb(metric[0]))
self.redis_conn.setex(cache_key, str(alert[2]), int(metric[2]))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to set Redis key %s for %s' % (
str(cache_key), metric[1]))
# @added 20200929 - Task #3748: POC SNAB
# Branch #3068: SNAB
# Determine if this is a snab_check_metric so
# that info can be passed to the alerter
snab_check_metric = False
if SNAB_ENABLED:
# @added 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Added snab custom_algorithms to run
snab_algorithms_to_run = []
try:
snab_algorithms_to_run = metric[6]
except:
snab_algorithms_to_run = []
for app in SNAB_CHECKS:
if app == skyline_app:
for snab_context in SNAB_CHECKS[app]:
if snab_check_metric:
break
if snab_context == 'testing':
for algorithm in SNAB_CHECKS[app][snab_context]:
# @added 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
if labelled_metric_name:
if algorithm not in snab_algorithms_to_run:
continue
try:
algorithm_source = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_source']
except:
break
if not os.path.isfile(algorithm_source):
break
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
# @modified 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
# if namespace in base_name:
if namespace in use_base_name:
snab_check_metric = True
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check if %s is a snab_check_metric' % base_name)
if snab_check_metric:
algorithm_group = 'three-sigma'
# @added 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added second_order_resolution_seconds, triggered_algorithms and algorithms_run
try:
triggered_algorithms = metric[4]
algorithms_run = metric[5]
except:
triggered_algorithms = []
algorithms_run = []
# @added 20201001 - Task #3748: POC SNAB
# Added anomalyScore
try:
if triggered_algorithms and algorithms_run:
anomalyScore = len(triggered_algorithms) / len(algorithms_run)
else:
anomalyScore = 1.0
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine anomalyScore for %s' % base_name)
anomalyScore = 1.0
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Added analysis_run_time
analysis_run_time = 0
redis_key = 'mirage.analysis_run_time.%s.%s' % (base_name, str(metric[2]))
try:
analysis_run_time_data = self.redis_conn_decoded.get(redis_key)
if analysis_run_time_data:
analysis_run_time = float(analysis_run_time_data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine analysis_run_time from Redis key - %s' % (
redis_key))
try:
snab_panorama_details = {
# @modified 20230419 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
# 'metric': base_name,
'metric': use_base_name,
'labelled_metric_name': labelled_metric_name,
'timestamp': int(metric[2]),
# @added 20201001 - Task #3748: POC SNAB
# Added anomalyScore and analysis_run_time
'anomalyScore': anomalyScore,
'analysis_run_time': analysis_run_time,
'source': skyline_app,
'algorithm_group': algorithm_group,
'algorithm': None,
'added_at': panorama_added_at,
}
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to create snab_panorama_details dict %s' % base_name)
snab_panorama_details = None
# Only send once for a metric and timestamp
if snab_panorama_details not in panorama_three_sigma_snab_added:
self.redis_conn.sadd('snab.panorama', str(snab_panorama_details))
logger.info('added snab.panorama three-sigma item - %s' % (
str(snab_panorama_details)))
panorama_three_sigma_snab_added.append(snab_panorama_details)
# @added 20210801 - Feature #4214: alert.paused
alert_paused = False
try:
cache_key = 'alert.paused.%s.%s' % (alert[1], base_name)
alert_paused = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: alert_paused check failed: %s' % str(e))
alert_paused = False
if alert_paused:
logger.info('alert_paused for %s %s until %s' % (
alert[1], base_name, str(alert_paused)))
# @added 20221206 - Feature #4734: mirage_vortex
# Feature #4732: flux vortex
# Remove mirage_vortex metrics from ionosphere_unique_metrics
# so that they get alerted on
if metric[1] in mirage_vortex_anomalous_basenames:
alert_context = 'Vortex'
# trigger_alert(alert, metric, second_order_resolution_seconds, context)
try:
# @added 20210409 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Always determine triggered_algorithms and
# calculate anomalyScore
try:
triggered_algorithms = metric[4]
algorithms_run = metric[5]
except:
triggered_algorithms = []
algorithms_run = []
try:
if triggered_algorithms and algorithms_run:
anomalyScore = len(triggered_algorithms) / len(algorithms_run)
else:
anomalyScore = 1.0
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine anomalyScore for %s' % base_name)
anomalyScore = 1.0
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if base_name in test_alert_metrics:
triggered_algorithms = ['testing']
second_order_resolution_seconds = int(settings.FULL_DURATION * 7)
logger.info('test alert set second_order_resolution_seconds to %s' % str(second_order_resolution_seconds))
try:
second_order_resolution_seconds + 1
set_second_order_resolution_seconds = False
except:
set_second_order_resolution_seconds = True
if set_second_order_resolution_seconds:
try:
second_order_resolution_seconds = int(metric[3])
except:
logger.error(traceback.format_exc())
logger.warning('warning :: failed to determine full_duration for alert on %s' % (metric[1]))
logger.info('using settings.FULL_DURATION * 7 - %s' % (str(settings.FULL_DURATION)))
second_order_resolution_seconds = int(settings.FULL_DURATION * 7)
if alert[1] != 'smtp':
new_alert = None
# @added 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
if 'http_alerter' in alert[1]:
anomaly_id = None
anomaly_id_redis_key = 'panorama.anomaly_id.%s.%s' % (
str(int(metric[2])), base_name)
try_get_anomaly_id_redis_key_count = 0
while try_get_anomaly_id_redis_key_count < 20:
try_get_anomaly_id_redis_key_count += 1
try:
anomaly_id = int(self.redis_conn_decoded.get(anomaly_id_redis_key))
break
except:
sleep(1)
if not anomaly_id:
logger.error('error :: failed to determine anomaly_id from Redis key - %s' % anomaly_id_redis_key)
else:
logger.info('determined anomaly_id as %s, appending to alert' % str(anomaly_id))
# Do not modify the alert list object, create a new one
new_alert = list(alert)
new_alert.append(['anomaly_id', anomaly_id])
# @added 20201130 - Feature #3772: Add the anomaly_id to the http_alerter json
# Determine the triggered_algorithms
# and algorithms_run
try:
triggered_algorithms = metric[4]
algorithms_run = metric[5]
except:
triggered_algorithms = []
algorithms_run = []
# @added 20201111 - Feature #3772: Add the anomaly_id to the http_alerter json
# Add the real anomalyScore
try:
if triggered_algorithms and algorithms_run:
anomalyScore = len(triggered_algorithms) / len(algorithms_run)
else:
anomalyScore = 1.0
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine anomalyScore for %s' % base_name)
anomalyScore = 1.0
new_alert.append(['anomalyScore', anomalyScore])
if base_name in test_alert_metrics:
triggered_algorithms = ['testing']
new_alert.append(['test_alert', True])
# @added 20200929 - Task #3748: POC SNAB
# Branch #3068: SNAB
# Determine if this is a snab_check_metric so
# that info can be passed to the alerter
if SNAB_ENABLED and snab_check_metric:
if alert[1] == 'slack':
anomaly_id = None
snab_id = None
snab_details = None
anomaly_id_redis_key = 'panorama.anomaly_id.%s.%s' % (
str(int(metric[2])), base_name)
try_get_anomaly_id_redis_key_count = 0
while try_get_anomaly_id_redis_key_count < 20:
try_get_anomaly_id_redis_key_count += 1
try:
anomaly_id = int(self.redis_conn_decoded.get(anomaly_id_redis_key))
break
except:
sleep(1)
if not anomaly_id:
logger.error('error :: failed to determine anomaly_id from Redis key - %s' % anomaly_id_redis_key)
if anomaly_id:
# snab_id_redis_key = 'snab.id.%s.%s.%s.%s' % (algorithm_group, str(metric_timestamp), base_name, panorama_added_at)
snab_id_redis_key = 'snab.id.%s.%s.%s.%s' % (algorithm_group, str(int(metric[2])), base_name, panorama_added_at)
try_get_snab_id_redis_key_count = 0
while try_get_snab_id_redis_key_count < 20:
try_get_snab_id_redis_key_count += 1
try:
snab_id = int(self.redis_conn_decoded.get(snab_id_redis_key))
break
except:
sleep(1)
if not snab_id:
logger.error('error :: failed to determine snab_id from Redis key - %s' % snab_id_redis_key)
snab_details = ['snab_details', snab_id, anomaly_id, anomalyScore]
# Do not modify the alert list object, create a new one
new_alert = list(alert)
new_alert.append(snab_details)
# @modified 20201008 - Feature #3772: Add the anomaly_id to the http_alerter json
# Feature #3734: waterfall alerts
# Branch #3068: SNAB
# Use the appropriate alert context
if new_alert:
# @added 20210801 - Feature #4214: alert.paused
if not alert_paused:
logger.info('trigger_alert :: alert: %s, metric: %s, second_order_resolution_seconds: %s, context: %s' % (
str(new_alert), str(metric),
str(second_order_resolution_seconds),
str(alert_context)))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
trigger_alert(new_alert, metric, second_order_resolution_seconds, alert_context, triggered_algorithms)
else:
# @added 20210801 - Feature #4214: alert.paused
if not alert_paused:
logger.info('trigger_alert :: alert: %s, metric: %s, second_order_resolution_seconds: %s, context: %s' % (
str(alert), str(metric),
str(second_order_resolution_seconds),
str(alert_context)))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
trigger_alert(alert, metric, second_order_resolution_seconds, alert_context, triggered_algorithms)
else:
# @added 20210801 - Feature #4214: alert.paused
if not alert_paused:
logger.info('smtp_trigger_alert :: alert: %s, metric: %s, second_order_resolution_seconds: %s, context: %s' % (
str(alert), str(metric),
str(second_order_resolution_seconds),
str(alert_context)))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
smtp_trigger_alert(alert, metric, second_order_resolution_seconds, alert_context, triggered_algorithms)
if not alert_paused:
logger.info('sent %s alert: For %s' % (alert[1], metric[1]))
# @added 20221105 - Bug #4308: matrixprofile - fN on big drops
# Flush trigger history so that
# skyline_matrixprofile can be evaluated
# again.
try:
del_result = self.redis_conn_decoded.hdel('mirage.trigger_history', base_name)
if del_result == 1:
logger.info('cleared mirage.trigger_history entry for %s' % base_name)
except Exception as err:
logger.error('error :: failed to del trigger history for %s - %s' % (base_name, err))
except Exception as e:
logger.error('error :: could not send %s alert for %s: %s' % (alert[1], metric[1], e))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if base_name in test_alert_metrics or use_base_name in test_alert_metrics:
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_metric = test_alerts[test_alert_timestamp]['metric']
if test_metric != base_name:
if test_metric != use_base_name:
continue
test_alert_redis_key = '%s.test_alerts' % skyline_app
self.redis_conn_decoded.hdel(test_alert_redis_key, test_alert_timestamp)
logger.info('removed test_alert for %s from Redis key %s' % (
str(test_metric), str(test_alert_redis_key)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove test_alert for %s from Redis key %s - %s' % (
str(test_metric), str(test_alert_redis_key), err))
continue
# @added 20200913 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
# Info #1792: Shapelet extraction
if SNAB_ENABLED:
timeseries_dir = base_name.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(int(metric[2])),
str(timeseries_dir))
anomaly_data = '%s/%s.json' % (training_dir, base_name)
# @added 20230429 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Use downsampled_timeseries data
downsampled_anomaly_data = '%s/%s.downsampled.json' % (training_dir, base_name)
use_downsampled_data = False
if os.path.isfile(downsampled_anomaly_data):
anomaly_data = downsampled_anomaly_data
logger.info('using downsampled data for snab on %s' % (
base_name))
use_downsampled_data = True
check_full_duration = (int(alert[3]) * 60 * 60)
# TODO:
# Due to how the matrixprofile identifies
# discords, if a metric triggers as
# anomalous with 3sigma it must be checked
# for at x window periods thereafter as
# matrixprofile may only identify a discord
# later when the time series changes again.
for app in SNAB_CHECKS:
if app == skyline_app and check_full_duration:
for snab_context in SNAB_CHECKS[app]:
if snab_context == 'testing':
for algorithm in SNAB_CHECKS[app][snab_context]:
try:
alert_slack_channel = SNAB_CHECKS[app][snab_context][algorithm]['alert_slack_channel']
except:
alert_slack_channel = None
try:
algorithm_source = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_source']
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to verify algorithm_source for %s for the %s context' % (
algorithm, snab_context))
break
if not os.path.isfile(algorithm_source):
logger.error('error :: algorithm_source file %s does not exist for %s for the %s context' % (
algorithm_source, algorithm, snab_context))
break
try:
algorithm_parameters = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_parameters']
except:
algorithm_parameters = {}
try:
max_execution_time = SNAB_CHECKS[app][snab_context][algorithm]['max_execution_time']
except:
max_execution_time = 1.0
try:
algo_debug_logging = SNAB_CHECKS[app][snab_context][algorithm]['debug_logging']
except:
algo_debug_logging = False
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
# if namespace in base_name:
if namespace in use_base_name:
algorithm_parameters['metric'] = use_base_name
# @added 20230429 - Feature #4848: mirage - analyse.irregular.unstable.timeseries.at.30days
# Use downsampled_timeseries data
if use_downsampled_data:
algorithm_parameters['anomaly_data'] = anomaly_data
snab_check_details = {
# 'metric': base_name,
'metric': use_base_name,
'labelled_metric_name': labelled_metric_name,
'timestamp': int(metric[2]),
'original_anomaly_timestamp': int(metric[2]),
'value': metric[0],
'full_duration': check_full_duration,
'anomaly_data': anomaly_data,
'source': 'mirage',
'added_at': added_to_snab_at,
'original_added_at': added_to_snab_at,
'context': snab_context,
'algorithm': algorithm,
'algorithm_source': algorithm_source,
'algorithm_parameters': algorithm_parameters,
'max_execution_time': max_execution_time,
'debug_logging': algo_debug_logging,
'alert_slack_channel': alert_slack_channel,
'processed': None,
'analysed': None,
'anomalous': None,
'anomalyScore': None,
'snab_only': False,
}
add_snab_check = True
if base_name in snab_checks_sent:
add_snab_check = False
break
if add_snab_check:
self.redis_conn.sadd('snab.work', str(snab_check_details))
logger.info('added snab check for %s with algorithm %s for alerter %s' % (
base_name, algorithm, str(alert[1])))
snab_checks_sent.append(base_name)
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check and add check_details to snab.work Redis set if required')
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere'
# @added 20200905 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere'
literal_waterfall_alerts = []
try:
literal_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_waterfall_alerts = []
waterfall_alerts = []
for literal_waterfall_alert in literal_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
waterfall_alerts.append(waterfall_alert)
for waterfall_alert in waterfall_alerts:
if waterfall_alert[0] == metric:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# TODO - Add anomaly to Panorama when a waterfall alert triggers
else:
logger.info('alert Redis key %s exists not alerting for %s' % (str(cache_key), metric[1]))
# @added 20230430 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
# TODO add to list and remove all in one as an alert may not exist for another alerter???
# If a redis alert key exists, handle removing the entry from
# mirage.anomalous_metrics due to the fact that if labelled_metrics
# can add up and fail to be removed if the alert is sent but mirage
# is restart BEFORE the mirage.anomalous_metrics set is deleted.
# When this occurs Mirage continues to do all the loop work on these
# and from that point they never get removed because the alert has been
# sent and the loops continue. This ensures that if an anomalous metric has
# been alerted on, it is removed.
try:
removed_a_item = self.redis_conn.srem('mirage.anomalous_metrics', str(metric))
if removed_a_item:
logger.info('removed item from mirage.anomalous_metrics Redis set - %s' % (
str(metric)))
else:
logger.warning('warning :: could not remove item from mirage.anomalous_metrics Redis set - %s' % (
str(metric)))
except Exception as err:
logger.error('error :: failed to srem item from mirage.anomalous_metrics %s - %s' % (
str(metric), err))
except Exception as e:
logger.error('error :: could not query Redis for cache_key - %s' % e)
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
snab_only_checks = []
if SNAB_ENABLED:
try:
literal_mirage_snab_only_checks = list(self.redis_conn_decoded.smembers(mirage_snab_only_checks_redis_set))
for snab_only_check_string in literal_mirage_snab_only_checks:
snab_only_check = literal_eval(snab_only_check_string)
snab_only_checks.append(snab_only_check)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine list from mirage.not_anomalous_metrics Redis set')
snab_only_checks = []
if not snab_only_checks:
logger.info('there are no snab_only_checks')
if snab_only_checks:
snab_only_checks_sent = []
for alert in all_alerts:
for snab_check in snab_only_checks:
try:
metric = snab_check['metric']
anomaly_data = snab_check['anomaly_data']
anomaly_timestamp = snab_check['timestamp']
original_anomaly_timestamp = snab_check['original_anomaly_timestamp']
value = snab_check['value']
original_added_at = snab_check['original_added_at']
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to details from snab_only_checks snab_check dict - %s' % str(snab_check))
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, metric, [alert[0]])
if LOCAL_DEBUG and pattern_match:
logger.debug('debug :: %s matched alert - %s' % (base_name, alert[0]))
try:
del metric_matched_by
except:
pass
except:
pattern_match = False
if pattern_match:
check_full_duration = (int(alert[3]) * 60 * 60)
for app in SNAB_CHECKS:
if app == skyline_app and check_full_duration:
for snab_context in SNAB_CHECKS[app]:
if snab_context == 'testing':
for algorithm in SNAB_CHECKS[app][snab_context]:
try:
alert_slack_channel = SNAB_CHECKS[app][snab_context][algorithm]['alert_slack_channel']
except:
alert_slack_channel = None
try:
algorithm_source = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_source']
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to verify algorithm_source for %s for the %s context' % (
algorithm, snab_context))
break
if not os.path.isfile(algorithm_source):
logger.error('error :: algorithm_source file %s does not exist for %s for the %s context' % (
algorithm_source, algorithm, snab_context))
break
try:
algorithm_parameters = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_parameters']
except:
algorithm_parameters = {}
try:
max_execution_time = SNAB_CHECKS[app][snab_context][algorithm]['max_execution_time']
except:
max_execution_time = 1.0
try:
algo_debug_logging = SNAB_CHECKS[app][snab_context][algorithm]['debug_logging']
except:
algo_debug_logging = False
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
if namespace in metric:
algorithm_parameters['metric'] = metric
snab_check_details = {
'metric': metric,
'timestamp': int(anomaly_timestamp),
'original_anomaly_timestamp': int(original_anomaly_timestamp),
'value': value,
'full_duration': check_full_duration,
'anomaly_data': anomaly_data,
'source': 'mirage',
'added_at': added_to_snab_at,
'original_added_at': original_added_at,
'context': snab_context,
'algorithm': algorithm,
'algorithm_source': algorithm_source,
'algorithm_parameters': algorithm_parameters,
'max_execution_time': max_execution_time,
'debug_logging': algo_debug_logging,
'alert_slack_channel': alert_slack_channel,
'processed': None,
'analysed': None,
'anomalous': None,
'anomalyScore': None,
'snab_only': True,
}
add_snab_check = True
if metric in snab_only_checks_sent:
add_snab_check = False
break
if add_snab_check:
self.redis_conn.sadd('snab.work', str(snab_check_details))
logger.info('added snab_only check for %s with algorithm %s for alerter %s' % (
metric, algorithm, str(alert[1])))
snab_only_checks_sent.append(metric)
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check and add check_details to snab.work Redis set if required')
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
mirage_not_anomalous_metrics = []
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# literal_mirage_not_anomalous_metrics = list(self.redis_conn.smembers('mirage.not_anomalous_metrics'))
literal_mirage_not_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage.not_anomalous_metrics'))
for metric_list_string in literal_mirage_not_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_not_anomalous_metrics.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine list from mirage.not_anomalous_metrics Redis set')
mirage_not_anomalous_metrics = []
if settings.NEGATE_ANALYZER_ALERTS:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# if len(self.anomalous_metrics) == 0:
if len(mirage_anomalous_metrics) == 0:
# @modified 20200610 - Feature #3560: External alert config
# for negate_alert in settings.ALERTS:
for negate_alert in all_alerts:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# for not_anomalous_metric in self.not_anomalous_metrics:
for not_anomalous_metric in mirage_not_anomalous_metrics:
NEGATE_ALERT_MATCH_PATTERN = negate_alert[0]
NOT_ANOMALOUS_METRIC_PATTERN = not_anomalous_metric[1]
alert_match_pattern = re.compile(NEGATE_ALERT_MATCH_PATTERN)
negate_pattern_match = alert_match_pattern.match(NOT_ANOMALOUS_METRIC_PATTERN)
if negate_pattern_match:
try:
logger.info('negate alert sent: For %s' % (not_anomalous_metric[1]))
trigger_negater(negate_alert, not_anomalous_metric, second_order_resolution_seconds, metric_value)
except Exception as e:
logger.error('error :: could not send alert: %s' % e)
# @added 20220316 - Feature #4482: Test alerts
# Remove test_alert key if metric has been done and anything went wrong
# so that mirage does not loop continuosly on the metric failing
if test_alerts:
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_metric = test_alerts[test_alert_timestamp]['metric']
alert_tested_key = 'mirage.test_alerts.done.%s' % test_metric
alert_tested = None
try:
alert_tested = self.redis_conn_decoded.get(alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s - %s' % (
alert_tested_key, err))
if not alert_tested and '_tenant_id="' in test_metric:
use_metric = None
try:
metric_id = get_metric_id_from_base_name(skyline_app, test_metric)
if metric_id:
use_metric = 'labelled_metrics.%s' % str(metric_id)
except Exception as err:
logger.error('error :: get_base_name_from_labelled_metrics_name failed for test alert metric %s - %s' % (
test_metric, err))
if use_metric:
alert_tested_key = 'mirage.test_alerts.done.%s' % use_metric
try:
alert_tested = self.redis_conn_decoded.get(alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s - %s' % (
alert_tested_key, err))
if alert_tested:
test_alert_redis_key = '%s.test_alerts' % skyline_app
self.redis_conn_decoded.hdel(test_alert_redis_key, test_alert_timestamp)
logger.info('removed test_alert for %s from Redis key %s' % (
str(test_metric), str(test_alert_redis_key)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove test_alert for %s from Redis key %s - %s' % (
str(test_metric), str(test_alert_redis_key), err))
# Log progress
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# if len(self.anomalous_metrics) > 0:
if len(mirage_anomalous_metrics) > 0:
try:
logger.info('seconds since last anomaly :: %.2f' % (time() - now))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# logger.info('total anomalies :: %d' % len(self.anomalous_metrics))
logger.info('total anomalies :: %d' % len(mirage_anomalous_metrics))
logger.info('exception stats :: %s' % str(exceptions))
logger.info('anomaly breakdown :: %s' % str(anomaly_breakdown))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to log stats - %s' % err)
# Log to Graphite
if process_metric_check_files:
run_time = time() - run_timestamp
logger.info('seconds to run :: %.2f' % run_time)
graphite_run_time = '%.2f' % run_time
send_metric_name = skyline_app_graphite_namespace + '.run_time'
send_graphite_metric(self, skyline_app, send_metric_name, graphite_run_time)
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Add the mirage metric and its EXPIRATION_TIME to
# the mirage.metrics_expiration_times so that Mirage
# can determine the metric EXPIRATION_TIME without
# having to create and iterate the all_alerts
# object in the Mirage analysis phase so that the
# reported anomaly timestamp can be used to determine
# whether the EXPIRATION_TIME should be applied to a
# batch metric in the alerting and Ionosphere contexts
# @modified 20201107 - Feature #3830: metrics_manager
# Use metrics_manager data, now managed there
# mirage_metrics_expiration_times = []
# try:
# mirage_metrics_expiration_times = list(self.redis_conn_decoded.smembers('mirage.metrics_expiration_times'))
# if LOCAL_DEBUG:
# logger.info('debug :: fetched the mirage.metrics_expiration_times Redis set')
# except:
# logger.info('failed to fetch the mirage.metrics_expiration_times Redis set')
# mirage_metrics_expiration_times = []
# try:
# mirage_unique_metrics = list(self.redis_conn_decoded.smembers('mirage.unique_metrics'))
# mirage_unique_metrics_count = len(mirage_unique_metrics)
# logger.info('mirage.unique_metrics Redis set count - %s' % str(mirage_unique_metrics_count))
# if LOCAL_DEBUG:
# logger.info('debug :: fetched the mirage.unique_metrics Redis set')
# logger.info('debug :: %s' % str(mirage_unique_metrics))
# except:
# logger.info('failed to fetch the mirage.unique_metrics Redis set')
# mirage_unique_metrics == []
# for metric in mirage_unique_metrics:
# if metric.startswith(settings.FULL_NAMESPACE):
# base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
# else:
# base_name = metric
# mirage_alert_expiration_data = [base_name, int(alert[2])]
# if str(mirage_alert_expiration_data) not in mirage_metrics_expiration_times:
# try:
# self.redis_conn.sadd('mirage.metrics_expiration_times', str(mirage_alert_expiration_data))
# if LOCAL_DEBUG:
# logger.info('debug :: added %s to mirage.metrics_expiration_times' % str(mirage_alert_expiration_data))
# except:
# if LOCAL_DEBUG:
# logger.error('error :: failed to add %s to mirage.metrics_expiration_times set' % str(mirage_alert_expiration_data))
# Reset counters
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets instead of Manager().list()
# self.anomalous_metrics[:] = []
# self.not_anomalous_metrics[:] = []
# Reset metric_variables
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets instead of Manager().list()
# self.metric_variables[:] = []
# self.sent_to_crucible[:] = []
# self.sent_to_panorama[:] = []
# self.sent_to_ionosphere[:] = []
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets instead of Manager().list()
delete_redis_sets = [
'mirage.anomalous_metrics',
'mirage.not_anomalous_metrics',
'mirage.metric_variables',
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Handle once per minute
# 'mirage.sent_to_crucible',
# 'mirage.sent_to_panorama',
# 'mirage.sent_to_ionosphere',
]
for i_redis_set in delete_redis_sets:
redis_set_to_delete = i_redis_set
try:
self.redis_conn.delete(redis_set_to_delete)
logger.info('deleted Redis set - %s' % redis_set_to_delete)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % redis_set_to_delete)
# DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
if settings.ENABLE_DEBUG or LOCAL_DEBUG:
for i in get_objects():
after[type(i)] += 1
gc_results = [(k, after[k] - before[k]) for k in after if after[k] - before[k]]
for gc_result in gc_results:
logger.info('debug :: %s' % str(gc_result))
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
# show the dirt ;-)
try:
logger.info('garbage collecting')
all_the_garbage = str(self.dump_garbage())
except:
logger.error('error :: during garbage collecting')
logger.error(traceback.format_exc())
all_the_garbage = 'gc errored'
if settings.ENABLE_DEBUG or LOCAL_DEBUG:
logger.info(all_the_garbage)
logger.info('garbage collected')
if LOCAL_DEBUG:
logger.info('debug :: Memory usage end of run: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
# Send checks.stale_discarded and checks.pending metrics
if int(time()) >= (last_sent_to_graphite + 60):
stale_check_discarded = []
try:
stale_check_discarded = list(self.redis_conn_decoded.smembers('mirage.stale_check_discarded'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get mirage.stale_check_discarded set from Redis')
stale_check_discarded = []
stale_check_discarded_count = len(stale_check_discarded)
logger.info('checks.stale_discarded :: %s' % str(stale_check_discarded_count))
send_metric_name = '%s.checks.stale_discarded' % skyline_app_graphite_namespace
send_graphite_metric(self, skyline_app, send_metric_name, str(stale_check_discarded_count))
checks_pending = [f_pending for f_pending in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f_pending))]
checks_pending_count = len(checks_pending)
logger.info('checks.pending :: %s' % str(checks_pending_count))
send_metric_name = '%s.checks.pending' % skyline_app_graphite_namespace
send_graphite_metric(self, skyline_app, send_metric_name, str(checks_pending_count))
# @modified 20210309 - Task #3730: Validate Mirage running multiple processes
# Reimplement mirage.checks.done count as increment key
# checks_done = []
# try:
# checks_done = list(self.redis_conn_decoded.smembers('mirage.checks.done'))
checks_done = 0
try:
# @modified 20230205 - Task #4844: Replace Redis getset with set with get
# As of Redis version 6.2.0, this command is regarded as deprecated.
# It can be replaced by SET with the GET argument when migrating or writing new code.
# checks_done_str = self.redis_conn_decoded.getset('mirage.checks.done', 0)
checks_done_str = self.redis_conn_decoded.set('mirage.checks.done', 0, get=True)
if checks_done_str:
checks_done = int(checks_done_str)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get mirage.checks.done key from Redis')
checks_done = 0
# checks_done_count = len(checks_done)
# logger.info('checks.done :: %s' % str(checks_done_count))
logger.info('checks.done :: %s' % str(checks_done))
send_metric_name = '%s.checks.done' % skyline_app_graphite_namespace
# send_graphite_metric(self, skyline_app, send_metric_name, str(checks_done_count))
send_graphite_metric(self, skyline_app, send_metric_name, str(checks_done))
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Only send panorama, ionosphere and crucible metrics once a minute
if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
try:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# sent_to_crucible = str(len(self.sent_to_crucible))#
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# sent_to_crucible = str(len(list(self.redis_conn.smembers('mirage.sent_to_crucible'))))
sent_to_crucible = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_crucible'))))
except:
sent_to_crucible = '0'
logger.info('sent_to_crucible :: %s' % sent_to_crucible)
send_metric_name = '%s.sent_to_crucible' % skyline_app_graphite_namespace
send_graphite_metric(self, skyline_app, send_metric_name, sent_to_crucible)
if settings.PANORAMA_ENABLED:
try:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# sent_to_panorama = str(len(self.sent_to_panorama))
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# sent_to_panorama = str(len(list(self.redis_conn.smembers('mirage.sent_to_panorama'))))
sent_to_panorama = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_panorama'))))
except:
sent_to_panorama = '0'
logger.info('sent_to_panorama :: %s' % sent_to_panorama)
send_metric_name = '%s.sent_to_panorama' % skyline_app_graphite_namespace
send_graphite_metric(self, skyline_app, send_metric_name, sent_to_panorama)
if settings.IONOSPHERE_ENABLED:
try:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# sent_to_ionosphere = str(len(self.sent_to_ionosphere))
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# sent_to_ionosphere = str(len(list(self.redis_conn.smembers('mirage.sent_to_ionosphere'))))
sent_to_ionosphere = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_ionosphere'))))
except Exception as e:
logger.error('error :: could not determine sent_to_ionosphere: %s' % e)
sent_to_ionosphere = '0'
logger.info('sent_to_ionosphere :: %s' % sent_to_ionosphere)
send_metric_name = '%s.sent_to_ionosphere' % skyline_app_graphite_namespace
send_graphite_metric(self, skyline_app, send_metric_name, sent_to_ionosphere)
last_sent_to_graphite = int(time())
delete_redis_sets = [
'mirage.sent_to_crucible',
'mirage.sent_to_panorama',
'mirage.sent_to_ionosphere',
'mirage.stale_check_discarded',
# @modified 20210309 - Task #3730: Validate Mirage running multiple processes
# Reimplement mirage.checks.done count as increment key
# 'mirage.checks.done',
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
mirage_snab_only_checks_redis_set,
]
for i_redis_set in delete_redis_sets:
redis_set_to_delete = i_redis_set
try:
self.redis_conn.delete(redis_set_to_delete)
logger.info('deleted Redis set - %s' % redis_set_to_delete)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % redis_set_to_delete)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Refresh
try:
filesafe_names_dict = self.redis_conn_decoded.hgetall('metrics_manager.filesafe_base_names')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: hgetall metrics_manager.filesafe_base_names failed - %s' % err)
# @added 20230609 - Task #4806: Manage NUMBA_CACHE_DIR
# Feature #4702: numba optimisations
# Use start up key and allow numba cache files to be created
if starting:
try:
self.redis_conn.delete(start_key)
except Exception as err:
logger.error('error :: delete failed on Redis %s key - %s' % (
start_key, err))
# Sleep if it went too fast
if time() - now < 1:
logger.info('sleeping due to low run time...')
# sleep(10)
sleep(1)