"""
mirage.py
"""
import logging
try:
from Queue import Empty
except:
from queue import Empty
# from redis import StrictRedis
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list() to reduce memory and number of
# processes
# from multiprocessing import Process, Manager, Queue
from multiprocessing import Process, Queue
from os import kill, getpid
import traceback
import re
# imports required for surfacing graphite JSON formatted timeseries for use in
# Mirage
import json
import sys
import os
# import errno
# import imp
from os import listdir
import datetime
# import os.path
import resource
from shutil import rmtree
from ast import literal_eval
from os.path import join, isfile
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Added Unpacker
from msgpack import Unpacker, packb
import requests
try:
import urlparse
except ImportError:
# @modified 20191113 - Branch #3262: py3
# import urllib.parse
import urllib.parse as urlparse
import settings
# @modified 20160922 - Branch #922: Ionosphere
# Added the send_anomalous_metric_to skyline_functions.py
from skyline_functions import (
write_data_to_file, fail_check, send_anomalous_metric_to,
mkdir_p, send_graphite_metric, filesafe_metricname,
# @added 20170603 - Feature #2034: analyse_derivatives
nonNegativeDerivative, in_list,
# @added 20191030 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
get_redis_conn, get_redis_conn_decoded,
# @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
# Bug #3778: Handle single encoded forward slash requests to Graphite
# sanitise_graphite_url,
# @added 20201013 - Feature #3780: skyline_functions - sanitise_graphite_url
encode_graphite_metric_name,
# @added 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
get_graphite_metric,
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
sort_timeseries,
)
# @added 20200425 - Feature #3512: matched_or_regexed_in_list function
# Feature #3508: ionosphere.untrainable_metrics
# Feature #3486: analyzer_batch
from matched_or_regexed_in_list import matched_or_regexed_in_list
from mirage_alerters import trigger_alert
from negaters import trigger_negater
from mirage_algorithms import run_selected_algorithm
from algorithm_exceptions import TooShort, Stale, Boring
# @added 20220315 - Feature #4482: Test alerts
from functions.redis.get_test_alerts import get_test_alerts
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.timeseries.downsample import downsample_timeseries
from functions.mirage.downsample_full_duration_and_merge_graphite import downsample_full_duration_and_merge_graphite
# @added 20220504 - Feature #2580: illuminance
from functions.illuminance.add_illuminance_entries import add_illuminance_entries
"""
ENABLE_MEMORY_PROFILING - DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Added all the memory profiling blocks - mem_top, pympler, objgraph, gc
Garbage collection et al, should not be run in anything but development model,
therefore these variables are hard coded and not accessible via settings.py,
if you are in here reading this then knock yourself out. gc and dump_garbage
can be useful for getting an idea about what all the objects in play are, but
garbage collection will just take longer and longer to run.
"""
# Development-only switches, deliberately hard coded and not settings driven
LOCAL_DEBUG = False
ENABLE_MEMORY_PROFILING = False
garbage_collection_enabled = False

if ENABLE_MEMORY_PROFILING:
    # @added 20160806 - Bug #1558: Memory leak in Analyzer
    # As per http://stackoverflow.com/a/1641280
    # This got useable understandable data
    if garbage_collection_enabled:
        from gc import get_objects
        # Debug with garbage collection - http://code.activestate.com/recipes/65333/
        import gc

# App identity, logging paths and lock/wait sentinel file names
skyline_app = 'mirage'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

python_version = int(sys.version_info[0])
this_host = str(os.uname()[1])

# NOTE: every settings.* lookup below is wrapped in try/except with a safe
# default so mirage can still start against an older or minimal settings.py
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

try:
    MIRAGE_PERIODIC_CHECK = settings.MIRAGE_PERIODIC_CHECK
except:
    MIRAGE_PERIODIC_CHECK = False

# @added 20200413 - Feature #3486: analyzer_batch
#                   Feature #3480: batch_processing
try:
    from settings import BATCH_PROCESSING
except:
    BATCH_PROCESSING = None
try:
    # @modified 20200606 - Bug #3572: Apply list to settings import
    BATCH_PROCESSING_NAMESPACES = list(settings.BATCH_PROCESSING_NAMESPACES)
except:
    BATCH_PROCESSING_NAMESPACES = []

# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrics have negative values so they can be
# added to the ionosphere.untrainable_metrics Redis set
try:
    # @modified 20200606 - Bug #3572: Apply list to settings import
    KNOWN_NEGATIVE_METRICS = list(settings.KNOWN_NEGATIVE_METRICS)
except:
    KNOWN_NEGATIVE_METRICS = []

# @added 20200604 - Mirage - populate_redis
try:
    from settings import MIRAGE_AUTOFILL_TOOSHORT
except:
    MIRAGE_AUTOFILL_TOOSHORT = False

# @added 20200607 - Feature #3566: custom_algorithms
try:
    CUSTOM_ALGORITHMS = settings.CUSTOM_ALGORITHMS
except:
    CUSTOM_ALGORITHMS = None
try:
    DEBUG_CUSTOM_ALGORITHMS = settings.DEBUG_CUSTOM_ALGORITHMS
except:
    DEBUG_CUSTOM_ALGORITHMS = False

# @added 20200723 - Feature #3472: ionosphere.training_data Redis set
#                   Feature #3566: custom_algorithms
try:
    MIRAGE_ALWAYS_METRICS = list(settings.MIRAGE_ALWAYS_METRICS)
except:
    MIRAGE_ALWAYS_METRICS = []

# @added 20200610 - Feature #3560: External alert config
try:
    EXTERNAL_ALERTS = settings.EXTERNAL_ALERTS
except:
    EXTERNAL_ALERTS = {}
# Only import the external alert config helper when it will be used
if EXTERNAL_ALERTS:
    from external_alert_configs import get_external_alert_configs

# @added 20200913 - Branch #3068: SNAB
#                   Task #3744: POC matrixprofile
#                   Info #1792: Shapelet extraction
try:
    SNAB_ENABLED = settings.SNAB_ENABLED
except:
    SNAB_ENABLED = False
try:
    SNAB_CHECKS = settings.SNAB_CHECKS.copy()
except:
    SNAB_CHECKS = {}

# @added 20200916 - Branch #3068: SNAB
#                   Task #3744: POC matrixprofile
mirage_snab_only_checks_redis_set = 'mirage.snab_only_checks'

# @added 20201026 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle feedback metrics in a similar style to Ionosphere
try:
    SKYLINE_FEEDBACK_NAMESPACES = list(settings.SKYLINE_FEEDBACK_NAMESPACES)
except:
    # Let us take a guess - fall back to the Graphite hostname, then to
    # this host's name
    try:
        graphite_host = str(settings.GRAPHITE_HOST)
        graphite_hostname = graphite_host.split('.', -1)[0]
        SKYLINE_FEEDBACK_NAMESPACES = [settings.SERVER_METRICS_NAME, graphite_hostname]
    except:
        SKYLINE_FEEDBACK_NAMESPACES = [this_host]

# @added 20210701 - Feature #4152: DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES
try:
    DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES = list(settings.DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES)
except:
    DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES = []

# @added 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# @modified 20220414 - not introduced as a setting, making this the default
# behaviour
MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS = True

# @added 20210323 - Feature #3642: Anomaly type classification
try:
    LUMINOSITY_CLASSIFY_ANOMALIES = settings.LUMINOSITY_CLASSIFY_ANOMALIES
except:
    LUMINOSITY_CLASSIFY_ANOMALIES = False

# Graphite namespace for mirage's own metrics and the directory failed
# checks are moved to
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
failed_checks_dir = '%s_failed' % settings.MIRAGE_CHECK_PATH
# @added 20191107 - Branch #3262: py3
alert_test_file = '%s/%s_alert_test.txt' % (settings.SKYLINE_TMP_DIR, skyline_app)
[docs]class Mirage(Thread):
"""
The Mirage thread
"""
def __init__(self, parent_pid):
"""
Initialize the Mirage
"""
super(Mirage, self).__init__()
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Task #3032: Debug number of Python processes and memory use
# Branch #3002: docker
# Reduce amount of Manager instances that are used as each requires a
# copy of entire memory to be copied into each subprocess so this
# results in a python process per Manager instance, using as much
# memory as the parent. OK on a server, not so much in a container.
# Disabled all the Manager().list() below and replaced with Redis sets
# self.anomalous_metrics = Manager().list()
self.mirage_exceptions_q = Queue()
self.mirage_anomaly_breakdown_q = Queue()
# self.not_anomalous_metrics = Manager().list()
# self.metric_variables = Manager().list()
# self.ionosphere_metrics = Manager().list()
# self.sent_to_crucible = Manager().list()
# self.sent_to_panorama = Manager().list()
# self.sent_to_ionosphere = Manager().list()
# @added 20170603 - Feature #2034: analyse_derivatives
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
# @modified 20191030 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Use get_redis_conn and get_redis_conn_decoded to use on Redis sets when the bytes
# types need to be decoded as utf-8 to str
# if settings.REDIS_PASSWORD:
# self.redis_conn = StrictRedis(
# password=settings.REDIS_PASSWORD,
# unix_socket_path=settings.REDIS_SOCKET_PATH)
# else:
# self.redis_conn = StrictRedis(
# unix_socket_path=settings.REDIS_SOCKET_PATH)
# @added 20191030 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
# @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
# Log warning
logger.warning('warning :: parent or current process dead')
sys.exit(0)
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
    def spawn_alerter_process(self, alert, metric, second_order_resolution_seconds, context, triggered_algorithms):
        """
        Spawn a process to trigger an alert.

        This is used by smtp alerters so that matplotlib objects are cleared
        down and the alerter cannot create a memory leak in this manner as
        plt.savefig keeps the object in memory until the process terminates.
        Seeing as data is being surfaced and processed in the alert_smtp
        context, multiprocessing the alert creation and handling prevents
        any memory leaks in the parent.

        :param alert: the alert tuple from the alert config
        :param metric: the metric tuple (value, name, timestamp)
        :param second_order_resolution_seconds: the metric SECOND_ORDER_RESOLUTION_HOURS in seconds
        :param context: the app context in which the alert is triggered
        :param triggered_algorithms: the algorithms that triggered
        # @added 20160814 - Bug #1558: Memory leak in Analyzer
        # Issue #21 Memory leak in Analyzer
        # https://github.com/earthgecko/skyline/issues/21
        """
        # @modified 20210304 - Feature #3642: Anomaly type classification
        # Feature #3970: custom_algorithm - adtk_level_shift
        # Added triggered_algorithms
        trigger_alert(alert, metric, second_order_resolution_seconds, context, triggered_algorithms)
# @modified 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until):
# @modified 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Deprecating self.surface_graphite_metric_data in all mirage functions
# (EXCEPT for in populate_redis) so that the same function can be used
# for all graphite requests. get_graphite_metric does the derivative,
# zero_fill and last_known_value functions so no longer required here.
# @modified 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Removed high_res parameter and will no longer be used not that the
# FULL_DURATION data is being backwards resampled
# def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until, high_res=False):
[docs] def surface_graphite_metric_data(self, metric_name, graphite_from, graphite_until):
# @added 20160803 - Unescaped Graphite target - https://github.com/earthgecko/skyline/issues/20
# bug1546: Unescaped Graphite target
# @modified 20191107 - Branch #3263: py3
# Commented out colon
# new_metric_namespace = metric_name.replace(':', '\:')
# metric_namespace = new_metric_namespace.replace('(', '\(')
metric_namespace = metric_name.replace('(', '\(')
metric_name = metric_namespace.replace(')', '\)')
# @added 20201013 - Feature #3780: skyline_functions - sanitise_graphite_url
encoded_graphite_metric_name = encode_graphite_metric_name(skyline_app, metric_name)
try:
# We use absolute time so that if there is a lag in mirage the correct
# timeseries data is still surfaced relevant to the anomalous datapoint
# timestamp
if settings.GRAPHITE_PORT != '':
# @modified 20190520 - Branch #3002: docker
# Use GRAPHITE_RENDER_URI
# url = '%s://%s:%s/render/?from=%s&until=%s&target=%s&format=json' % (
# settings.GRAPHITE_PROTOCOL, settings.GRAPHITE_HOST,
# str(settings.GRAPHITE_PORT), graphite_from, graphite_until,
# metric_name)
url = '%s://%s:%s/%s/?from=%s&until=%s&target=%s&format=json' % (
settings.GRAPHITE_PROTOCOL, settings.GRAPHITE_HOST,
str(settings.GRAPHITE_PORT), settings.GRAPHITE_RENDER_URI,
# @modified 20201013 - Feature #3780: skyline_functions - sanitise_graphite_url
# graphite_from, graphite_until, metric_name)
graphite_from, graphite_until, encoded_graphite_metric_name)
else:
# @modified 20190520 - Branch #3002: docker
# Use GRAPHITE_RENDER_URI
# url = '%s://%s/render/?from=%s&until=%s&target=%s&format=json' % (
# settings.GRAPHITE_PROTOCOL, settings.GRAPHITE_HOST,
# graphite_from, graphite_until, metric_name)
url = '%s://%s/%s/?from=%s&until=%s&target=%s&format=json' % (
settings.GRAPHITE_PROTOCOL, settings.GRAPHITE_HOST,
settings.GRAPHITE_RENDER_URI, graphite_from, graphite_until,
# @modified 20201013 - Feature #3780: skyline_functions - sanitise_graphite_url
# metric_name)
encoded_graphite_metric_name)
r = requests.get(url)
js = r.json()
datapoints = js[0]['datapoints']
except:
logger.error(traceback.format_exc())
logger.error('error :: surface_graphite_metric_data :: failed to get data from Graphite')
return False
try:
converted = []
for datapoint in datapoints:
try:
new_datapoint = [float(datapoint[1]), float(datapoint[0])]
converted.append(new_datapoint)
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
except: # nosec
continue
parsed = urlparse.urlparse(url)
target = urlparse.parse_qs(parsed.query)['target'][0]
metric_data_folder = str(settings.MIRAGE_DATA_FOLDER) + "/" + target
mkdir_p(metric_data_folder)
# @added 20201208 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# @modified 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# No longer used
# if high_res:
# with open(metric_data_folder + "/" + target + '.high_res', 'w') as f:
# f.write(json.dumps(converted))
# f.close()
# return True
with open(metric_data_folder + "/" + target + '.json', 'w') as f:
f.write(json.dumps(converted))
f.close()
return True
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: surface_graphite_metric_data :: failed to convert Graphite data and write to file - %s' % err)
return False
# @added 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Bug #1460: panorama check file fails
# Panorama check file fails #24
# Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
[docs] def mirage_load_metric_vars(self, metric_vars_file):
"""
Load the metric variables for a check from a metric check variables file
:param metric_vars_file: the path and filename to the metric variables files
:type metric_vars_file: str
:return: the metric_vars list or ``False``
:rtype: list
"""
if os.path.isfile(metric_vars_file):
logger.info(
'loading metric variables from metric_check_file - %s' % (
str(metric_vars_file)))
else:
logger.error(
'error :: loading metric variables from metric_check_file - file not found - %s' % (
str(metric_vars_file)))
return False
metric_vars = []
with open(metric_vars_file) as f:
for line in f:
no_new_line = line.replace('\n', '')
no_equal_line = no_new_line.replace(' = ', ',')
array = str(no_equal_line.split(',', 1))
add_line = literal_eval(array)
metric_vars.append(add_line)
# @added 20200429 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Allow the check file to already hold a valid python list on one line
# so that a check can be added by simply echoing to debug metric_vars
# line from to log for any failed checks into a new Mirage check file
# The original above pattern is still the default, this is for the check
# files to be added by the operator from the log or for debugging.
try_literal_eval = False
if metric_vars:
if isinstance(metric_vars, list):
pass
else:
try_literal_eval = True
logger.info('metric_vars is not a list, set to try_literal_eval')
if len(metric_vars) < 2:
try_literal_eval = True
logger.info('metric_vars is not a list of lists, set to try_literal_eval')
else:
try_literal_eval = True
logger.info('metric_vars is not defined, set to try_literal_eval')
if try_literal_eval:
try:
with open(metric_vars_file) as f:
for line in f:
metric_vars = literal_eval(line)
if metric_vars:
break
except:
logger.error(traceback.format_exc())
logger.error('metric_vars not loaded with literal_eval')
metric_vars = []
string_keys = ['metric']
float_keys = ['value']
int_keys = ['hours_to_resolve', 'metric_timestamp']
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
boolean_keys = ['snab_only_check']
# @added 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms to mirage_check_file
list_keys = ['triggered_algorithms']
metric_vars_array = []
for var_array in metric_vars:
# @modified 20181023 - Feature #2618: alert_slack
# Wrapped in try except for debugging issue where the
# hours_to_resolve was interpolating to hours_to_resolve = "t"
try:
key = None
value = None
if var_array[0] in string_keys:
key = var_array[0]
_value_str = str(var_array[1]).replace("'", '')
value_str = str(_value_str).replace('"', '')
value = str(value_str)
if var_array[0] == 'metric':
metric = value
if var_array[0] in float_keys:
key = var_array[0]
_value_str = str(var_array[1]).replace("'", '')
value_str = str(_value_str).replace('"', '')
value = float(value_str)
if var_array[0] in int_keys:
key = var_array[0]
_value_str = str(var_array[1]).replace("'", '')
value_str = str(_value_str).replace('"', '')
value = int(float(value_str))
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
# Handle new snab_only_check boolean
if var_array[0] in boolean_keys:
key = var_array[0]
logger.debug(
'debug :: boolean key - key: %s, value: %s' % (
str(var_array[0]), str(var_array[1])))
if str(var_array[1]) == '"True"':
value = True
else:
value = False
# @added 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms to mirage_check_file
if var_array[0] in list_keys:
key = var_array[0]
logger.debug(
'debug :: list key - key: %s, value: %s' % (
str(var_array[0]), str(var_array[1])))
_value_str = str(var_array[1]).replace("'", '')
try:
value = literal_eval(var_array[1])
except Exception as e:
logger.error(
'error :: loading metric variables - failed to literal_eval list for %s, %s - %s' % (
str(key), str(var_array[1]), e))
value = []
if key:
metric_vars_array.append([key, value])
if len(metric_vars_array) == 0:
logger.error(
'error :: loading metric variables - none found from %s' % (
str(metric_vars_file)))
return False
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to load metric variables from check file - %s' % (metric_vars_file))
return False
logger.info('debug :: metric_vars for %s' % str(metric))
logger.info('debug :: %s' % str(metric_vars_array))
return metric_vars_array
    def dump_garbage(self):
        """
        DEVELOPMENT ONLY

        Force a gc collection and log what the garbage is about.

        # @added 20160806 - Bug #1558: Memory leak in Analyzer
        # Debug with garbage collection - http://code.activestate.com/recipes/65333/

        :return: ``None`` when memory profiling / garbage collection is not
            enabled, otherwise results are logged only
        """
        # Both dev-only module switches must be on, otherwise this is a no-op
        # (gc is only imported at module level when they are enabled)
        if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
            # force collection
            if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                logger.info('debug :: GARBAGE')
            try:
                gc.collect()
                gc_collect_ok = True
            except:
                logger.error('error :: gc.collect failed')
                logger.error(traceback.format_exc())
                gc_collect_ok = False
            if gc_collect_ok:
                if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                    logger.info('debug :: GARBAGE OBJECTS')
                # Log a truncated repr of each uncollectable object
                for x in gc.garbage:
                    s = str(x)
                    if len(s) > 80:
                        s = s[:80]
                    # print type(x), "\n ", s
                    try:
                        log_string = type(x), "\n ", s
                        # log_string is immediately overwritten - the tuple
                        # above is only built to prove x and s are printable
                        log_string = 'unused variable for testing only'
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: print x and s')
                    if settings.ENABLE_DEBUG or LOCAL_DEBUG:
                        logger.info(log_string)
        else:
            return None
# @added 20200604 - Mirage - populate_redis
    def populate_redis(self, i, metric):
        """
        Get FULL_DURATION data from Graphite for a metric and populate the
        Redis time series data for the metric so that it is analysable.

        All outcomes (success, no data, Graphite failure) set the
        mirage.redis_populated.<metric> key and remove the metric from the
        mirage.populate_redis Redis set so a metric is not retried on every
        run.

        :param i: the process number
        :param metric: the metric name
        :type metric: str
        :return: ``None`` - all results are side effects on Redis and disk
        """
        # Check if it has been done via the mirage.redis_populated key
        redis_populated = False
        redis_populated_key = 'mirage.redis_populated.%s' % metric
        try:
            redis_populated = self.redis_conn_decoded.get(redis_populated_key)
        except Exception as e:
            logger.error(
                'error :: populate_redis :: could not query cache_key - %s - %s' % (
                    redis_populated_key, e))
            redis_populated = False
        # Do not handle batch processing metrics
        batch_processing_metrics = []
        try:
            # @modified 20220113 - Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
            # Use aet.analyzer.batch_processing_metrics
            # batch_processing_metrics = list(self.redis_conn_decoded.smembers('analyzer.batch_processing_metrics'))
            batch_processing_metrics = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: populate_redis :: failed to get analyzer.batch_processing_metrics from Redis')
            batch_processing_metrics = None
        if batch_processing_metrics:
            if metric in batch_processing_metrics:
                # Mark batch processing metrics as populated so they are
                # skipped below and not retried for FULL_DURATION
                redis_populated = True
                logger.info('populate_redis :: %s is a batch processing metric, not handling, creating Redis key %s' % (
                    metric, redis_populated_key))
                try:
                    self.redis_conn.setex(redis_populated_key, settings.FULL_DURATION, int(time()))
                    logger.info('populate_redis :: created Redis key %s' % (redis_populated_key))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: populate_redis :: failed to create Redis key %s' % redis_populated_key)
        if redis_populated:
            logger.info('populate_redis :: the Redis key %s already exists, it has been done' % (redis_populated_key))
            try:
                self.redis_conn.srem('mirage.populate_redis', metric)
                logger.info('populate_redis :: removed item - %s - from Redis set mirage.populate_redis' % (metric))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: populate_redis :: failed to remove item %s from Redis set mirage.populate_redis' % metric)
            return
        time_now = int(time())
        time_from = int(time_now - settings.FULL_DURATION)
        # Calculate graphite from and until parameters from the metric timestamp
        graphite_until = datetime.datetime.fromtimestamp(int(float(time_now))).strftime('%H:%M_%Y%m%d')
        graphite_from = datetime.datetime.fromtimestamp(int(time_from)).strftime('%H:%M_%Y%m%d')
        # Remove any old json file related to the metric
        metric_data_folder = '%s/%s' % (settings.MIRAGE_DATA_FOLDER, metric)
        metric_json_file = '%s/%s.json' % (metric_data_folder, str(metric))
        try:
            os.remove(metric_json_file)
        except OSError:
            pass
        # Get data from graphite
        logger.info('populate_redis :: surfacing %s time series from Graphite' % (metric))
        try:
            # @modified 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
            # Feature #4520: settings - ZERO_FILL_NAMESPACES
            # The self.surface_graphite_metric_data is used here because the
            # get_graphite_metric function automatically applies
            # nonNegativeDerivative to a metric if it is a derivative_metric and
            # in the context of populating Redis, that is not desired
            self.surface_graphite_metric_data(metric, graphite_from, graphite_until)
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: populate_redis :: get_graphite_metric failed to surface_graphite_metric_data to populate %s' % (
                str(metric_json_file)))
        # Check there is a json timeseries file to use
        if not os.path.isfile(metric_json_file):
            logger.error(
                'error :: populate_redis :: retrieve failed - failed to surface %s time series from graphite' % (
                    metric))
            # Still mark as done so the metric is not retried every run
            try:
                self.redis_conn.setex(redis_populated_key, settings.FULL_DURATION, time_now)
                logger.info('populate_redis :: created Redis key %s' % (redis_populated_key))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: populate_redis :: failed to create Redis key %s' % redis_populated_key)
            try:
                self.redis_conn.srem('mirage.populate_redis', metric)
                logger.info('populate_redis :: removed item - %s - from Redis set mirage.populate_redis' % (metric))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: populate_redis :: failed to remove item %s from Redis set mirage.populate_redis' % metric)
            return
        logger.info('populate_redis :: retrieved data :: for %s' % (
            metric))
        self.check_if_parent_is_alive()
        # Load the surfaced timeseries from the json file
        timeseries = []
        try:
            with open((metric_json_file), 'r') as f:
                timeseries = json.loads(f.read())
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: populate_redis :: failed to get timeseries from json - %s' % metric_json_file)
            timeseries = []
        if not timeseries:
            logger.info('populate_redis :: no timeseries data for %s, setting redis_populated_key and removing from mirage.populate_redis' % metric)
            try:
                self.redis_conn.setex(redis_populated_key, settings.FULL_DURATION, time_now)
                logger.info('populate_redis :: created Redis key %s' % (redis_populated_key))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: populate_redis :: failed to create Redis key %s' % redis_populated_key)
            try:
                self.redis_conn.srem('mirage.populate_redis', metric)
                logger.info('populate_redis :: removed item - %s - from Redis set mirage.populate_redis' % (metric))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: populate_redis :: failed to remove item %s from Redis set mirage.populate_redis' % metric)
            return
        # Data is in memory now, the json file is no longer needed
        try:
            os.remove(metric_json_file)
        except OSError:
            pass
        FULL_NAMESPACE = settings.FULL_NAMESPACE
        pipe = None
        logger.info('populate_redis :: time series data for %s, populating Redis with %s data points' % (
            metric, str(len(timeseries))))
        try:
            pipe = self.redis_conn.pipeline()
        except Exception as e:
            logger.error('error :: populate_redis :: error on Redis pipe: %s' % (str(e)))
            pipe = None
        redis_populated = False
        try:
            # Append each msgpack packed datapoint to the metric's Redis
            # time series key via the pipeline
            for metric_data in timeseries:
                key = ''.join((FULL_NAMESPACE, metric))
                try:
                    pipe.append(str(key), packb(metric_data))
                except Exception as e:
                    logger.error('error :: populate_redis :: error on pipe.append: %s' % (str(e)))
            pipe.execute()
            redis_populated = True
        except Exception as e:
            logger.error('error :: populate_redis :: error on pipe.execute: %s' % (str(e)))
        if redis_populated:
            del timeseries
            try:
                self.redis_conn.setex(redis_populated_key, settings.FULL_DURATION, time_now)
                logger.info('populate_redis :: created Redis key %s' % (redis_populated_key))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: populate_redis :: failed to create Redis key %s' % redis_populated_key)
            # Add to Redis set so that Analyzer sorts and deduplicates the data
            # on the next run
            try:
                self.redis_conn.sadd('mirage.filled', metric)
                logger.info('populate_redis :: add %s to Redis set mirage.filled for Analyzer to sort and deduplicate the Redis data' % metric)
            except Exception as e:
                logger.error('error :: populate_redis :: failed add metric to Redis set mirage.filled: %s' % e)
        try:
            self.redis_conn.setex(redis_populated_key, settings.FULL_DURATION, time_now)
            logger.info('populate_redis :: created Redis key %s' % (redis_populated_key))
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: populate_redis :: failed to create Redis key %s' % redis_populated_key)
        try:
            self.redis_conn.srem('mirage.populate_redis', metric)
            logger.info('populate_redis :: removed item - %s - from Redis set mirage.populate_redis' % (metric))
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: populate_redis :: failed to remove item %s from Redis set mirage.populate_redis' % metric)
        return
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes
# def spin_process(self, i, run_timestamp):
[docs] def spin_process(self, i, run_timestamp, metric_check_filename):
"""
Assign a metric for a process to analyze.
"""
# if int(i) > 1:
# i_less_one = int(i) - 1
# sleep_for_str = '0.%s' % str(i_less_one)
# logger.info('process %s sleeping for %s' % (str(i), sleep_for_str))
# sleep(float(sleep_for_str))
# Discover metric to analyze
# metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
# Check if this process is unnecessary
# if len(metric_var_files) == 0:
# logger.info('no check files found, nothing to do')
# return
# metric_var_files_sorted = sorted(metric_var_files)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
# Ensure the process locks the check
# metric_check_filename = None
# for i_metric_check_file in metric_var_files_sorted:
# check_assigned = False
# cache_key = 'mirage.check.lock.%s' % str(i_metric_check_file)
# try:
# check_assigned = self.redis_conn.get(cache_key)
# if not check_assigned:
# try:
# self.redis_conn.setex(cache_key, 120, int(time()))
# metric_check_filename = str(i_metric_check_file)
# logger.info('assigned self check file and set Redis key - %s' % (cache_key))
# self.redis_conn.sadd('mirage.checks.done', metric_check_filename)
# break
# except:
# logger.error(traceback.format_exc())
# logger.error('error :: failed to set Redis key - %s' % cache_key)
# else:
# logger.info('already assigned, Redis key exists - %s' % (cache_key))
#
# except:
# logger.error(traceback.format_exc())
# logger.error('error :: failed to check if Redis key exists - %s' % cache_key)
if not metric_check_filename:
logger.info('no check to assign to process, nothing to do')
return
metric_check_file = '%s/%s' % (
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# settings.MIRAGE_CHECK_PATH, str(metric_var_files_sorted[0]))
settings.MIRAGE_CHECK_PATH, metric_check_filename)
check_file_name = os.path.basename(str(metric_check_file))
check_file_timestamp = check_file_name.split('.', 1)[0]
check_file_metricname_txt = check_file_name.split('.', 1)[1]
check_file_metricname = check_file_metricname_txt.replace('.txt', '')
check_file_metricname_dir = check_file_metricname.replace('.', '/')
metric_failed_check_dir = '%s/%s/%s' % (failed_checks_dir, check_file_metricname_dir, check_file_timestamp)
# Load metric variables
# @modified 20160822 - Bug #1460: panorama check file fails
# Changed to panorama style skyline_functions load_metric_vars
# self.load_metric_vars(metric_check_file)
# Load and validate metric variables
try:
# @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Bug #1460: panorama check file fails
# Panorama check file fails #24
# Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
# metric_vars = load_metric_vars(skyline_app, str(metric_check_file))
metric_vars_array = self.mirage_load_metric_vars(str(metric_check_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to load metric variables from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
metric = None
# @added 20200106 - Branch #3262: py3
# Task #3034: Reduce multiprocessing Manager list usage
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# redis_set_to_delete = 'mirage.metric_variables'
redis_metric_variables_set = 'mirage.%s.metric_variables' % str(i)
redis_set_to_delete = redis_metric_variables_set
try:
self.redis_conn.delete(redis_set_to_delete)
logger.info('deleted Redis set - %s' % redis_set_to_delete)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % redis_set_to_delete)
try:
key = 'metric'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
metric = str(value_list[0])
metric_name = ['metric_name', metric]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(metric_name)
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
redis_set = 'mirage.metric_variables'
data = str(metric_name)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
logger.info('debug :: added metric_name %s from check file - %s' % (str(metric_name), metric_check_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file))
return
if not metric:
logger.error('error :: failed to load metric variable from check file - %s' % (metric_check_file))
return
value = None
try:
key = 'value'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
value = float(value_list[0])
metric_value = ['metric_value', value]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(metric_value)
redis_set = 'mirage.metric_variables'
data = str(metric_value)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file))
return
if not value:
# @modified 20181119 - Bug #2708: Failing to load metric vars
if value == 0.0:
pass
else:
logger.error('error :: failed to load value variable from check file - %s' % (metric_check_file))
return
hours_to_resolve = None
try:
key = 'hours_to_resolve'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
hours_to_resolve = int(value_list[0])
hours_to_resolve_list = ['hours_to_resolve', hours_to_resolve]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(hours_to_resolve_list)
redis_set = 'mirage.metric_variables'
data = str(hours_to_resolve_list)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error('error :: failed to read hours_to_resolve variable from check file - %s' % (metric_check_file))
return
if not hours_to_resolve:
logger.error('error :: failed to load hours_to_resolve variable from check file - %s' % (metric_check_file))
return
metric_timestamp = None
try:
key = 'metric_timestamp'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
metric_timestamp = int(value_list[0])
metric_timestamp_list = ['metric_timestamp', metric_timestamp]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.metric_variables.append(metric_timestamp_list)
redis_set = 'mirage.metric_variables'
data = str(metric_timestamp_list)
try:
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# self.redis_conn.sadd(redis_set, data)
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
return
if not metric_timestamp:
logger.error('error :: failed to load metric_timestamp variable from check file - %s' % (metric_check_file))
return
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
snab_only_check = None
try:
key = 'snab_only_check'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
snab_only_check = value_list[0]
except:
snab_only_check = None
snab_only_check_list = ['snab_only_check', snab_only_check]
redis_set = 'mirage.metric_variables'
data = str(snab_only_check_list)
try:
self.redis_conn.sadd(redis_metric_variables_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms to mirage_check_file
try:
key = 'triggered_algorithms'
value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
triggered_algorithms = value_list[0]
except:
triggered_algorithms = []
metric_data_dir = '%s/%s' % (settings.MIRAGE_DATA_FOLDER, str(metric))
# Ignore any metric check with a timestamp greater than MIRAGE_STALE_SECONDS
int_metric_timestamp = int(metric_timestamp)
int_run_timestamp = int(run_timestamp)
metric_timestamp_age = int_run_timestamp - int_metric_timestamp
periodic_mirage_check = False
if MIRAGE_PERIODIC_CHECK:
try:
mirage_periodic_check_metrics = list(self.redis_conn_decoded.smembers('mirage.periodic_check.metrics'))
except:
logger.error('error :: failed to get mirage_periodic_check_metrics from Redis')
mirage_periodic_check_metrics = []
redis_metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric))
if redis_metric_name in mirage_periodic_check_metrics:
logger.info('this is a periodic Mirage check for %s' % metric)
periodic_mirage_check = True
# @added 20200413 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Do not evaluate batch metrics against MIRAGE_STALE_SECONDS
if BATCH_PROCESSING:
# Is this a analyzer_batch related anomaly
analyzer_batch_anomaly = None
analyzer_batch_metric_anomaly_key = 'analyzer_batch.anomaly.%s.%s' % (
str(metric_timestamp), metric)
try:
# analyzer_batch_anomaly = self.redis_conn.get(analyzer_batch_metric_anomaly_key)
analyzer_batch_anomaly = self.redis_conn_decoded.get(analyzer_batch_metric_anomaly_key)
except Exception as e:
logger.error(
'error :: could not query cache_key - %s - %s' % (
analyzer_batch_metric_anomaly_key, e))
analyzer_batch_anomaly = None
if analyzer_batch_anomaly:
logger.info('batch processing - identified as an analyzer_batch triggered anomaly from the presence of the Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
logger.info('batch processing - not identified as an analyzer_batch triggered anomaly as no Redis key found - %s' % analyzer_batch_metric_anomaly_key)
if analyzer_batch_anomaly:
logger.info('batch processing - setting metric_timestamp_age from %s to 1 so that will not be discarded as stale on %s' % (
str(metric_timestamp_age), metric))
metric_timestamp_age = 1
if metric_timestamp_age > settings.MIRAGE_STALE_SECONDS:
logger.info('stale check :: %s check request is %s seconds old - discarding' % (metric, str(metric_timestamp_age)))
# Remove metric check file
if os.path.isfile(metric_check_file):
os.remove(metric_check_file)
logger.info('removed check file - %s' % (metric_check_file))
else:
logger.info('could not remove check file - %s' % (metric_check_file))
# Remove the metric directory
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree - %s' % metric_data_dir)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
redis_set = 'mirage.stale_check_discarded'
try:
self.redis_conn.sadd(redis_set, str(metric))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(metric), str(redis_set)))
return
# Calculate hours second order resolution to seconds
second_order_resolution_seconds = int(hours_to_resolve) * 3600
# Calculate graphite from and until parameters from the metric timestamp
graphite_until = datetime.datetime.fromtimestamp(int(float(metric_timestamp))).strftime('%H:%M_%Y%m%d')
int_second_order_resolution_seconds = int(float(second_order_resolution_seconds))
second_resolution_timestamp = int_metric_timestamp - int_second_order_resolution_seconds
graphite_from = datetime.datetime.fromtimestamp(int(second_resolution_timestamp)).strftime('%H:%M_%Y%m%d')
# Remove any old json file related to the metric
metric_json_file = '%s/%s.json' % (metric_data_dir, str(metric))
try:
os.remove(metric_json_file)
except OSError:
pass
# Get data from graphite
logger.info(
'retrieve data :: surfacing %s time series from graphite for %s seconds' % (
metric, str(second_order_resolution_seconds)))
# @modified 20191113 - Branch #3262: py3
# Wrapped in try
try:
# @modified 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
# Deprecate self.surface_graphite_metric_data in mirage so that the
# same function can be used for all graphite requests
# self.surface_graphite_metric_data(metric, graphite_from, graphite_until)
metric_json_file_saved = get_graphite_metric(
skyline_app, metric, second_resolution_timestamp,
metric_timestamp, 'json', metric_json_file)
if metric_json_file_saved:
logger.info('%s time series data saved' % metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: get_graphite_metric failed to surface_graphite_metric_data to populate %s' % (
str(metric_json_file)))
# Check there is a json timeseries file to test
if not os.path.isfile(metric_json_file):
logger.error(
'error :: retrieve failed - failed to surface %s time series from graphite' % (
metric))
# @added 20200905 - Feature #3734: waterfall alerts
# Try a metric 3 times before removing the check file
remove_check_file = True
check_failed_key = 'mirage.check.data_retrieval_failed.%s.%s' % (
str(int_metric_timestamp), metric)
fail_count = 0
try:
fail_count = self.redis_conn.get(check_failed_key)
except:
fail_count = 0
if not fail_count:
fail_count = 0
fail_count += 1
if fail_count < 3:
remove_check_file = False
try:
self.redis_conn.setex(check_failed_key, 300, fail_count)
logger.info('updated fail_count to %s in %s' % (str(fail_count), check_failed_key))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to set Redis key %s with %s' % (
str(check_failed_key), str(fail_count)))
else:
logger.error('error :: fail_count is %s in %s, removing check file' % (str(fail_count), check_failed_key))
if remove_check_file:
# Remove metric check file
try:
os.remove(metric_check_file)
except OSError:
pass
# Remove the metric directory
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree %s' % metric_data_dir)
return
logger.info('retrieved data :: for %s at %s seconds' % (
metric, str(second_order_resolution_seconds)))
# Make process-specific dicts
exceptions = defaultdict(int)
anomaly_breakdown = defaultdict(int)
self.check_if_parent_is_alive()
timeseries = []
try:
with open((metric_json_file), 'r') as f:
timeseries = json.loads(f.read())
logger.info('data points surfaced :: %s' % (str(len(timeseries))))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to create timeseries from %s - %s' % (
str(metric_json_file), err))
timeseries = []
# @added 20170212 - Feature #1886: Ionosphere learn
# Only process if the metric has sufficient data
first_timestamp = None
try:
first_timestamp = int(timeseries[0][0])
except Exception as err:
logger.error('error :: could not determine first timestamp - %s' % err)
timestamp_now = int(time())
valid_if_before_timestamp = timestamp_now - int(settings.FULL_DURATION)
valid_mirage_timeseries = True
if first_timestamp:
if first_timestamp > valid_if_before_timestamp:
valid_mirage_timeseries = False
else:
valid_mirage_timeseries = False
logger.warning('warning :: no first_timestamp, valid_mirage_timeseries: %s' % str(valid_mirage_timeseries))
# @added 20170603 - Feature #2034: analyse_derivatives
# Convert the values of metrics strictly increasing monotonically
# to their derivative products
known_derivative_metric = False
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# derivative_metrics = list(self.redis_conn.smembers('derivative_metrics'))
# @modified 20211012 - Feature #4280: aet.metrics_manager.derivative_metrics Redis hash
# derivative_metrics = list(self.redis_conn_decoded.smembers('derivative_metrics'))
derivative_metrics = list(self.redis_conn_decoded.smembers('aet.metrics_manager.derivative_metrics'))
except:
derivative_metrics = []
redis_metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric))
if redis_metric_name in derivative_metrics:
known_derivative_metric = True
if known_derivative_metric:
try:
# @modified 20200606 - Bug #3572: Apply list to settings import
non_derivative_monotonic_metrics = list(settings.NON_DERIVATIVE_MONOTONIC_METRICS)
except:
non_derivative_monotonic_metrics = []
skip_derivative = in_list(redis_metric_name, non_derivative_monotonic_metrics)
if skip_derivative:
known_derivative_metric = False
# @modified 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
# Deprecate self.surface_graphite_metric_data in mirage so that the
# same function can be used for all graphite requests.
# get_graphite_metric does the derivative, zero_fill and last_known_value
# functions so no longer required here.
check_for_derivative = False
if check_for_derivative:
if known_derivative_metric and valid_mirage_timeseries:
try:
derivative_timeseries = nonNegativeDerivative(timeseries)
timeseries = derivative_timeseries
except:
logger.error('error :: nonNegativeDerivative failed')
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
if snab_only_check:
snab_recheck_key = 'snab.recheck.%s' % metric
snab_recheck_key_exists = False
try:
snab_recheck_key_exists = self.redis_conn_decoded.get(snab_recheck_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s' % (
snab_recheck_key))
original_added_at = 0
if snab_recheck_key_exists:
logger.info('snab recheck key exists - %s' % snab_recheck_key)
try:
original_added_at = int(snab_recheck_key_exists)
except:
# The key expired
original_added_at = int(time()) - 300
else:
logger.info('snab recheck key does not exists - %s' % snab_recheck_key)
snab_recheck_original_anomaly_timestamp_key = 'snab.recheck.anomaly_timestamp.%s' % metric
snab_recheck_original_anomaly_timestamp_key_exists = False
try:
snab_recheck_original_anomaly_timestamp_key_exists = self.redis_conn_decoded.get(snab_recheck_original_anomaly_timestamp_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s' % (
snab_recheck_key))
original_anomaly_timestamp = int(timeseries[-1][0])
if snab_recheck_original_anomaly_timestamp_key_exists:
logger.info('snab.recheck.anomaly_timestamp key exists - %s' % snab_recheck_original_anomaly_timestamp_key)
try:
original_anomaly_timestamp = int(snab_recheck_original_anomaly_timestamp_key_exists)
except:
# The key expired
pass
else:
logger.info('snab.recheck.anomaly_timestamp key does not exists - %s' % snab_recheck_original_anomaly_timestamp_key)
snab_json_file_created = False
if snab_recheck_key_exists and snab_recheck_original_anomaly_timestamp_key_exists:
# Create timeseries json file with the timeseries
use_snab_timestamp = metric_timestamp
try:
use_snab_timestamp = int(timeseries[-1][0])
except:
pass
snab_json_file = '%s/%s.%s.json' % (
# settings.SNAB_DATA_DIR, str(int(metric_timestamp)), str(metric))
settings.SNAB_DATA_DIR, str(int(use_snab_timestamp)), str(metric))
timeseries_json = str(timeseries).replace('[', '(').replace(']', ')')
try:
snab_data_last_datapoint = [timeseries[-1][0], timeseries[-1][1]]
except:
snab_data_last_datapoint = [None, None, 'there was no timeseries data']
if timeseries_json:
try:
write_data_to_file(skyline_app, snab_json_file, 'w', timeseries_json)
logger.info('added snab timeseries file with last entry - %s :: %s' % (
str(snab_data_last_datapoint), snab_json_file))
except:
logger.error(traceback.format_exc())
logger.error(
'error :: failed to add snab timeseries file :: %s' %
snab_json_file)
if not os.path.isfile(snab_json_file):
logger.error('error - the snab_json_file was not created - %s' % (
str(snab_json_file)))
else:
logger.info('snab_json_file exists - %s' % snab_json_file)
snab_json_file_created = True
else:
logger.error(
'error :: no timeseries_json to add snab timeseries file :: %s' %
snab_json_file)
else:
logger.info('not adding snab_json_file as snab recheck keys no longer not exist')
if snab_json_file_created:
data = {
'metric': metric,
'anomaly_data': snab_json_file,
'timestamp': int(timeseries[-1][0]),
'original_anomaly_timestamp': original_anomaly_timestamp,
'value': timeseries[-1][1],
'original_added_at': original_added_at,
}
try:
self.redis_conn.sadd(mirage_snab_only_checks_redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to %s Redis set' % (
metric, str(mirage_snab_only_checks_redis_set)))
# Remove metric check file
try:
os.remove(metric_check_file)
except OSError:
pass
# Remove the metric directory
try:
rmtree(metric_data_dir)
logger.info('removed data dir for snab_check_only - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree for snab_check_only - %s' % metric_data_dir)
return
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrics have negative values so they can be
# added to the ionosphere.untrainable_metrics Redis set
run_negatives_present = False
if settings.IONOSPHERE_ENABLED and valid_mirage_timeseries:
run_negatives_present = True
known_negative_metric, known_negative_metric_matched_by = matched_or_regexed_in_list(skyline_app, metric, KNOWN_NEGATIVE_METRICS)
if known_negative_metric:
run_negatives_present = False
logger.info('will not check %s for negative values' % (metric))
else:
logger.info('will check %s for negative values' % (metric))
del known_negative_metric_matched_by
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Add timings
snab_check_namespace = False
if SNAB_ENABLED and SNAB_CHECKS:
for app in SNAB_CHECKS:
if app == skyline_app:
for snab_context in SNAB_CHECKS[app]:
if snab_check_namespace:
break
for algorithm in SNAB_CHECKS[app][snab_context]:
if snab_check_namespace:
break
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
if namespace in redis_metric_name:
snab_check_namespace = True
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check if %s is a snab_check_metric' % redis_metric_name)
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
test_alerts = {}
test_alert_and_trigger = False
try:
test_alerts = get_test_alerts(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: get_test_alerts failed - %s' % err)
if test_alerts:
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_metric = test_alerts[test_alert_timestamp]['metric']
if test_metric != metric:
continue
try:
trigger_anomaly = test_alerts[test_alert_timestamp]['trigger_anomaly']
except KeyError:
trigger_anomaly = False
if not trigger_anomaly:
continue
logger.info('test_alert found for %s with trigger_anomaly: %s' % (
str(test_metric), str(trigger_anomaly)))
test_alert_and_trigger = True
break
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed determine test_alert details from test_alerts: %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
if test_alert_and_trigger:
alert_tested_key = 'mirage.test_alerts.done.%s' % metric
try:
self.redis_conn_decoded.setex(alert_tested_key, 300, int(time()))
logger.info('test_alert created Redis key %s' % alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to create Redis key %s - %s' % (
alert_tested_key, err))
# @added 20200607 - Feature #3566: custom_algorithms
algorithms_run = list(settings.MIRAGE_ALGORITHMS)
# @added 20200904 - Feature #3734: waterfall alerts
anomalous = None
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Add timings
analysis_start_time = time()
try:
if valid_mirage_timeseries:
logger.info('analyzing :: %s at %s seconds' % (metric, second_order_resolution_seconds))
# @modified 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Added run_negatives_present and negatives_found
# anomalous, ensemble, datapoint = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds)
# @modified 20200607 - Feature #3566: custom_algorithms
# Added algorithms_run
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms)
else:
logger.info('not analyzing :: %s at %s seconds as there is not sufficiently older datapoints in the timeseries - not valid_mirage_timeseries' % (metric, second_order_resolution_seconds))
anomalous = False
if timeseries:
datapoint = timeseries[-1][1]
else:
datapoint = 0
# @added 20220315 - Feature #4482: Test alerts
# Allow to test on sparse metrics
if test_alert_and_trigger:
ensemble = [True]
triggered_algorithms = ['histogram_bins']
algorithms_run = ['histogram_bins']
negatives_found = False
# It could have been deleted by the Roomba
except TypeError:
# @added 20200430 - Feature #3480: batch_processing
# Added logging here as the DeletedByRoomba exception is
# generally not related to that but related to some other fail
# in the processing of the run algorithms phase.
# It could have been deleted by the Roomba, but Mirage does not use
# Redis data so probably, definitely was not :)
logger.error(traceback.format_exc())
logger.error('error :: added as DeletedByRoomba but possibly not see traceback above')
exceptions['DeletedByRoomba'] += 1
logger.info('exceptions :: DeletedByRoomba')
except TooShort:
exceptions['TooShort'] += 1
logger.info('exceptions :: TooShort')
except Stale:
exceptions['Stale'] += 1
logger.info('exceptions :: Stale')
except Boring:
exceptions['Boring'] += 1
logger.info('exceptions :: Boring')
except Exception as err:
exceptions['Other'] += 1
logger.info('exceptions :: Other')
logger.error(traceback.format_exc())
logger.error('error :: unhandled error - %s' % err)
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# To overcome the Graphite unfilled last bucket problem, the FULL_DURATION
# data is downsampled using the Pandas origin='end' backwards resample
# method and then aligning to the resolution and this data replaces the
# FULL_DURATION period in the Graphite timeseries data for analysis only.
# This solves the problem of false positive alerts being generated by
# unfilled buckets where the data in the bucket would 'normalise' over
# the duration of the bucket. However, cases often occur where the
# bucket data only has one or two very high or very low values and then
# the average for that unfilled bucket is skewed, causing an outlier.
# When the rest of the bucket is filled the outlier is flattened.
# This preprocessing method resolves that issue and removes any
# requirement for extending the Graphite first retention period to a
# value > 7days, which resolves this issue but creates an issue in the
# Ionosphere context where features extraction takes a long time and is
# not suitable for high production workload. Thanks to Pandas for
# introducing backwards resampling in version 1.3.
downsampled_timeseries = None
if anomalous:
try:
logger.info('checking if Graphite data and FULL_DURATION data are different if so will check against realigned downsampled data - anomalous - %s' % (metric))
downsampled_timeseries = downsample_full_duration_and_merge_graphite(self, metric, timeseries, known_derivative_metric)
except Exception as err:
logger.error('error :: nonNegativeDerivative failed on resampled_aligned_timeseries for %s - %s' % (
metric, err))
if downsampled_timeseries:
try:
logger.info('checking realigned downsampled data for - %s' % (metric))
anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(downsampled_timeseries, metric, second_order_resolution_seconds, run_negatives_present, triggered_algorithms)
# Replace the datapoint variable from the preprocessed
# downsampled_timeseries with the actual data point from the
# Graphite data
datapoint = timeseries[-1][1]
except Exception as err:
exceptions['Other'] += 1
logger.info('exceptions :: Other')
logger.error(traceback.format_exc())
logger.error('error :: unhandled error - %s' % err)
# @added 20220506 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# If the downsampled data is not anomalous, remove the entry from
# the mirage.trigger_history
if not anomalous:
logger.info('realigned downsampled data not anomalous removing entry from mirage.trigger_history for - %s' % (metric))
trigger_history = {}
try:
raw_trigger_history = self.redis_conn_decoded.hget('mirage.trigger_history', metric)
if raw_trigger_history:
trigger_history = literal_eval(raw_trigger_history)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_algorithms :: failed to evaluate data from mirage.trigger_history Redis hash key - %s' % (
str(err)))
new_trigger_history = {}
last_timestamp = int(timeseries[-1][0])
history_removed = False
for history_timestamp in list(trigger_history.keys()):
if history_timestamp == int(timeseries[-1][0]):
history_removed = True
logger.info('removing entry for %s from mirage.trigger_history - %s' % (
metric, str(trigger_history[history_timestamp])))
continue
new_trigger_history[history_timestamp] = trigger_history[history_timestamp]
if history_removed:
try:
self.redis_conn_decoded.hset('mirage.trigger_history', metric, str(new_trigger_history))
logger.info('updated mirage.trigger_history for %s' % metric)
except Exception as err:
logger.error('error :: failed to set key in mirage.trigger_history Redis hash key - %s' % (
str(err)))
# @added 20220420 - Feature #4530: namespace.analysed_events
parent_namespace = metric.split('.', maxsplit=1)[0]
date_string = str(strftime('%Y-%m-%d', gmtime()))
namespace_analysed_events_hash = 'namespace.analysed_events.%s.%s' % (skyline_app, date_string)
try:
self.redis_conn.hincrby(namespace_analysed_events_hash, parent_namespace, 1)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to increment %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
try:
self.redis_conn.expire(namespace_analysed_events_hash, (86400 * 15))
logger.info('updated %s Redis hash' % namespace_analysed_events_hash)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to set expire %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Add timings
analysis_run_time = time() - analysis_start_time
logger.info('algorithms analysis completed in %.2f seconds' % (
analysis_run_time))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
logger.info('test_alert triggering anomaly on %s with %s' % (
metric, str(timeseries[-1])))
anomalous = True
datapoint = timeseries[-1][1]
metric_timestamp = timeseries[-1][0]
# @added 20210309 - Task #3730: Validate Mirage running multiple processes
# Reimplement mirage.checks.done count
if not snab_only_check:
try:
self.redis_conn.incr('mirage.checks.done')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to increment mirage.checks.done Redis key - %s' % str(err))
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
if metric.startswith(settings.FULL_NAMESPACE):
base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric
# @added 20210505 - Bug #4048: Mirage - removing feedback metrics to be processed
feedback_cache_key_exists = False
feedback_cache_key = 'mirage.feedback_metric.checked.%s' % (base_name)
try:
feedback_cache_key_exists = self.redis_conn_decoded.get(feedback_cache_key)
except Exception as e:
logger.error('error :: failed to get %s key from Redis - %s' % (
str(feedback_cache_key), e))
if feedback_cache_key_exists:
feedback_processed_cache_key = 'mirage.feedback_metric.processed.%s' % (base_name)
logger.info('feedback metric processed adding Redis key with 600 TTL - %s' % feedback_processed_cache_key)
try:
self.redis_conn.setex(feedback_processed_cache_key, 600, int(analysis_start_time))
except Exception as e:
logger.error('error :: failed to add %s key to Redis - %s' % (
str(feedback_processed_cache_key), e))
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp. waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
literal_analyzer_waterfall_alerts = []
try:
literal_analyzer_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_analyzer_waterfall_alerts = []
analyzer_waterfall_alerts = []
for literal_waterfall_alert in literal_analyzer_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
analyzer_waterfall_alerts.append(waterfall_alert)
if not anomalous:
not_anomalous_metric = [datapoint, base_name]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.not_anomalous_metrics.append(not_anomalous_metric)
redis_set = 'mirage.not_anomalous_metrics'
data = str(not_anomalous_metric)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
for waterfall_alert in analyzer_waterfall_alerts:
if waterfall_alert[0] == base_name:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# @added 20201128 - Feature #3734: waterfall alerts
# If the check just done is newer than an existing analyzer
# waterfall alert metric timestamp, remove those keys as well
if int(waterfall_alert[1]) < metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item with older timestamp from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# @added 20210330 - Feature #3994: Panorama - mirage not anomalous
# A hash is added to the mirage.panorama.not_anomalous_metrics for
# every metric that is found to be not anomalous. This provides
# data for /panorama?not_anomalous and /panorama?not_anomalous_metric
# method which are used for plots in the webapp and json response.
# The mirage.panorama.not_anomalous_metrics Redis hash is managed in
# analyzer/metrics_manager
not_anomalous_timestamp = None
try:
not_anomalous_timestamp = int(timeseries[-1][0])
except:
not_anomalous_timestamp = int(metric_timestamp)
redis_hash = 'mirage.panorama.not_anomalous_metrics'
try:
data = {
base_name: {
'timestamp': not_anomalous_timestamp,
'value': datapoint,
'hours_to_resolve': int(hours_to_resolve),
}
}
self.redis_conn.hset(redis_hash, time(), str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis hash %s' % (
str(data), str(redis_hash)))
logger.info('not anomalous :: %s with %s (at full duration), %s (at SECOND_ORDER_RESOLUTION_HOURS)' % (
metric, value, str(datapoint)))
# If it's anomalous, add it to list
if anomalous:
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
if metric.startswith(settings.FULL_NAMESPACE):
base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric
# metric_timestamp = int(timeseries[-1][0])
metric_timestamp = int_metric_timestamp
# Get the anomaly breakdown - who returned True?
triggered_algorithms = []
for index, boolean_value in enumerate(ensemble):
if boolean_value:
# @modified 20200607 - Feature #3566: custom_algorithms
# algorithm = settings.MIRAGE_ALGORITHMS[index]
algorithm = algorithms_run[index]
anomaly_breakdown[algorithm] += 1
triggered_algorithms.append(algorithm)
# @modified 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added second_order_resolution_seconds, triggered_algorithms and algorithms_run
# anomalous_metric = [datapoint, base_name, metric_timestamp]
anomalous_metric = [datapoint, base_name, metric_timestamp, second_order_resolution_seconds, triggered_algorithms, algorithms_run]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.anomalous_metrics.append(anomalous_metric)
redis_set = 'mirage.anomalous_metrics'
data = str(anomalous_metric)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to mirage.anomalous_metrics Redis set' % (
str(data)))
# @added 20220504 - Feature #2580: illuminance
illuminance_dict = {}
illuminance_dict[base_name] = {
'timestamp': int(metric_timestamp),
'value': float(datapoint),
'triggered_algorithms_count': len(triggered_algorithms)}
logger.info('calling add_illuminance_entries with %s entries to add' % (
str(len(illuminance_dict))))
current_illuminance_dict = {}
try:
current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(run_timestamp), illuminance_dict)
except Exception as err:
logger.error('error :: add_illuminance_entries failed - %s' % (
err))
logger.info('illuminance Redis hash now has %s entries' % (
str(len(current_illuminance_dict))))
# @modified 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Added analysis_run_time
if snab_check_namespace:
redis_key = 'mirage.analysis_run_time.%s.%s' % (base_name, str(metric_timestamp))
try:
self.redis_conn.setex(redis_key, 120, str(analysis_run_time))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add snab analysis_run_time Redis key - %s' % (
redis_key))
logger.info('anomaly detected :: %s with %s (at SECOND_ORDER_RESOLUTION_HOURS), %s (at FULL_DURATION)' % (
metric, str(datapoint), str(value)))
# It runs so fast, this allows us to process 30 anomalies/min
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Removed limit
# sleep(2)
# @added 20170206 - Bug #1904: Handle non filesystem friendly metric names in check files
sane_metricname = filesafe_metricname(str(base_name))
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrics have negative values so they can be
# added to the ionosphere.untrainable_metrics Redis set
if run_negatives_present and negatives_found:
redis_set = 'ionosphere.untrainable_metrics'
try:
last_negative_timestamp = int(negatives_found[-1][0])
last_negative_value = negatives_found[-1][1]
remove_after_timestamp = int(last_negative_timestamp + second_order_resolution_seconds)
data = str([base_name, metric_timestamp, datapoint, last_negative_timestamp, last_negative_value, second_order_resolution_seconds, remove_after_timestamp])
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# If Crucible or Panorama are enabled determine details
determine_anomaly_details = False
if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
determine_anomaly_details = True
if settings.PANORAMA_ENABLED:
determine_anomaly_details = True
# If Ionosphere is enabled determine details
try:
ionosphere_enabled = settings.IONOSPHERE_ENABLED
if settings.IONOSPHERE_ENABLED:
determine_anomaly_details = True
except:
ionosphere_enabled = False
if determine_anomaly_details:
# metric_timestamp = str(int(timeseries[-1][0]))
from_timestamp = str(int(timeseries[1][0]))
timeseries_dir = base_name.replace('.', '/')
cache_key = 'mirage.last_alert.smtp.%s' % (base_name)
last_alert = False
try:
# @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Changed the last_alert cache key to hold the last
# anomaly timestamp
# last_alert = self.redis_conn.get(cache_key)
last_alert = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % str(e))
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Evaluate the reported anomaly timestamp to determine whether
# EXPIRATION_TIME should be applied to a batch metric
analyzer_batch_anomaly = None
if last_alert:
# Is this a analyzer_batch related anomaly
analyzer_batch_metric_anomaly_key = 'analyzer_batch.anomaly.%s.%s' % (
str(int_metric_timestamp), base_name)
try:
analyzer_batch_anomaly = self.redis_conn_decoded.get(analyzer_batch_metric_anomaly_key)
except Exception as e:
logger.error(
'error :: could not query cache_key - %s - %s' % (
analyzer_batch_metric_anomaly_key, e))
analyzer_batch_anomaly = None
if analyzer_batch_anomaly:
logger.info('identified as an analyzer_batch triggered anomaly from the presence of the Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
logger.info('not identified as an analyzer_batch triggered anomaly as no Redis key found - %s' % analyzer_batch_metric_anomaly_key)
if last_alert and analyzer_batch_anomaly:
# @modified 20201107 - Feature #3830: metrics_manager
# Optimise to use metrics_manager HGETALL rather than
# iterating the list of lists
# mirage_metrics_expiration_times = []
# try:
# mirage_metrics_expiration_times = list(self.redis_conn_decoded.smembers('mirage.metrics_expiration_times'))
# if LOCAL_DEBUG:
# logger.info('debug :: fetched the mirage.metrics_expiration_times Redis set')
# except:
# logger.info('failed to fetch the mirage.metrics_expiration_times Redis set')
# mirage_metrics_expiration_times = []
# metric_expiration_time = 3600
# try:
# for item_list_string in mirage_metrics_expiration_times:
# mirage_alert_expiration_data = literal_eval(item_list_string)
# if mirage_alert_expiration_data[0] == base_name:
# metric_expiration_time = int(mirage_alert_expiration_data[1])
# break
# except:
# if LOCAL_DEBUG:
# logger.error('error :: failed to determine mirage_alert_expiration_data for %s from the mirage.metrics_expiration_times Redis set' % str(base_name))
# metric_expiration_time = 3600
mirage_metrics_expiration_times = {}
try:
mirage_metrics_expiration_times = self.redis_conn_decoded.hgetall('mirage.hash_key.metrics_expiration_times')
logger.info('%s entries in mirage.hash_key.metrics_expiration_times Redis hash key' % str(len(mirage_metrics_expiration_times)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis hash key mirage.hash_key.metrics_expiration_times')
mirage_metrics_expiration_times = {}
try:
logger.info('%s entries in mirage.hash_key.metrics_expiration_times Redis hash key' % str(len(mirage_metrics_expiration_times)))
metric_expiration_time = int(mirage_metrics_expiration_times[base_name])
logger.info('%s has expiration time of %s' % (base_name, str(metric_expiration_time)))
except:
if LOCAL_DEBUG:
logger.error('error :: failed to determine mirage_alert_expiration_data for %s from the mirage.hash_key.metrics_expiration_times Redis hash key' % str(base_name))
metric_expiration_time = 3600
last_timestamp = None
try:
last_timestamp = int(last_alert)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine last_timestamp from the last Mirage alert key - %s' % cache_key)
last_timestamp = None
seconds_between_batch_anomalies = None
if last_timestamp:
try:
seconds_between_batch_anomalies = int(int_metric_timestamp) - int(last_timestamp)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine seconds_between_batch_anomalies for batch metric Panorama key- %s' % cache_key)
last_timestamp = None
if seconds_between_batch_anomalies:
if seconds_between_batch_anomalies >= int(metric_expiration_time):
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is greater than the metric EXPIRATION_TIME of %s' % (
str(last_timestamp), str(int_metric_timestamp), base_name,
str(metric_expiration_time)))
logger.info('alerting on anomaly for batch metric %s, so setting last_alert to None' % (
metric))
last_alert = None
else:
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is less than the metric EXPIRATION_TIME of %s, not alerting' % (
str(last_timestamp), str(int_metric_timestamp), base_name,
str(metric_expiration_time)))
if int(int_metric_timestamp) < last_timestamp:
logger.info('batch anomaly timestamp (%s) less than the last_check timestamp (%s), alerting on anomaly for batch metric %s, so setting last_alert to None' % (
str(int_metric_timestamp), str(last_timestamp), base_name))
last_alert = None
# @added 20170308 - Feature #1960: ionosphere_layers
# Allow Ionosphere to send Panorama checks, it is an ionosphere_metric
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics'))
except:
ionosphere_unique_metrics = []
added_at = str(int(time()))
# If Panorama is enabled - create a Panorama check
# @modified 20170308 - Feature #1960: ionosphere_layers
# Allow Ionosphere to send Panorama checks for ionosphere_metrics
# if settings.PANORAMA_ENABLED:
send_to_panorama = False
redis_metric_name = '%s%s' % (str(settings.FULL_NAMESPACE), str(base_name))
if settings.PANORAMA_ENABLED:
send_to_panorama = True
if redis_metric_name in ionosphere_unique_metrics:
send_to_panorama = False
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
logger.info('test_alert sending triggered anomaly on %s to Panorama' % (
metric))
send_to_panorama = True
# Panorama must have at least one triggered algorithm
original_triggered_algorithms = list(triggered_algorithms)
if len(triggered_algorithms) == 0:
triggered_algorithms = [algorithms_run[0]]
if send_to_panorama:
if not os.path.exists(settings.PANORAMA_CHECK_PATH):
mkdir_p(settings.PANORAMA_CHECK_PATH)
if analyzer_batch_anomaly:
from_timestamp = int_metric_timestamp - int(second_order_resolution_seconds)
from_timestamp = str(from_timestamp)
# Note:
# The values are enclosed in single quotes intentionally
# as the imp.load_source used results in a shift in the
# decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
source = 'graphite'
panaroma_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'app = \'%s\'\n' \
'source = \'%s\'\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(datapoint), from_timestamp,
# @modified 20200607 - Feature #3566: custom_algorithms
# str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS),
str(int_metric_timestamp), str(algorithms_run),
triggered_algorithms, skyline_app, source,
this_host, added_at)
# Create an anomaly file with details about the anomaly
panaroma_anomaly_file = '%s/%s.%s.txt' % (
settings.PANORAMA_CHECK_PATH, added_at, sane_metricname)
try:
write_data_to_file(
skyline_app, panaroma_anomaly_file, 'w',
panaroma_anomaly_data)
logger.info('added panorama anomaly file :: %s' % (panaroma_anomaly_file))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Move to Redis set block below
# self.sent_to_panorama.append(base_name)
except:
logger.error('error :: failed to add panorama anomaly file :: %s' % (panaroma_anomaly_file))
logger.error(traceback.format_exc())
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved from the above self.sent_to_panorama
redis_set = 'mirage.sent_to_panorama'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20210323 - Feature #3642: Anomaly type classification
if LUMINOSITY_CLASSIFY_ANOMALIES:
redis_set = 'luminosity.classify_anomalies'
data_dict = {
'metric': metric,
'timestamp': int_metric_timestamp,
'value': datapoint,
'algorithms': algorithms_run,
'triggered_algorithms': triggered_algorithms,
'app': skyline_app,
'added_at': int(added_at),
}
data = [metric, int_metric_timestamp, int(added_at), data_dict]
try:
self.redis_conn.sadd(redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
triggered_algorithms = list(original_triggered_algorithms)
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
for waterfall_alert in analyzer_waterfall_alerts:
if waterfall_alert[0] == base_name:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# If crucible is enabled - save timeseries and create a
# crucible check
if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
from_timestamp = str(int(timeseries[1][0]))
timeseries_dir = base_name.replace('.', '/')
crucible_anomaly_dir = str(settings.CRUCIBLE_DATA_FOLDER) + '/' + timeseries_dir + '/' + metric_timestamp
if not os.path.exists(crucible_anomaly_dir):
mkdir_p(crucible_anomaly_dir)
# Note:
# The value is enclosed in single quotes intentionally
# as the imp.load_source used in crucible results in a
# shift in the decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
crucible_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'anomaly_dir = \'%s\'\n' \
'graphite_metric = True\n' \
'run_crucible_tests = False\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(datapoint), from_timestamp,
# @modified 20200607 - Feature #3566: custom_algorithms
# str(int_metric_timestamp), str(settings.MIRAGE_ALGORITHMS),
str(int_metric_timestamp), str(algorithms_run),
triggered_algorithms, crucible_anomaly_dir,
skyline_app, added_at)
# Create an anomaly file with details about the anomaly
crucible_anomaly_file = '%s/%s.txt' % (crucible_anomaly_dir, sane_metricname)
try:
write_data_to_file(
skyline_app, crucible_anomaly_file, 'w',
crucible_anomaly_data)
logger.info('added crucible anomaly file :: %s' % (crucible_anomaly_file))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.sent_to_crucible.append(base_name)
except:
logger.error('error :: failed to add crucible anomaly file :: %s' % (crucible_anomaly_file))
logger.error(traceback.format_exc())
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved from the above self.sent_to_crucible
redis_set = 'mirage.sent_to_crucible'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# Create timeseries json file with the timeseries
json_file = '%s/%s.json' % (crucible_anomaly_dir, base_name)
timeseries_json = str(timeseries).replace('[', '(').replace(']', ')')
try:
write_data_to_file(skyline_app, json_file, 'w', timeseries_json)
logger.info('added crucible timeseries file :: %s' % (json_file))
except:
logger.error('error :: failed to add crucible timeseries file :: %s' % (json_file))
logger.error(traceback.format_exc())
# Create a crucible check file
crucible_check_file = '%s/%s.%s.txt' % (settings.CRUCIBLE_CHECK_PATH, metric_timestamp, sane_metricname)
try:
write_data_to_file(
skyline_app, crucible_check_file, 'w',
crucible_anomaly_data)
logger.info('added crucible check :: %s,%s' % (base_name, metric_timestamp))
except:
logger.error('error :: failed to add crucible check file :: %s' % (crucible_check_file))
logger.error(traceback.format_exc())
# @added 20160922 - Branch #922: Ionosphere
# Also added the send_anomalous_metric_to skyline_functions.py
# function
if ionosphere_enabled:
if not last_alert:
# @modified 20161228 Feature #1830: Ionosphere alerts
# Added full_duration which needs to be recorded to allow Mirage metrics
# to be profiled on Redis timeseries data at FULL_DURATION
# e.g. mirage.redis.24h.json
full_duration = str(second_order_resolution_seconds)
# @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Added ionosphere_parent_id, always zero from Analyzer and Mirage
ionosphere_parent_id = 0
send_anomalous_metric_to(
skyline_app, 'ionosphere', timeseries_dir,
str(int_metric_timestamp), base_name, str(datapoint),
from_timestamp, triggered_algorithms, timeseries,
full_duration, str(ionosphere_parent_id),
# @added 20201001 - Task #3748: POC SNAB
# Added algorithms_run required to determine the anomalyScore
# so this needs to be sent to Ionosphere so Ionosphere
# can send it back on an alert
algorithms_run)
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved to the mirage.sent_to_ionosphere Redis set Redis set
# block below
# self.sent_to_ionosphere.append(base_name)
# @added 20200804 - Feature #3462: Add IONOSPHERE_MANAGE_PURGE
# Feature #3472: ionosphere.training_data Redis set
# Feature #3474: webapp api - training_data
# Add training data to the ionosphere.training_data so that
# the ionosphere purge_old_data_dirs can happen less
# frequently for reduced I/O
redis_set = 'ionosphere.training_data'
data = [base_name, int(int_metric_timestamp), second_order_resolution_seconds]
try:
logger.info('adding to Redis set %s - %s' % (
redis_set, str(data)))
self.redis_conn.sadd(redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to %s Redis set' % (str(data), redis_set))
else:
logger.info('alert expiry key exists not sending to Ionosphere :: %s' % base_name)
# @added 20200904 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
# Do not remove if this is only for training_data creation
if redis_metric_name in ionosphere_unique_metrics:
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
mirage_waterfall_data = []
for waterfall_alert in analyzer_waterfall_alerts:
if waterfall_alert[0] == base_name:
if int(waterfall_alert[1]) == metric_timestamp:
mirage_waterfall_data = waterfall_alert
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved from the above self.sent_to_ionosphere
if not last_alert:
redis_set = 'mirage.sent_to_ionosphere'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if test_alert_and_trigger:
logger.info('test_alert not sending anomaly on %s to ionosphere' % (
metric))
ionosphere_unique_metrics = []
# @added 20200904 - Feature #3734: waterfall alerts
# Add mirage waterfall alert
# Only add if this is an ionosphere_enabled metric_check_file
if redis_metric_name in ionosphere_unique_metrics:
if mirage_waterfall_data:
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
waterfall_data = mirage_waterfall_data
# @added 20201008 - Bug #3776: waterfall_alert - no analyzer triggered_algorithms in waterfall_panorama_data on MIRAGE_ALWAYS_METRICS
# When a MIRAGE_ALWAYS_METRICS metric is sent
# through to Mirage from Analyzer it sometimes has
# no triggered_algorithms, as the metric is sent
# every run; this can be expected.
# However, if the Mirage three-sigma check does
# trigger algorithms they need to be added here so
# that when the metric and event are sent to Panorama
# the triggered_algorithms is populated
if base_name in MIRAGE_ALWAYS_METRICS:
from_timestamp = str(int(timeseries[1][0]))
waterfall_panorama_data = [
base_name, datapoint, int(timeseries[1][0]),
int(timeseries[-1][0]), algorithms_run,
triggered_algorithms, skyline_app,
skyline_app, this_host,
waterfall_alert[4][9]
]
# Use the original added_to_waterfall_timestamp
added_to_waterfall_timestamp = waterfall_data[3]
# @modified 20201009 - Bug #3776: waterfall_alert - no analyzer triggered_algorithms in waterfall_panorama_data on MIRAGE_ALWAYS_METRICS
# corrected datapoint, timestamp order
waterfall_data = [
base_name, int(timeseries[-1][0]), datapoint,
added_to_waterfall_timestamp, waterfall_panorama_data
]
redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere'
try:
self.redis_conn.sadd(redis_set, str(waterfall_data))
logger.info('added to Redis set %s - %s' % (redis_set, str(waterfall_data)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(waterfall_data), str(redis_set)))
# Add values to the queue so the parent process can collate
for key, ab_value in anomaly_breakdown.items():
self.mirage_anomaly_breakdown_q.put((key, ab_value))
for key, e_value in exceptions.items():
self.mirage_exceptions_q.put((key, e_value))
metric_var_files = []
timeseries = []
if os.path.isfile(metric_check_file):
# Remove metric check file
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
logger.error('error :: failed to remove check file - %s' % metric_check_file)
# Remove the metric directory
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed data dir - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree %s' % metric_data_dir)
# @added 20200723 - Feature #3472: ionosphere.training_data Redis set
# Feature #3566: custom_algorithms
# Optimize for MIRAGE_ALWAYS_METRICS which can create a lot
# of training_data dirs as Analyzer always hands them off to
# Mirage.
remove_ionosphere_data_dir = False
if not anomalous:
if base_name in MIRAGE_ALWAYS_METRICS:
remove_ionosphere_data_dir = True
if not anomalous and periodic_mirage_check:
remove_ionosphere_data_dir = True
# @added 20190408 - Feature #2882: Mirage - periodic_check
# Remove the training_dir for mirage_periodic_check_metrics if not
# anomalous
# @modified 20200723 - Feature #3472: ionosphere.training_data Redis set
# Feature #3566: custom_algorithms
# if not anomalous and periodic_mirage_check:
if remove_ionosphere_data_dir:
timeseries_dir = base_name.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(metric_timestamp),
str(timeseries_dir))
if os.path.exists(training_dir):
try:
rmtree(training_dir)
logger.info('removed Mirage always or periodic check training_data dir - %s' % training_dir)
except:
logger.error('error :: failed to rmtree Mirage always or periodic check training_dir - %s' % training_dir)
if not anomalous and periodic_mirage_check:
del mirage_periodic_check_metrics
[docs] def run(self):
"""
Called when the process intializes.
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
if os.path.isfile(skyline_app_logwait):
try:
os.remove(skyline_app_logwait)
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('starting %s run' % skyline_app)
if os.path.isfile(skyline_app_loglock):
logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
try:
os.remove(skyline_app_loglock)
logger.info('log lock file removed')
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
else:
logger.info('bin/%s.d log management done' % skyline_app)
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
def smtp_trigger_alert(alert, metric, second_order_resolution_seconds, context, triggered_algorithms):
    """
    Spawn a single spawn_alerter_process to send an smtp alert and wait
    up to 15 seconds for it to complete, terminating it if it has not
    finished in time.

    :param alert: the alert tuple, e.g. (base_name, alerter, expiration, ...)
    :param metric: the metric tuple, e.g. (value, base_name, timestamp)
    :param second_order_resolution_seconds: the second order resolution
        in seconds that the metric was analysed at
    :param context: the app context the alert is made in
    :param triggered_algorithms: the algorithms that triggered the anomaly
    """
    # Spawn the alerter process
    pids = []
    try:
        # @modified 20210304 - Feature #3642: Anomaly type classification
        # Feature #3970: custom_algorithm - adtk_level_shift
        # Added triggered_algorithms
        p = Process(target=self.spawn_alerter_process, args=(alert, metric, second_order_resolution_seconds, context, triggered_algorithms))
        pids.append(p)
        p.start()
    except:
        logger.error('error :: failed to spawn_alerter_process')
        logger.error(traceback.format_exc())
    # Self monitor the process and terminate it if it has run for longer
    # than 15 seconds
    p_starts = time()
    while time() - p_starts <= 15:
        if any(p.is_alive() for p in pids):
            # Just to avoid hogging the CPU
            sleep(.1)
        else:
            # All the processes are done, break now.
            break
    else:
        # We only enter this if we didn't 'break' above.
        # @modified - log message corrected to name the actual process
        # target, spawn_alerter_process
        logger.info('%s :: timed out, killing the spawn_alerter_process process' % (skyline_app))
        for p in pids:
            p.terminate()
    for p in pids:
        if p.is_alive():
            # @modified - corrected copy-pasted log message, this is the
            # alerter process not a spin_process
            logger.info('%s :: stopping spawn_alerter_process - %s' % (skyline_app, str(p.is_alive())))
            p.join()
"""
DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
"""
if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
gc.enable()
gc.set_debug(gc.DEBUG_LEAK)
# As per http://stackoverflow.com/a/1641280
# This got useable understandable data with gc
before = defaultdict(int)
after = defaultdict(int)
for i in get_objects():
before[type(i)] += 1
if LOCAL_DEBUG:
logger.info('debug :: Memory usage in run at start: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
last_sent_to_graphite = int(time())
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
filesafe_names_dict = {}
while 1:
now = time()
"""
DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
"""
if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
# As per http://stackoverflow.com/a/1641280
# This got useable understandable data with gc
before = defaultdict(int)
after = defaultdict(int)
for i in get_objects():
before[type(i)] += 1
if LOCAL_DEBUG:
logger.info('debug :: Memory usage before looking for checks: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# Make sure Redis is up
try:
self.redis_conn.ping()
except:
logger.error('error :: skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
logger.info('attempting to connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
# @modified 20191113 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# if settings.REDIS_PASSWORD:
# self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
# else:
# self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
try:
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as err:
logger.error('error :: failed to connect to Redis - %s' % err)
# @modified 20220429 - Feature #4536: Handle Redis failure
# if self.redis_conn.ping():
# logger.info('connected to redis')
# continue
try:
self.redis_conn.ping()
logger.info('connected to redis')
except Exception as err:
logger.error('error :: failed to ping Redis - %s' % err)
# Determine if any metric to analyze or Ionosphere alerts to be sent
while True:
# Report app up
# @modified 20210524 - Branch #1444: thunder
# Report app AND Redis as up
# self.redis_conn.setex(skyline_app, 120, now)
try:
redis_is_up = self.redis_conn.setex(skyline_app, 120, now)
if redis_is_up:
try:
self.redis_conn.setex('redis', 120, now)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: could not update the Redis redis key - %s' % (
e))
except Exception as e:
logger.error('error :: failed to update Redis key for %s up - %s' % (skyline_app, e))
# @added 20200604 - Mirage - populate_redis
# This functionality enables Mirage to populate the Skyline
# Redis instance with FULL_DURATION data from Graphite if
# Analyzer flags the time series as TooShort and adds it too the
# mirage.populate_redis Redis set. Or possibly if there are
# airgaps in the Redis data due to a network partition. It will
# fill a metric about every 10 seconds or so, unless there are
# Mirage checks or ionosphere_alerts to send
populate_redis_with_metrics = []
if MIRAGE_AUTOFILL_TOOSHORT:
try:
populate_redis_with_metrics = list(self.redis_conn_decoded.smembers('mirage.populate_redis'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis set mirage.populate_redis')
populate_redis_with_metrics = []
metric_to_populate_redis = None
populate_redis_with_metrics_count = 0
if populate_redis_with_metrics:
populate_redis_with_metrics_count = len(populate_redis_with_metrics)
logger.info('%s metrics found in mirage.populate_redis Redis set' % str(populate_redis_with_metrics_count))
try:
metric_to_populate_redis = str(populate_redis_with_metrics[0])
try:
del populate_redis_with_metrics
except:
pass
logger.info('processing %s from mirage.populate_redis Redis set' % metric_to_populate_redis)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine metric to populate_redis')
metric_to_populate_redis = None
if metric_to_populate_redis:
try:
# Spawn a populate_redis processes
pids = []
spawned_pids = []
p = Process(target=self.populate_redis, args=(1, metric_to_populate_redis))
pids.append(p)
logger.info('starting populate_redis process')
p.start()
spawned_pids.append(p.pid)
p_starts = time()
while time() - p_starts <= 10:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('populate_redis process completed in %.2f seconds' % (
time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('timed out, killing populate_redis process')
for p in pids:
p.terminate()
# p.join()
for p in pids:
if p.is_alive():
logger.info('stopping populate_redis process - %s' % (str(p.is_alive())))
p.join()
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to spawn populate_redis process')
# @added 20161228 - Feature #1828: ionosphere - mirage Redis data features
# If Ionosphere is going to pass alerts back to the app
# here we are going to have break out and force a alerting
# only run.
ionosphere_alerts = None
ionosphere_alerts_returned = False
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
test_alerts = {}
test_alert_metrics = []
try:
test_alerts = get_test_alerts(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: get_test_alerts failed - %s' % err)
if test_alerts:
for test_alert_timestamp in list(test_alerts.keys()):
try:
metric = test_alerts[test_alert_timestamp]['metric']
try:
trigger_anomaly = test_alerts[test_alert_timestamp]['trigger_anomaly']
except KeyError:
trigger_anomaly = False
if trigger_anomaly:
test_alert_metrics.append(metric)
logger.info('test_alert found for %s: %s' % (
str(metric), str(test_alerts[test_alert_timestamp])))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed determine test_alert details from test_alerts: %s - %s' % (
str(test_alerts), err))
continue
try:
use_timestamp = int(test_alert_timestamp)
sane_metricname = filesafe_metricname(str(metric))
anomaly_check_file = '%s/%s.%s.txt' % (settings.MIRAGE_CHECK_PATH, str(use_timestamp), sane_metricname)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine name of test_alert anomaly_check_file for: %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
anomaly_check_file = None
mirage_anomaly_check_file_created = False
if anomaly_check_file:
try:
use_hours_to_resolve = 168
snab_check_only = False
triggered_algorithms = []
with open(anomaly_check_file, 'w') as fh:
fh.write('metric = "%s"\nvalue = "%s"\nhours_to_resolve = "%s"\nmetric_timestamp = "%s"\nsnab_only_check = "%s"\ntriggered_algorithms = %s\n' % (
metric, 1, str(use_hours_to_resolve),
str(use_timestamp), str(snab_check_only),
str(triggered_algorithms)))
mirage_anomaly_check_file_created = True
logger.info('added test_alert anomaly_check_file: %s' % (
str(anomaly_check_file)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to write test_alert anomaly_check_file for: %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
if mirage_anomaly_check_file_created:
if python_version == 3:
os.chmod(anomaly_check_file, mode=0o644)
metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Do not pospone the Ionosphere alerts check on based on whether
# there are checks on not
# if len(metric_var_files) == 0:
if not ionosphere_alerts_returned:
# @modified 20161228 - Feature #1830: Ionosphere alerts
try:
# @modified 20200430 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_alerts = list(self.redis_conn.scan_iter(match='ionosphere.mirage.alert.*'))
ionosphere_alerts = list(self.redis_conn_decoded.scan_iter(match='ionosphere.mirage.alert.*'))
ionosphere_alerts_returned = True
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to scan ionosphere.mirage.alert.* from Redis')
ionosphere_alerts = []
if len(ionosphere_alerts) == 0:
ionosphere_alerts_returned = False
else:
logger.info('Ionosphere alert requested :: %s' % str(ionosphere_alerts))
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Do not pospone the Ionosphere alerts check
# if not ionosphere_alerts_returned:
# logger.info('sleeping no metrics...')
# sleep(10)
# @added 20191106 - Branch #3262: py3
if os.path.isfile(alert_test_file):
test_alert = None
try:
with open((alert_test_file), 'r') as fh:
raw_test_alert = fh.read()
test_alert = literal_eval(raw_test_alert)
# [metric, alerter]
# e.g. ['server-1.cpu.user', 'smtp']
# e.g. ['server-1.cpu.user', 'slack']
# e.g. ['skyline_test.alerters.test', 'smtp']
except:
logger.error(traceback.format_exc())
logger.error('error :: could not evaluate test_alert from %s' % alert_test_file)
if test_alert:
try:
logger.info('test alert metric found - alerting on %s' % str((test_alert)))
metric_name = str(test_alert[0])
test_alerter = str(test_alert[1])
alerter_id = None
try:
alerter_id = str(test_alert[2])
except:
pass
metric = (1, metric_name, int(time()))
alert = (metric_name, test_alerter, 10)
if alerter_id:
alert = (metric_name, test_alerter, 10, None, {'type': 'external', 'id': alerter_id})
# @added 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
triggered_algorithms = ['testing']
if settings.SLACK_ENABLED and test_alerter == 'slack':
logger.info('test alert to slack for %s' % (metric_name))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
# @added 20220301 - Feature #4482: Test alerts
# Allow all alert types to be tested
if test_alerter.startswith('http_alerter'):
trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
if test_alerter in ['sms', 'pagerduty']:
trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
if test_alerter == 'smtp':
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
smtp_trigger_alert(alert, metric, 604800, skyline_app, triggered_algorithms)
except:
logger.error('error :: test trigger_alert - %s' % traceback.format_exc())
logger.error('error :: failed to test trigger_alert :: %s' % metric_name)
try:
os.remove(alert_test_file)
except OSError:
logger.error('error - failed to remove %s, continuing' % alert_test_file)
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Move this len(metric_var_files) from above and apply the
# appropriatte sleep
if len(metric_var_files) == 0:
if not ionosphere_alerts_returned:
# @modified 20200604 - Mirage - populate_redis
# Do not sleep if there are metrics to populate in Redis
if populate_redis_with_metrics_count == 0:
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
sleep_for = 10
next_send_to_graphite = last_sent_to_graphite + 60
seconds_to_next_send_to_graphite = next_send_to_graphite - int(time())
if seconds_to_next_send_to_graphite < 10:
if seconds_to_next_send_to_graphite > 1:
sleep_for = seconds_to_next_send_to_graphite
else:
break
logger.info('sleeping no metrics...')
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# sleep(10)
sleep(sleep_for)
else:
logger.info('no checks or alerts, continuing to process populate_redis metrics')
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Removed sleep, no delay
# else:
# sleep(1)
# @added 20220113 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Do not remove batch_processing checks
batch_processing_metrics = []
try:
batch_processing_metrics = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
except:
logger.error('error :: could not get aet.analyzer.batch_processing_metrics from Redis')
logger.error(traceback.format_exc())
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
if not filesafe_names_dict:
try:
filesafe_names_dict = self.redis_conn_decoded.hgetall('metrics_manager.filesafe_base_names')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: hgetall metrics_manager.filesafe_base_names failed - %s' % err)
# Clean up old files
now_timestamp = time()
stale_age = now_timestamp - settings.MIRAGE_STALE_SECONDS
for current_file in listdir(settings.MIRAGE_CHECK_PATH):
if os.path.isfile(settings.MIRAGE_CHECK_PATH + "/" + current_file):
t = os.stat(settings.MIRAGE_CHECK_PATH + "/" + current_file)
c = t.st_ctime
# @added 20220113 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Do not remove batch_processing checks
for b_metric in batch_processing_metrics:
if b_metric in current_file:
continue
# delete file if older than a week
if c < stale_age:
os.remove(settings.MIRAGE_CHECK_PATH + "/" + current_file)
logger.info('removed stale check - %s' % (current_file))
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
redis_set = 'mirage.stale_check_discarded'
try:
self.redis_conn.sadd(redis_set, str(current_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(current_file), str(redis_set)))
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
filesafe_names_list = []
if filesafe_names_dict:
filesafe_names_list = list(filesafe_names_dict.keys())
# @added 20201026 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle feedback metrics in a similar style to Ionosphere
# Do not run checks if namespace has matched multiple times in
# the last 10 minutes.
if len(metric_var_files) > 3:
analyzer_waterfall_alerts = []
feedback_metric_loop_error_logged = False
for current_file in listdir(settings.MIRAGE_CHECK_PATH):
feedback_metric = False
remove_feedback_metric_check = False
remove_alerted_on_metric_check = False
try:
current_file_no_extension = current_file.replace('.txt', '')
current_file_no_extension_elements = current_file_no_extension.split('.')
base_name = '.'.join(current_file_no_extension_elements[1:])
metric_timestamp = int(current_file_no_extension_elements[0])
except:
pass
# @modified 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
use_base_name = str(base_name)
if base_name in filesafe_names_list:
use_base_name = filesafe_names_dict[base_name]
try:
metric_namespace_elements = base_name.split('.')
for to_skip in SKYLINE_FEEDBACK_NAMESPACES:
if to_skip in base_name:
feedback_metric = True
break
to_skip_namespace_elements = to_skip.split('.')
elements_matched = set(metric_namespace_elements) & set(to_skip_namespace_elements)
if len(elements_matched) == len(to_skip_namespace_elements):
feedback_metric = True
break
feedback_cache_key_exists = False
feedback_cache_key = 'mirage.feedback_metric.checked.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
feedback_cache_key = 'mirage.feedback_metric.checked.%s' % (use_base_name)
feedback_metric_process_time = int(time())
# @added 20210701 - Feature #4152: DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES
if feedback_metric:
pattern_match = False
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES)
del metric_matched_by
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: matched_or_regexed_in_list failed checking %s in DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES - %s' % (
base_name, e))
pattern_match = False
if pattern_match:
feedback_metric = False
logger.info('%s matched DO_NOT_SKIP_SKYLINE_FEEDBACK_NAMESPACES, will analyse' % base_name)
if feedback_metric:
try:
feedback_cache_key_exists = self.redis_conn_decoded.get(feedback_cache_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get %s key from Redis' % (
str(feedback_cache_key)))
if feedback_cache_key_exists:
feedback_metric_last_processed_seconds_ago = feedback_metric_process_time - int(feedback_cache_key_exists)
logger.info('feedback metric identified as last processed %s seconds ago via Redis key %s' % (
str(feedback_metric_last_processed_seconds_ago), feedback_cache_key))
remove_feedback_metric_check = True
# @added 20210505 - Bug #4048: Mirage - removing feedback metrics to be processed
feedback_processed_cache_key = 'mirage.feedback_metric.processed.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
feedback_processed_cache_key = 'mirage.feedback_metric.processed.%s' % (use_base_name)
feedback_processed_cache_key_exists = None
try:
feedback_processed_cache_key_exists = self.redis_conn_decoded.get(feedback_processed_cache_key)
except Exception as e:
logger.error('error :: failed to get %s key from Redis - %s' % (
str(feedback_processed_cache_key), e))
if not feedback_processed_cache_key_exists:
remove_feedback_metric_check = False
logger.info('feedback metric Redis key %s does not exist, not removing metric' % feedback_processed_cache_key)
if len(metric_var_files) > 10 and not feedback_cache_key_exists:
logger.info('Mirage is busy removing feedback metric check')
remove_feedback_metric_check = True
# @modified 20201128 - Feature #3734: waterfall alerts
# Only add if does not exist and always add
# else:
# try:
# self.redis_conn.setex(feedback_cache_key, 600, feedback_metric_process_time)
# if not feedback_cache_key_exists:
if not feedback_cache_key_exists and not remove_feedback_metric_check:
logger.info('feedback metric identified as not processed in last 600 seconds adding Redis key with 600 TTL and processing - %s' % feedback_cache_key)
try:
self.redis_conn.setex(feedback_cache_key, 600, feedback_metric_process_time)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s key to Redis' % (
str(feedback_cache_key)))
except:
if not feedback_metric_loop_error_logged:
logger.error(traceback.format_exc())
logger.error('error :: failed to check feedback and alerted on metrics')
feedback_metric_loop_error_logged = True
# Remove checks that have been alerted on by Mirage or
# via an Analyzer waterfall alert
if len(metric_var_files) > 10 and not remove_feedback_metric_check:
cache_key = 'mirage.last_alert.smtp.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
cache_key = 'mirage.last_alert.smtp.%s' % (use_base_name)
alerted_on = False
try:
alerted_on = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % str(e))
if not alerted_on:
# Check for Analyzer alert key from waterfall alert
cache_key = 'last_alert.smtp.%s' % (base_name)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
if use_base_name != base_name:
cache_key = 'last_alert.smtp.%s' % (use_base_name)
try:
alerted_on = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % str(e))
if alerted_on:
remove_alerted_on_metric_check = True
# Unless is it older than PANORAMA_EXPIRY_TIME
try:
alerted_on_at = int(alerted_on)
alerted_on_seconds_ago = int(time()) - alerted_on_at
if alerted_on_seconds_ago >= settings.PANORAMA_EXPIRY_TIME:
remove_alerted_on_metric_check = False
except Exception as e:
logger.error('error :: failed determining if alerted more than PANORAMA_EXPIRY_TIME seconds ago - %s' % str(e))
remove_alerted_on_metric_check = True
if remove_feedback_metric_check or remove_alerted_on_metric_check:
if remove_feedback_metric_check:
log_str = 'feedback metric'
if remove_alerted_on_metric_check:
log_str = 'alerted on metric'
logger.info('removing %s %s check file and from analyzer.waterfall_alerts.sent_to_mirage Redis set' % (
log_str, base_name))
try:
os.remove(settings.MIRAGE_CHECK_PATH + "/" + current_file)
except:
logger.error('error :: failed to remove %s %s check file - %s' % (
log_str, base_name, current_file))
# Remove the metric from the waterfall_alerts Redis set
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp. waterfall_panorama_data]
if not analyzer_waterfall_alerts:
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
literal_analyzer_waterfall_alerts = []
try:
literal_analyzer_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_analyzer_waterfall_alerts = []
analyzer_waterfall_alerts = []
for literal_waterfall_alert in literal_analyzer_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
analyzer_waterfall_alerts.append(waterfall_alert)
for waterfall_alert in analyzer_waterfall_alerts:
# @modified 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Handle filesafe names
# if waterfall_alert[0] == base_name:
if waterfall_alert[0] == use_base_name:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item for %s from Redis set %s - %s' % (
log_str, redis_set, str(waterfall_alert)))
# @modified 20201128 - Feature #3734: waterfall alerts
# Do not break, check and remove
# waterfall_alert items with older
# timestamps as well
# break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s %s at %s from Redis set %s' % (
log_str, base_name, str(metric_timestamp), redis_set))
# @added 20201128 - Feature #3734: waterfall alerts
# If the check just done is new than an existing analyzer
# waterfall alert metric timestamp remove those keys as well
if int(waterfall_alert[1]) < metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item with older timestamp for %s from Redis set %s - %s' % (
log_str, redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s %s at %s from Redis set %s' % (
log_str, base_name, str(metric_timestamp), redis_set))
# Discover metric to analyze
metric_var_files = ''
# @added 20161228 - Feature #1830: Ionosphere alerts
# Prioritises Ionosphere alerts
if ionosphere_alerts_returned:
break
metric_var_files = [f for f in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f))]
if len(metric_var_files) > 0:
break
process_metric_check_files = False
# @modified 20161228 - Feature #1830: Ionosphere alerts
# Only spawn process if this is not an Ionosphere alert
if not ionosphere_alerts_returned:
metric_var_files_sorted = sorted(metric_var_files)
# metric_check_file = settings.MIRAGE_CHECK_PATH + "/" + metric_var_files_sorted[0]
if metric_var_files_sorted:
process_metric_check_files = True
if process_metric_check_files:
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
check_files_to_process = len(metric_var_files_sorted)
logger.info('%s checks to process' % str(check_files_to_process))
# Remove any existing algorithm.error files from any previous runs
# that did not cleanup for any reason
pattern = '%s.*.algorithm.error' % skyline_app
try:
for f in os.listdir(settings.SKYLINE_TMP_DIR):
if re.search(pattern, f):
try:
os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
logger.info('cleaning up old error file - %s' % (str(f)))
except OSError:
pass
except:
logger.error('failed to cleanup mirage_algorithm.error files - %s' % (traceback.format_exc()))
# Spawn processes
pids = []
spawned_pids = []
pid_count = 0
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# MIRAGE_PROCESSES = 1
if len(metric_var_files) > 1:
try:
MIRAGE_PROCESSES = int(settings.MIRAGE_PROCESSES)
if len(metric_var_files) < MIRAGE_PROCESSES:
MIRAGE_PROCESSES = len(metric_var_files)
except:
MIRAGE_PROCESSES = 1
else:
MIRAGE_PROCESSES = 1
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# processing_check_file = metric_var_files_sorted[0]
# logger.info('processing %s' % processing_check_file)
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes
# for i in range(1, MIRAGE_PROCESSES + 1):
# up_to = i - 1
# processing_check_file = metric_var_files_sorted[up_to]
# logger.info('processing %s' % processing_check_file)
# @modified 20161224 - send mirage metrics to graphite
# run_timestamp = int(now)
run_timestamp = int(time())
for i in range(1, MIRAGE_PROCESSES + 1):
# @added 20200909 - Task #3730: Validate Mirage running multiple processes
up_to = i - 1
processing_check_file = metric_var_files_sorted[up_to]
logger.info('processing %s' % processing_check_file)
# @modified 20200909 - Task #3730: Validate Mirage running multiple processes
# p = Process(target=self.spin_process, args=(i, run_timestamp, metric_var_files_sorted))
p = Process(target=self.spin_process, args=(i, run_timestamp, processing_check_file))
pids.append(p)
pid_count += 1
logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(MIRAGE_PROCESSES)))
p.start()
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# spawned_pids.append(p.pid)
spawned_pids.append([p.pid, i])
logger.info('started spin_process %s with pid %s' % (str(pid_count), str(p.pid)))
# Send wait signal to zombie processes
# for p in pids:
# p.join()
# Self monitor processes and terminate if any spin_process has run
# for longer than 180 seconds - 20160512 @earthgecko
p_starts = time()
while time() - p_starts <= settings.MAX_ANALYZER_PROCESS_RUNTIME:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (
skyline_app, str(MIRAGE_PROCESSES), time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
for p in pids:
p.terminate()
# p.join()
for p in pids:
if p.is_alive():
logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
p.join()
# @added 20200607 - Feature #3508: ionosphere.untrainable_metrics
# Check to non 3sigma algorithm errors too
if LOCAL_DEBUG:
logger.debug('debug :: adding negatives_present to check_algorithm_errors')
check_algorithm_errors = ['negatives_present']
for algorithm in list(settings.MIRAGE_ALGORITHMS):
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: adding %s to check_algorithm_errors' % (algorithm))
check_algorithm_errors.append(algorithm)
# @added 20200607 - Feature #3566: custom_algorithms
if CUSTOM_ALGORITHMS:
for custom_algorithm in settings.CUSTOM_ALGORITHMS:
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: adding custom_algorithm %s to check_algorithm_errors' % (custom_algorithm))
check_algorithm_errors.append(custom_algorithm)
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: checking for algorithm error files')
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# for completed_pid in spawned_pids:
for completed_pid, mirage_process in spawned_pids:
logger.info('spin_process with pid %s completed' % (str(completed_pid)))
# @modified 20200607 - Feature #3566: custom_algorithms
# Feature #3508: ionosphere.untrainable_metrics
# Check to non 3sigma algorithm errors too and wrapped in try
try:
# for algorithm in settings.MIRAGE_ALGORITHMS:
for algorithm in check_algorithm_errors:
algorithm_error_file = '%s/%s.%s.%s.algorithm.error' % (
settings.SKYLINE_TMP_DIR, skyline_app,
str(completed_pid), algorithm)
if os.path.isfile(algorithm_error_file):
logger.info(
'error :: spin_process with pid %s has reported an error with the %s algorithm' % (
str(completed_pid), algorithm))
try:
with open(algorithm_error_file, 'r') as f:
error_string = f.read()
logger.error('%s' % str(error_string))
except:
logger.error('error :: failed to read %s error file' % algorithm)
try:
os.remove(algorithm_error_file)
except OSError:
pass
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check algorithm errors')
if LOCAL_DEBUG or DEBUG_CUSTOM_ALGORITHMS:
logger.debug('debug :: checked for algorithm error files')
# Grab data from the queue and populate dictionaries
exceptions = {}
anomaly_breakdown = {}
while 1:
try:
key, value = self.mirage_anomaly_breakdown_q.get_nowait()
if key not in list(anomaly_breakdown.keys()):
anomaly_breakdown[key] = value
else:
anomaly_breakdown[key] += value
except Empty:
# @added 20191113 - Branch #3262: py3
# Log
logger.info('anomaly_breakdown.keys are empty')
break
while 1:
try:
key, value = self.mirage_exceptions_q.get_nowait()
if key not in list(exceptions.keys()):
exceptions[key] = value
else:
exceptions[key] += value
except Empty:
# @added 20191113 - Branch #3262: py3
# Log
logger.info('exceptions.keys are empty')
break
# @added 20191021 - Bug #3288: Always send anomaly_breakdown and exception metrics
# Branch #3262: py3
exceptions_metrics = ['Boring', 'Stale', 'TooShort', 'Other']
for i_exception in exceptions_metrics:
if i_exception not in list(exceptions.keys()):
exceptions[i_exception] = 0
# @modified 20200607 - Feature #3566: custom_algorithms
# for i_anomaly_breakdown in settings.MIRAGE_ALGORITHMS:
for i_anomaly_breakdown in check_algorithm_errors:
if i_anomaly_breakdown not in list(anomaly_breakdown.keys()):
anomaly_breakdown[i_anomaly_breakdown] = 0
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis set and not self.metric_variables
# The per-process Redis set holds string encoded [name, value] pairs
# which are literal_eval-ed back into Python lists.
metric_variables = []
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# literal_metric_variables = list(self.redis_conn.smembers('mirage.metric_variables'))
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Handle per process
# literal_metric_variables = list(self.redis_conn_decoded.smembers('mirage.metric_variables'))
metric_variable_redis_set = 'mirage.%s.metric_variables' % str(mirage_process)
literal_metric_variables = list(self.redis_conn_decoded.smembers(metric_variable_redis_set))
for item_list_string in literal_metric_variables:
list_item = literal_eval(item_list_string)
metric_variables.append(list_item)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
# Handle per process
# Delete the set so stale variables are not read on the next run
try:
self.redis_conn.delete(metric_variable_redis_set)
logger.info('deleted Redis set - %s' % metric_variable_redis_set)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % metric_variable_redis_set)
# @added 20191113 - Branch #3262: py3
# Set default values
metric_name = None
metric_value = None
hours_to_resolve = 0
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# for metric_variable in self.metric_variables:
# Extract the known variables from the [name, value] pairs
for metric_variable in metric_variables:
if metric_variable[0] == 'metric_name':
metric_name = metric_variable[1]
if metric_variable[0] == 'metric_value':
metric_value = metric_variable[1]
if metric_variable[0] == 'hours_to_resolve':
hours_to_resolve = metric_variable[1]
# if metric_variable[0] == 'metric_timestamp':
# metric_timestamp = metric_variable[1]
logger.info('analysis done - %s' % str(metric_name))
# Send alerts
# Calculate hours second order resolution to seconds
# @modified 20191113 - Branch #3262: py3
# Only if set
if hours_to_resolve:
logger.info('analyzed at %s hours resolution' % hours_to_resolve)
second_order_resolution_seconds = int(hours_to_resolve) * 3600
logger.info('analyzed at %s seconds resolution' % str(second_order_resolution_seconds))
# Remove metric check file
metric_check_file = 'None'
try:
metric_check_file = '%s/%s' % (settings.MIRAGE_CHECK_PATH, processing_check_file)
if LOCAL_DEBUG:
logger.debug('debug :: interpolated metric_check_file to %s' % metric_check_file)
except:
logger.error('error :: failed to interpolate metric_check_file')
if os.path.isfile(metric_check_file):
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
if LOCAL_DEBUG:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove metric_check_file - %s' % metric_check_file)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove metric_check_file - %s' % metric_check_file)
else:
if LOCAL_DEBUG:
logger.debug('debug :: no metric_check_file to remove OK - %s' % metric_check_file)
# Remove the metric directory
# @modified 20191113 - Branch #3262: py3
# Convert None to str
# timeseries_dir = metric_name.replace('.', '/')
# The metric data dir mirrors the dotted metric name as a path under
# MIRAGE_CHECK_PATH, e.g. my.metric -> <path>/my/metric
metric_data_dir = 'None'
try:
metric_name_str = str(metric_name)
timeseries_dir = metric_name_str.replace('.', '/')
metric_data_dir = '%s/%s' % (settings.MIRAGE_CHECK_PATH, timeseries_dir)
if LOCAL_DEBUG:
logger.debug('debug :: metric_data_dir interpolated to %s' % str(metric_data_dir))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to interpolate metric_data_dir')
metric_data_dir = 'None'
if os.path.exists(metric_data_dir):
try:
rmtree(metric_data_dir)
logger.info('removed - %s' % metric_data_dir)
except:
logger.error('error :: failed to rmtree %s' % metric_data_dir)
else:
if LOCAL_DEBUG:
logger.debug('debug :: metric_data_dir does not exist - %s' % str(metric_data_dir))
# Fetch the set of metrics that Ionosphere owns alerting for; Mirage
# defers alerting on these unless Ionosphere is down or has handed the
# alert back (handled further below).
ionosphere_unique_metrics = []
if settings.MIRAGE_ENABLE_ALERTS:
# @added 20161228 - Feature #1830: Ionosphere alerts
# Branch #922: Ionosphere
# Bringing Ionosphere online - do alert on Ionosphere metrics
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_unique_metrics = list(self.redis_conn.smembers('ionosphere.unique_metrics'))
ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get ionosphere.unique_metrics from Redis')
ionosphere_unique_metrics = []
else:
if LOCAL_DEBUG:
logger.debug('debug :: settings.MIRAGE_ENABLE_ALERTS is not True')
# @added 20161228 - Feature #1830: Ionosphere alerts
# Branch #922: Ionosphere
# Send alerts for Ionosphere
# When Ionosphere has handed alerts back to Mirage, switch the alert
# context and empty ionosphere_unique_metrics so the alerting loop
# below will not skip these metrics as Ionosphere-owned.
alert_context = 'Mirage'
if ionosphere_alerts_returned:
alert_context = 'Ionosphere'
ionosphere_unique_metrics = []
logger.info('Ionosphere alerts requested emptying ionosphere_unique_metrics so Mirage will alert')
exceptions = {}
# @modified 20190524 - Branch #3002
# Wrapped in try except
try:
run_timestamp = int(time())
# @modified 20200430 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# ionosphere_alert_on = list(self.redis_conn.scan_iter(match='ionosphere.mirage.alert.*'))
ionosphere_alert_on = list(self.redis_conn_decoded.scan_iter(match='ionosphere.mirage.alert.*'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get ionosphere.mirage.alert.* from Redis key scan')
ionosphere_alert_on = []
# Each alert key holds a literal list:
# [value, base_name, metric_timestamp, triggered_algorithms,
#  second_order_resolution_seconds, algorithms_run]
for cache_key in ionosphere_alert_on:
try:
# @modified 20200322 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# alert_on = self.redis_conn.get(cache_key)
alert_on = self.redis_conn_decoded.get(cache_key)
send_alert_for = literal_eval(alert_on)
value = float(send_alert_for[0])
base_name = str(send_alert_for[1])
metric_timestamp = int(float(send_alert_for[2]))
triggered_algorithms = send_alert_for[3]
# @added 20201001 - Task #3748: POC SNAB
# Added algorithms_run required to determine the anomalyScore
algorithms_run = send_alert_for[5]
second_order_resolution_seconds = int(send_alert_for[4])
# @modified 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added triggered_algorithms and algorithms_run
# anomalous_metric = [value, base_name, metric_timestamp, second_order_resolution_seconds]
anomalous_metric = [value, base_name, metric_timestamp, second_order_resolution_seconds, triggered_algorithms, algorithms_run]
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.anomalous_metrics.append(anomalous_metric)
redis_set = 'mirage.anomalous_metrics'
data = str(anomalous_metric)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20210323 - Feature #3642: Anomaly type classification
if LUMINOSITY_CLASSIFY_ANOMALIES:
redis_set = 'luminosity.classify_anomalies'
added_at = int(time())
data_dict = {
'metric': base_name,
'timestamp': int(metric_timestamp),
'value': value,
'algorithms': algorithms_run,
'triggered_algorithms': triggered_algorithms,
'app': skyline_app,
'added_at': added_at,
}
data = [base_name, int(metric_timestamp), added_at, data_dict]
try:
self.redis_conn.sadd(redis_set, str(data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# NOTE(review): this re-initialises the anomaly_breakdown dict that
# was populated from the queues above - on each iteration only the
# current cache key's triggered algorithms survive. Confirm this is
# intentional for the Ionosphere alert path.
anomaly_breakdown = {}
for algorithm in triggered_algorithms:
anomaly_breakdown[algorithm] = 1
self.redis_conn.delete(cache_key)
except:
logger.error(traceback.format_exc())
# @modified 20200322 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# logger.error('error :: failed to add an Ionosphere anomalous_metric for %s' % base_name)
logger.error('error :: failed to add an Ionosphere anomalous_metric for cache key %s' % cache_key)
else:
if LOCAL_DEBUG:
logger.debug('debug :: no ionosphere_alerts_returned - %s' % str(ionosphere_alerts_returned))
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
# To reduce the amount of I/O used by Mirage in this loop check
# and reduce the number of log entries for 'not alerting - Ionosphere metric'
# a check is made if the metric_name has already been check, if
# so continue
not_alerting_for_ionosphere = 'none'
# @added 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Only check Ionosphere is up once per cycle
ionosphere_up = False
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Rebuild the anomalous metric lists from the string encoded entries
# in the mirage.anomalous_metrics Redis set.
mirage_anomalous_metrics = []
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# literal_mirage_anomalous_metrics = list(self.redis_conn.smembers('mirage.anomalous_metrics'))
literal_mirage_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage.anomalous_metrics'))
for metric_list_string in literal_mirage_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_anomalous_metrics.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine list from mirage.anomalous_metrics Redis set')
mirage_anomalous_metrics = []
# @added 20200907 - Feature #3734: waterfall alerts
# Add alert for expired waterfall_alert items
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
# Any waterfall alert item older than 300 seconds is considered
# expired (Ionosphere never responded) and Mirage alerts on it itself.
waterfall_alert_check_timestamp = int(time())
waterfall_alerts_to_alert_on = []
# A list to add a metric,timestamp string to in order to override
# the ionosphere_metric in the alerting block
alerting_waterfall_alerts = []
waterfall_redis_sets = [
'mirage.waterfall_alerts.sent_to_ionosphere',
]
for waterfall_redis_set in waterfall_redis_sets:
redis_set = waterfall_redis_set
literal_waterfall_alerts = []
try:
literal_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_waterfall_alerts = []
waterfall_alerts = []
logger.info('checking for expired checks in %s waterfall alerts in Redis set %s' % (
str(len(literal_waterfall_alerts)), redis_set))
for literal_waterfall_alert in literal_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
waterfall_alerts.append(waterfall_alert)
for waterfall_alert in waterfall_alerts:
# Expired when added_to_waterfall_timestamp is 300s or more ago
if waterfall_alert_check_timestamp >= (int(waterfall_alert[3]) + 300):
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item to alert on from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
waterfall_alerts_to_alert_on.append(waterfall_alert)
except:
logger.error(traceback.format_exc())
# NOTE(review): base_name here may be unset or stale from an
# earlier section on this error path - verify
logger.error('error :: failed to remove feedback metric waterfall alert item for %s from Redis set %s' % (
base_name, redis_set))
for waterfall_alert in waterfall_alerts_to_alert_on:
try:
value = float(waterfall_alert[2])
base_name = str(waterfall_alert[0])
metric_timestamp = int(waterfall_alert[1])
# @added 20201008 - Feature #3734: waterfall alerts
# Branch #3068: SNAB
# Added triggered_algorithms and algorithms_run
algorithms_run = waterfall_alert[4][4]
triggered_algorithms = waterfall_alert[4][5]
# @modified 20201008 - Feature #3734: waterfall alerts
# Branch #3068: SNAB
# Added triggered_algorithms and algorithms_run
# anomalous_metric = [value, base_name, metric_timestamp]
# mirage_anomalous_metrics.append([value, base_name, metric_timestamp])
anomalous_metric = [value, base_name, metric_timestamp, triggered_algorithms, algorithms_run]
mirage_anomalous_metrics.append(anomalous_metric)
waterfall_alert_check_string = '%s.%s' % (str(metric_timestamp), base_name)
alerting_waterfall_alerts.append(waterfall_alert_check_string)
logger.info('waterfall alerting on %s' % base_name)
redis_waterfall_alert_key = 'mirage.waterfall.alert.%s' % waterfall_alert_check_string
try:
self.redis_conn.setex(redis_waterfall_alert_key, 300, waterfall_alert_check_timestamp)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add Redis key - %s for waterfall alert' % (
redis_waterfall_alert_key))
# @added 20200929 - Feature #3734: waterfall alerts
# Task #3748: POC SNAB
# Branch #3068: SNAB
# Added Panorama anomaly details for waterfall alerts
# Note:
# The values are enclosed in single quotes intentionally
# as the imp.load_source used results in a shift in the
# decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
logger.info('adding panorama anomaly file for waterfall alert on %s' % base_name)
panorama_data = None
try:
panorama_data = waterfall_alert[4]
from_timestamp = str(int(panorama_data[2]))
int_metric_timestamp = int(panorama_data[3])
algorithms_run = panorama_data[4]
triggered_algorithms = panorama_data[5]
source = panorama_data[7]
added_at = str(int(time()))
panaroma_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'app = \'%s\'\n' \
'source = \'%s\'\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(value), from_timestamp,
str(int_metric_timestamp), str(algorithms_run),
triggered_algorithms, skyline_app, source,
this_host, added_at)
logger.info('panorama anomaly data for waterfall alert - %s' % str(panorama_data))
# Create an anomaly file with details about the anomaly
sane_metricname = filesafe_metricname(str(base_name))
panaroma_anomaly_file = '%s/%s.%s.txt' % (
settings.PANORAMA_CHECK_PATH, added_at, sane_metricname)
try:
write_data_to_file(
skyline_app, panaroma_anomaly_file, 'w',
panaroma_anomaly_data)
logger.info('added panorama anomaly file for waterfall alert :: %s' % (panaroma_anomaly_file))
except:
logger.error('error :: failed to add panorama anomaly file :: %s' % (panaroma_anomaly_file))
logger.error(traceback.format_exc())
redis_set = 'mirage.sent_to_panorama'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add panorama anomaly data file for waterfall alert')
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add waterfall alert to alert on to mirage_anomalous_metrics')
# @added 20200913 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
# Info #1792: Shapelet extraction
snab_checks_sent = []
added_to_snab_at = int(time())
# @added 20200929 - Task #3748: POC SNAB
# Branch #3068: SNAB
# Added three-sigma snab.panorama items
panorama_added_at = int(time())
panorama_three_sigma_snab_added = []
# @added 20200610 - Feature #3560: External alert config
# Merge external alert configs (if enabled) with settings.ALERTS into
# the all_alerts list that drives the alerting loop below.
external_alerts = {}
external_from_cache = None
internal_alerts = {}
internal_from_cache = None
all_alerts = list(settings.ALERTS)
all_from_cache = None
if EXTERNAL_ALERTS:
try:
external_alerts, external_from_cache, internal_alerts, internal_from_cache, all_alerts, all_from_cache = get_external_alert_configs(skyline_app)
except:
logger.error(traceback.format_exc())
logger.error('error :: could not determine external alert configs')
logger.info('retrieved %s external_alerts configurations from_cache %s, %s internal_alerts from_cache %s and %s all_alerts from_cache %s' % (
str(len(external_alerts)), str(external_from_cache),
str(len(internal_alerts)), str(internal_from_cache),
str(len(all_alerts)), str(all_from_cache)))
if LOCAL_DEBUG:
logger.debug('debug :: all_alerts :: %s' % str(all_alerts))
if not all_alerts:
logger.error('error :: all_alerts is not set, so creating from settings.ALERTS')
all_alerts = list(settings.ALERTS)
# @added 20220316 - Feature #4482: Test alerts
# Add test_alerts http_alerters to all_alerts because any new
# external_settings alerters may not have been updated/added yet
if test_alerts:
test_all_alerts = []
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_alerter = test_alerts[test_alert_timestamp]['alerter']
# Only http_alerter targets are injected as test alert configs
if not test_alerter.startswith('http_alerter'):
continue
test_metric = test_alerts[test_alert_timestamp]['metric']
test_id = test_alerts[test_alert_timestamp]['alerter_id']
external_alert_id = 'external-%s' % str(test_id)
test_alert_setting = [
test_metric, test_alerter, 60, 168,
{'id': external_alert_id, 'alerter': test_alerter,
'namespace': test_metric, 'type': 'external',
'expiration': 60, 'second_order_resolution': 604800}
]
test_all_alerts.append(test_alert_setting)
logger.info('adding test_alert setting: %s' % str(test_alert_setting))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to add test_alert to test_all_alerts %s - %s' % (
str(test_alerts[test_alert_timestamp]), err))
# Test alert settings are prepended before the normal alert configs
for item in all_alerts:
test_all_alerts.append(item)
all_alerts = list(test_all_alerts)
not_ionosphere_metrics = []
# @modified 20200610 - Feature #3560: External alert config
# for alert in settings.ALERTS:
# Main alerting loop - for every alert config, check every anomalous
# metric against it, skipping Ionosphere-owned metrics when Ionosphere
# is up and has not handed the alert back.
for alert in all_alerts:
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
not_an_ionosphere_metric_check_done = 'none'
if LOCAL_DEBUG:
logger.debug('debug :: %s metrics in mirage_anomalous_metrics' % str(len(mirage_anomalous_metrics)))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# for metric in self.anomalous_metrics:
# metric is [value, base_name, metric_timestamp, ...]
for metric in mirage_anomalous_metrics:
# @added 20161228 - Feature #1830: Ionosphere alerts
# Branch #922: Ionosphere
# Bringing Ionosphere online - do alert on Ionosphere
# metrics if Ionosphere is up
try:
metric_name = '%s%s' % (settings.FULL_NAMESPACE, str(metric[1]))
if LOCAL_DEBUG:
logger.debug('debug :: metric_name interpolated to %s' % str(metric_name))
except:
if LOCAL_DEBUG:
logger.error(traceback.format_exc())
logger.debug('debug :: failed to interpolate metric_name')
# @added 20200907 - Feature #3734: waterfall alerts
waterfall_alert_check_string = '%s.%s' % (str(int(metric[2])), metric[1])
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if metric[1] in test_alert_metrics:
if metric_name in ionosphere_unique_metrics:
ionosphere_unique_metrics.remove(metric_name)
# @modified 20200907 - Feature #3734: waterfall alerts
# if metric_name in ionosphere_unique_metrics:
# A metric with an expired waterfall alert is alerted on by
# Mirage even if it is an Ionosphere metric
if metric_name in ionosphere_unique_metrics and waterfall_alert_check_string not in alerting_waterfall_alerts:
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
if not_alerting_for_ionosphere == metric_name:
continue
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Only check Ionosphere is up once per cycle
# ionosphere_up = False
if not ionosphere_up:
try:
ionosphere_up = self.redis_conn.get('ionosphere')
except Exception as e:
logger.error('error :: could not query Redis for ionosphere key: %s' % str(e))
if ionosphere_up:
# @modified 20190408 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Wrapped this block up on conditional based on
# ionosphere_alerts_returned
if not ionosphere_alerts_returned:
logger.info('not alerting - Ionosphere metric - %s' % str(metric[1]))
# @added 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
not_alerting_for_ionosphere = metric_name
continue
else:
# Ionosphere is down - Mirage takes over alerting
logger.error('error :: Ionosphere not report up')
logger.info('taking over alerting from Ionosphere if alert is matched on - %s' % str(metric[1]))
else:
# @modified 20181114 - Bug #2682: Reduce mirage ionosphere alert loop
# logger.info('not an Ionosphere metric checking whether to alert - %s' % str(metric[1]))
if not_an_ionosphere_metric_check_done == metric_name:
# Do not log multiple times for this either
not_an_ionosphere_metric_check_done = metric_name
else:
if not ionosphere_alerts_returned:
if metric_name not in not_ionosphere_metrics:
logger.info('not an Ionosphere metric checking whether to alert - %s' % str(metric[1]))
not_an_ionosphere_metric_check_done = metric_name
not_ionosphere_metrics.append(metric_name)
# ALERT_MATCH_PATTERN = alert[0]
# METRIC_PATTERN = metric[1]
# @modified 20200622 - Task #3586: Change all alert pattern checks to matched_or_regexed_in_list
# Feature #3512: matched_or_regexed_in_list function
# Changed original alert matching pattern to use new
# method
base_name = str(metric[1])
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, [alert[0]])
if LOCAL_DEBUG and pattern_match:
logger.debug('debug :: %s matched alert - %s' % (base_name, alert[0]))
try:
del metric_matched_by
except:
pass
except:
pattern_match = False
if pattern_match:
# @added 20200610 - Feature #3560: External alert config
# @modified 20200624 - Feature #3560: External alert config
# Set the alert key to the external alerter id
# external_alerter_alerter = None
external_alerter_id = None
try:
if alert[4]['type'] == 'external':
# @modified 20200624 - Feature #3560: External alert config
# Set the alert key to the external alerter id
# external_alerter_alerter = alert[4]['alerter']
external_alerter_id = alert[4]['id'].replace('external-', '')
except:
external_alerter_id = None
# @modified 20200610 - Feature #3560: External alert config
# Use the all_alerts list which includes external alert configs
# cache_key = 'mirage.last_alert.%s.%s' % (alert[1], metric[1])
# @modified 20200624 - Feature #3560: External alert config
# Set the alert key to the external alerter id
# if external_alerter_alerter:
# cache_key = 'mirage.last_alert.%s.%s.%s' % (str(external_alerter_alerter), alert[1], metric[1])
if external_alerter_id:
cache_key = 'mirage.last_alert.%s.%s.%s' % (str(external_alerter_id), alert[1], metric[1])
else:
cache_key = 'mirage.last_alert.%s.%s' % (alert[1], metric[1])
try:
# @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Changed the last_alert cache key to hold the last
# anomaly timestamp
# last_alert = self.redis_conn.get(cache_key)
last_alert = self.redis_conn_decoded.get(cache_key)
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Evaluate the reported anomaly timestamp to determine whether
# EXPIRATION_TIME should be applied to a batch metric
if last_alert:
# Is this a analyzer_batch related anomaly
analyzer_batch_anomaly = None
metric_timestamp = metric[2]
analyzer_batch_metric_anomaly_key = 'analyzer_batch.anomaly.%s.%s' % (
str(metric_timestamp), metric[1])
try:
analyzer_batch_anomaly = self.redis_conn_decoded.get(analyzer_batch_metric_anomaly_key)
except Exception as e:
logger.error(
'error :: could not query cache_key - %s - %s' % (
analyzer_batch_metric_anomaly_key, e))
analyzer_batch_anomaly = None
if analyzer_batch_anomaly:
logger.info('identified as an analyzer_batch triggered anomaly from the presence of the Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
logger.info('not identified as an analyzer_batch triggered anomaly as no Redis key found - %s' % analyzer_batch_metric_anomaly_key)
# For batch anomalies compare timestamps rather than relying
# only on the presence of the last_alert key, because
# analyzer_batch may submit several anomalies in one batch
# that are more than EXPIRATION_TIME apart.
if last_alert and analyzer_batch_anomaly:
last_timestamp = None
try:
last_timestamp = int(last_alert)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine last_timestamp from the last Mirage alert key - %s' % cache_key)
last_timestamp = None
seconds_between_batch_anomalies = None
if last_timestamp:
try:
seconds_between_batch_anomalies = int(metric_timestamp) - int(last_timestamp)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine seconds_between_batch_anomalies for batch metric Panorama key- %s' % cache_key)
last_timestamp = None
if seconds_between_batch_anomalies:
if seconds_between_batch_anomalies > int(alert[2]):
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is greater than the metric EXPIRATION_TIME of %s' % (
str(last_timestamp), str(metric_timestamp), metric[1],
str(alert[2])))
logger.info('alerting on anomaly for batch metric %s, so setting last_alert to None' % (
metric))
last_alert = None
else:
logger.info('the difference between the last anomaly timestamp (%s) and the batch anomaly timestamp (%s) for batch metric %s is less than the metric EXPIRATION_TIME of %s, not alerting' % (
str(last_timestamp), str(metric_timestamp), metric[1],
str(alert[2])))
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Task #3562: Change panorama.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# If the metric is a batch processing metric and the anomaly
# timestamp is less than the last_check timestamp, insert
# the anomaly
# NOTE(review): if last_timestamp was set to None on an earlier
# failure this comparison would raise a TypeError on py3 -
# confirm the nesting guards against that
if int(metric_timestamp) < last_timestamp:
logger.info('batch anomaly timestamp (%s) less than the last_check timestamp (%s), alerting on anomaly for batch metric %s, so setting last_alert to None' % (
str(metric_timestamp), str(last_timestamp), metric))
last_alert = None
# Only alert when no last_alert key exists (or it was cleared above
# for a batch anomaly).
if not last_alert:
if ionosphere_alerts_returned:
# @modified 20190410 - Feature #2882: Mirage - periodic_check
# Only set if not set
# NameError probe: referencing the variable raises if it was
# never assigned in this scope, in which case it is derived
# from the metric or settings.FULL_DURATION below.
try:
second_order_resolution_seconds + 1
set_second_order_resolution_seconds = False
except:
set_second_order_resolution_seconds = True
if set_second_order_resolution_seconds:
try:
second_order_resolution_seconds = int(metric[3]) * 3600
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine full_duration from the Ionosphere alert for %s' % (metric[1]))
logger.info('using settings.FULL_DURATION - %s' % (str(settings.FULL_DURATION)))
second_order_resolution_seconds = int(settings.FULL_DURATION)
# @modified 20190524 - Branch #3002
# Wrapped in try except
try:
# @modified 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Change the last_alert cache key to hold the
# the anomaly timestamp for which the alert
# was sent, not the packb anomaly value.
# Using the timestamp of the anomaly allows
# it to be used to determine if a batch
# anomaly should be alerted on based on the
# comparison of the timestamps rather than
# just the presence of the last_alert key
# based on it not having reach its TTL as
# analyzer_batch could send multiple
# anomalies in one batch that might be
# EXPIRATION_TIME apart.
# self.redis_conn.setex(cache_key, alert[2], packb(metric[0]))
self.redis_conn.setex(cache_key, str(alert[2]), int(metric[2]))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to set Redis key %s for %s' % (
str(cache_key), metric[1]))
# @added 20200929 - Task #3748: POC SNAB
# Branch #3068: SNAB
# Determine if this is a snab_check_metric so
# that info can be passed to the alerter
snab_check_metric = False
if SNAB_ENABLED:
for app in SNAB_CHECKS:
if app == skyline_app:
for snab_context in SNAB_CHECKS[app]:
if snab_check_metric:
break
if snab_context == 'testing':
for algorithm in SNAB_CHECKS[app][snab_context]:
try:
algorithm_source = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_source']
except:
break
if not os.path.isfile(algorithm_source):
break
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
if namespace in base_name:
snab_check_metric = True
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check if %s is a snab_check_metric' % base_name)
if snab_check_metric:
algorithm_group = 'three-sigma'
# @added 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added second_order_resolution_seconds, triggered_algorithms and algorithms_run
try:
triggered_algorithms = metric[4]
algorithms_run = metric[5]
except:
triggered_algorithms = []
algorithms_run = []
# @added 20201001 - Task #3748: POC SNAB
# Added anomalyScore
# anomalyScore = fraction of the algorithms run that triggered
try:
if triggered_algorithms and algorithms_run:
anomalyScore = len(triggered_algorithms) / len(algorithms_run)
else:
anomalyScore = 1.0
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine anomalyScore for %s' % base_name)
anomalyScore = 1.0
# @added 20201001 - Branch #3068: SNAB
# Task #3748: POC SNAB
# Added analysis_run_time
analysis_run_time = 0
redis_key = 'mirage.analysis_run_time.%s.%s' % (base_name, str(metric[2]))
try:
analysis_run_time_data = self.redis_conn_decoded.get(redis_key)
if analysis_run_time_data:
analysis_run_time = float(analysis_run_time_data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine analysis_run_time from Redis key - %s' % (
redis_key))
try:
snab_panorama_details = {
'metric': base_name,
'timestamp': int(metric[2]),
# @added 20201001 - Task #3748: POC SNAB
# Added anomalyScore and analysis_run_time
'anomalyScore': anomalyScore,
'analysis_run_time': analysis_run_time,
'source': skyline_app,
'algorithm_group': algorithm_group,
'algorithm': None,
'added_at': panorama_added_at,
}
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to create snab_panorama_details dict %s' % base_name)
snab_panorama_details = None
# Only send once for a metric and timestamp
if snab_panorama_details not in panorama_three_sigma_snab_added:
self.redis_conn.sadd('snab.panorama', str(snab_panorama_details))
logger.info('added snab.panorama three-sigma item - %s' % (
str(snab_panorama_details)))
panorama_three_sigma_snab_added.append(snab_panorama_details)
# @added 20210801 - Feature #4214: alert.paused
alert_paused = False
try:
cache_key = 'alert.paused.%s.%s' % (alert[1], base_name)
alert_paused = self.redis_conn_decoded.get(cache_key)
except Exception as e:
logger.error('error :: alert_paused check failed: %s' % str(e))
alert_paused = False
if alert_paused:
logger.info('alert_paused for %s %s until %s' % (
alert[1], base_name, str(alert_paused)))
# trigger_alert(alert, metric, second_order_resolution_seconds, context)
try:
# @added 20210409 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Always determine triggered_algorithms and
# calculate anomalyScore
try:
triggered_algorithms = metric[4]
algorithms_run = metric[5]
except:
triggered_algorithms = []
algorithms_run = []
try:
if triggered_algorithms and algorithms_run:
anomalyScore = len(triggered_algorithms) / len(algorithms_run)
else:
anomalyScore = 1.0
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine anomalyScore for %s' % base_name)
anomalyScore = 1.0
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
if base_name in test_alert_metrics:
triggered_algorithms = ['testing']
# Non-smtp alerters (http_alerter, slack, etc.) may get an enriched
# copy of the alert (new_alert) carrying anomaly_id, anomalyScore and
# SNAB details; the original alert list is never mutated.
if alert[1] != 'smtp':
new_alert = None
# @added 20201007 - Feature #3772: Add the anomaly_id to the http_alerter json
if 'http_alerter' in alert[1]:
anomaly_id = None
anomaly_id_redis_key = 'panorama.anomaly_id.%s.%s' % (
str(int(metric[2])), base_name)
# Poll Redis for up to ~20 seconds for the anomaly_id that
# Panorama records for this anomaly (bare except treats a
# missing key / non-int value as "not yet available").
try_get_anomaly_id_redis_key_count = 0
while try_get_anomaly_id_redis_key_count < 20:
try_get_anomaly_id_redis_key_count += 1
try:
anomaly_id = int(self.redis_conn_decoded.get(anomaly_id_redis_key))
break
except:
sleep(1)
if not anomaly_id:
logger.error('error :: failed to determine anomaly_id from Redis key - %s' % anomaly_id_redis_key)
else:
logger.info('determined anomaly_id as %s, appending to alert' % str(anomaly_id))
# Do not modify the alert list object, create a new one
new_alert = list(alert)
new_alert.append(['anomaly_id', anomaly_id])
# @added 20201130 - Feature #3772: Add the anomaly_id to the http_alerter json
# Determine the triggered_algorithms
# and algorithms_run
# NOTE(review): this re-derivation duplicates the block that
# runs just before the smtp/non-smtp branch - presumably kept
# for safety; could be consolidated.
try:
triggered_algorithms = metric[4]
algorithms_run = metric[5]
except:
triggered_algorithms = []
algorithms_run = []
# @added 20201111 - Feature #3772: Add the anomaly_id to the http_alerter json
# Add the real anomalyScore
try:
if triggered_algorithms and algorithms_run:
anomalyScore = len(triggered_algorithms) / len(algorithms_run)
else:
anomalyScore = 1.0
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine anomalyScore for %s' % base_name)
anomalyScore = 1.0
new_alert.append(['anomalyScore', anomalyScore])
if base_name in test_alert_metrics:
triggered_algorithms = ['testing']
new_alert.append(['test_alert', True])
# @added 20200929 - Task #3748: POC SNAB
# Branch #3068: SNAB
# Determine if this is a snab_check_metric so
# that info can be passed to the alerter
if SNAB_ENABLED and snab_check_metric:
if alert[1] == 'slack':
anomaly_id = None
snab_id = None
snab_details = None
anomaly_id_redis_key = 'panorama.anomaly_id.%s.%s' % (
str(int(metric[2])), base_name)
# Same ~20 second polling pattern as above, first for the
# anomaly_id and then (only if found) for the snab_id.
try_get_anomaly_id_redis_key_count = 0
while try_get_anomaly_id_redis_key_count < 20:
try_get_anomaly_id_redis_key_count += 1
try:
anomaly_id = int(self.redis_conn_decoded.get(anomaly_id_redis_key))
break
except:
sleep(1)
if not anomaly_id:
logger.error('error :: failed to determine anomaly_id from Redis key - %s' % anomaly_id_redis_key)
if anomaly_id:
# snab_id_redis_key = 'snab.id.%s.%s.%s.%s' % (algorithm_group, str(metric_timestamp), base_name, panorama_added_at)
snab_id_redis_key = 'snab.id.%s.%s.%s.%s' % (algorithm_group, str(int(metric[2])), base_name, panorama_added_at)
try_get_snab_id_redis_key_count = 0
while try_get_snab_id_redis_key_count < 20:
try_get_snab_id_redis_key_count += 1
try:
snab_id = int(self.redis_conn_decoded.get(snab_id_redis_key))
break
except:
sleep(1)
if not snab_id:
logger.error('error :: failed to determine snab_id from Redis key - %s' % snab_id_redis_key)
# snab_id may still be None here; the alerter is
# expected to tolerate that in snab_details.
snab_details = ['snab_details', snab_id, anomaly_id, anomalyScore]
# Do not modify the alert list object, create a new one
new_alert = list(alert)
new_alert.append(snab_details)
# @modified 20201008 - Feature #3772: Add the anomaly_id to the http_alerter json
# Feature #3734: waterfall alerts
# Branch #3068: SNAB
# Use the appropriate alert context
# Dispatch: use the enriched new_alert when one was built, otherwise
# the original alert; smtp alerts go through smtp_trigger_alert.
# NOTE(review): whether trigger_alert itself sits inside the
# `if not alert_paused` guard (i.e. whether paused alerts are merely
# unlogged or fully suppressed) depends on indentation - confirm
# against the canonical source.
if new_alert:
# @added 20210801 - Feature #4214: alert.paused
if not alert_paused:
logger.info('trigger_alert :: alert: %s, metric: %s, second_order_resolution_seconds: %s, context: %s' % (
str(new_alert), str(metric),
str(second_order_resolution_seconds),
str(alert_context)))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
trigger_alert(new_alert, metric, second_order_resolution_seconds, alert_context, triggered_algorithms)
else:
# @added 20210801 - Feature #4214: alert.paused
if not alert_paused:
logger.info('trigger_alert :: alert: %s, metric: %s, second_order_resolution_seconds: %s, context: %s' % (
str(alert), str(metric),
str(second_order_resolution_seconds),
str(alert_context)))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
trigger_alert(alert, metric, second_order_resolution_seconds, alert_context, triggered_algorithms)
else:
# @added 20210801 - Feature #4214: alert.paused
if not alert_paused:
logger.info('smtp_trigger_alert :: alert: %s, metric: %s, second_order_resolution_seconds: %s, context: %s' % (
str(alert), str(metric),
str(second_order_resolution_seconds),
str(alert_context)))
# @modified 20210304 - Feature #3642: Anomaly type classification
# Feature #3970: custom_algorithm - adtk_level_shift
# Added triggered_algorithms
smtp_trigger_alert(alert, metric, second_order_resolution_seconds, alert_context, triggered_algorithms)
if not alert_paused:
logger.info('sent %s alert: For %s' % (alert[1], metric[1]))
except Exception as e:
logger.error('error :: could not send %s alert for %s: %s' % (alert[1], metric[1], e))
# @added 20220315 - Feature #4482: Test alerts
# Allow for full testing with the injection of an anomaly on a
# metric
# After sending, remove this metric's entry from the <app>.test_alerts
# Redis hash so the injected test alert is not re-processed.
if base_name in test_alert_metrics:
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_metric = test_alerts[test_alert_timestamp]['metric']
if test_metric != base_name:
continue
test_alert_redis_key = '%s.test_alerts' % skyline_app
self.redis_conn_decoded.hdel(test_alert_redis_key, test_alert_timestamp)
logger.info('removed test_alert for %s from Redis key %s' % (
str(test_metric), str(test_alert_redis_key)))
except Exception as err:
logger.error(traceback.format_exc())
# NOTE(review): if the first statement in the try raised,
# test_metric / test_alert_redis_key are unbound here (or
# stale from a previous iteration) and this log line would
# itself raise NameError - confirm and guard.
logger.error('error :: failed to remove test_alert for %s from Redis key %s - %s' % (
str(test_metric), str(test_alert_redis_key), err))
continue
# @added 20200913 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
# Info #1792: Shapelet extraction
# When SNAB is enabled, walk the SNAB_CHECKS config for this app's
# 'testing' context and, for each configured algorithm whose namespace
# substring-matches base_name, queue one check dict on the snab.work
# Redis set (deduped per metric via snab_checks_sent).
if SNAB_ENABLED:
timeseries_dir = base_name.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(int(metric[2])),
str(timeseries_dir))
anomaly_data = '%s/%s.json' % (training_dir, base_name)
# alert[3] is the SECOND_ORDER_RESOLUTION in hours; convert to seconds.
check_full_duration = (int(alert[3]) * 60 * 60)
# TODO:
# Due to how the matrixprofile identifies
# discords, if a metric triggers as
# anomalous with 3sigma it must be checked
# for at x window periods thereafter as
# matrixprofile may only identify a discord
# later when the time series changes again.
for app in SNAB_CHECKS:
if app == skyline_app and check_full_duration:
for snab_context in SNAB_CHECKS[app]:
if snab_context == 'testing':
for algorithm in SNAB_CHECKS[app][snab_context]:
# Optional per-algorithm config values fall back to
# defaults via bare except; algorithm_source is
# mandatory and breaks out of the loop when absent.
try:
alert_slack_channel = SNAB_CHECKS[app][snab_context][algorithm]['alert_slack_channel']
except:
alert_slack_channel = None
try:
algorithm_source = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_source']
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to verify algorithm_source for %s for the %s context' % (
algorithm, snab_context))
break
if not os.path.isfile(algorithm_source):
logger.error('error :: algorithm_source file %s does not exist for %s for the %s context' % (
algorithm_source, algorithm, snab_context))
break
try:
algorithm_parameters = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_parameters']
except:
algorithm_parameters = {}
try:
max_execution_time = SNAB_CHECKS[app][snab_context][algorithm]['max_execution_time']
except:
max_execution_time = 1.0
try:
algo_debug_logging = SNAB_CHECKS[app][snab_context][algorithm]['debug_logging']
except:
algo_debug_logging = False
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
# Plain substring match, not a regex or dotted
# namespace match.
if namespace in base_name:
snab_check_details = {
'metric': base_name,
'timestamp': int(metric[2]),
'original_anomaly_timestamp': int(metric[2]),
'value': metric[0],
'full_duration': check_full_duration,
'anomaly_data': anomaly_data,
'source': 'mirage',
'added_at': added_to_snab_at,
'original_added_at': added_to_snab_at,
'context': snab_context,
'algorithm': algorithm,
'algorithm_source': algorithm_source,
'algorithm_parameters': algorithm_parameters,
'max_execution_time': max_execution_time,
'debug_logging': algo_debug_logging,
'alert_slack_channel': alert_slack_channel,
'processed': None,
'analysed': None,
'anomalous': None,
'anomalyScore': None,
'snab_only': False,
}
add_snab_check = True
if base_name in snab_checks_sent:
add_snab_check = False
break
if add_snab_check:
# The dict is stored as its str() repr; consumers
# are expected to literal_eval it back.
self.redis_conn.sadd('snab.work', str(snab_check_details))
logger.info('added snab check for %s with algorithm %s for alerter %s' % (
base_name, algorithm, str(alert[1])))
snab_checks_sent.append(base_name)
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check and add check_details to snab.work Redis set if required')
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere'
# @added 20200905 - Feature #3734: waterfall alerts
# Remove the metric from the waterfall_alerts Redis set
# [metric, timestamp, value, added_to_waterfall_timestamp]
# waterfall_data = [metric[1], metric[2], metric[0], added_to_waterfall_timestamp, waterfall_panorama_data]
# NOTE(review): this comment block and redis_set assignment duplicate
# the four lines immediately above - harmless but redundant.
redis_set = 'mirage.waterfall_alerts.sent_to_ionosphere'
literal_waterfall_alerts = []
try:
literal_waterfall_alerts = list(self.redis_conn_decoded.smembers(redis_set))
except:
literal_waterfall_alerts = []
# Set members are str() reprs of lists; literal_eval them back.
waterfall_alerts = []
for literal_waterfall_alert in literal_waterfall_alerts:
waterfall_alert = literal_eval(literal_waterfall_alert)
waterfall_alerts.append(waterfall_alert)
for waterfall_alert in waterfall_alerts:
# NOTE(review): per the waterfall_data layout documented above,
# waterfall_alert[0] is the base_name string (metric[1]), but
# here it is compared against `metric` - elsewhere in this scope
# used as a tuple. If metric is the tuple this comparison can
# never match and no waterfall alert is ever removed - confirm
# whether this should be `== base_name`.
if waterfall_alert[0] == metric:
if int(waterfall_alert[1]) == metric_timestamp:
try:
self.redis_conn.srem(redis_set, str(waterfall_alert))
logger.info('removed waterfall alert item from Redis set %s - %s' % (
redis_set, str(waterfall_alert)))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to remove waterfall alert item for %s at %s from Redis set %s' % (
base_name, str(metric_timestamp), redis_set))
# TODO - Add anomaly to Panorama when a waterfall alert triggers
else:
# The alert expiration cache_key still exists, so this anomaly is
# within the alert EXPIRATION_TIME window - do not re-alert.
logger.info('alert Redis key %s exists not alerting for %s' % (str(cache_key), metric[1]))
except Exception as e:
logger.error('error :: could not query Redis for cache_key - %s' % e)
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
# Process snab_only checks: metrics queued on the
# mirage_snab_only_checks_redis_set that should be analysed by SNAB
# without a normal Mirage alert having fired.
snab_only_checks = []
if SNAB_ENABLED:
try:
literal_mirage_snab_only_checks = list(self.redis_conn_decoded.smembers(mirage_snab_only_checks_redis_set))
for snab_only_check_string in literal_mirage_snab_only_checks:
snab_only_check = literal_eval(snab_only_check_string)
snab_only_checks.append(snab_only_check)
except:
logger.error(traceback.format_exc())
# NOTE(review): this error message names the
# mirage.not_anomalous_metrics set but the read above is from
# mirage_snab_only_checks_redis_set - misleading log, confirm
# and correct the message.
logger.error('error :: failed to determine list from mirage.not_anomalous_metrics Redis set')
snab_only_checks = []
if not snab_only_checks:
logger.info('there are no snab_only_checks')
if snab_only_checks:
snab_only_checks_sent = []
for alert in all_alerts:
for snab_check in snab_only_checks:
# NOTE: from here on `metric` is rebound to the metric NAME
# string from the check dict, shadowing the tuple used earlier.
try:
metric = snab_check['metric']
anomaly_data = snab_check['anomaly_data']
anomaly_timestamp = snab_check['timestamp']
original_anomaly_timestamp = snab_check['original_anomaly_timestamp']
value = snab_check['value']
original_added_at = snab_check['original_added_at']
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to details from snab_only_checks snab_check dict - %s' % str(snab_check))
# Only queue checks for metrics that match this alert's
# namespace pattern (alert[0]).
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, metric, [alert[0]])
if LOCAL_DEBUG and pattern_match:
logger.debug('debug :: %s matched alert - %s' % (base_name, alert[0]))
try:
del metric_matched_by
except:
pass
except:
pattern_match = False
if pattern_match:
check_full_duration = (int(alert[3]) * 60 * 60)
# Same SNAB_CHECKS config walk as the post-alert
# submission above, but the queued dict is flagged
# snab_only: True and carries the check's own timestamps.
for app in SNAB_CHECKS:
if app == skyline_app and check_full_duration:
for snab_context in SNAB_CHECKS[app]:
if snab_context == 'testing':
for algorithm in SNAB_CHECKS[app][snab_context]:
try:
alert_slack_channel = SNAB_CHECKS[app][snab_context][algorithm]['alert_slack_channel']
except:
alert_slack_channel = None
try:
algorithm_source = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_source']
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to verify algorithm_source for %s for the %s context' % (
algorithm, snab_context))
break
if not os.path.isfile(algorithm_source):
logger.error('error :: algorithm_source file %s does not exist for %s for the %s context' % (
algorithm_source, algorithm, snab_context))
break
try:
algorithm_parameters = SNAB_CHECKS[app][snab_context][algorithm]['algorithm_parameters']
except:
algorithm_parameters = {}
try:
max_execution_time = SNAB_CHECKS[app][snab_context][algorithm]['max_execution_time']
except:
max_execution_time = 1.0
try:
algo_debug_logging = SNAB_CHECKS[app][snab_context][algorithm]['debug_logging']
except:
algo_debug_logging = False
try:
for namespace in SNAB_CHECKS[app][snab_context][algorithm]['namespaces']:
if namespace in metric:
snab_check_details = {
'metric': metric,
'timestamp': int(anomaly_timestamp),
'original_anomaly_timestamp': int(original_anomaly_timestamp),
'value': value,
'full_duration': check_full_duration,
'anomaly_data': anomaly_data,
'source': 'mirage',
'added_at': added_to_snab_at,
'original_added_at': original_added_at,
'context': snab_context,
'algorithm': algorithm,
'algorithm_source': algorithm_source,
'algorithm_parameters': algorithm_parameters,
'max_execution_time': max_execution_time,
'debug_logging': algo_debug_logging,
'alert_slack_channel': alert_slack_channel,
'processed': None,
'analysed': None,
'anomalous': None,
'anomalyScore': None,
'snab_only': True,
}
add_snab_check = True
if metric in snab_only_checks_sent:
add_snab_check = False
break
if add_snab_check:
self.redis_conn.sadd('snab.work', str(snab_check_details))
logger.info('added snab_only check for %s with algorithm %s for alerter %s' % (
metric, algorithm, str(alert[1])))
snab_only_checks_sent.append(metric)
break
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check and add check_details to snab.work Redis set if required')
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Build the not-anomalous list from the Redis set (entries are str()
# reprs of metric lists, literal_eval'd back).
mirage_not_anomalous_metrics = []
try:
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# literal_mirage_not_anomalous_metrics = list(self.redis_conn.smembers('mirage.not_anomalous_metrics'))
literal_mirage_not_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage.not_anomalous_metrics'))
for metric_list_string in literal_mirage_not_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_not_anomalous_metrics.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to determine list from mirage.not_anomalous_metrics Redis set')
mirage_not_anomalous_metrics = []
# NEGATE_ANALYZER_ALERTS: when Mirage found nothing anomalous, send a
# negation for any Analyzer alert whose pattern matches a metric Mirage
# cleared.
if settings.NEGATE_ANALYZER_ALERTS:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# if len(self.anomalous_metrics) == 0:
if len(mirage_anomalous_metrics) == 0:
# @modified 20200610 - Feature #3560: External alert config
# for negate_alert in settings.ALERTS:
for negate_alert in all_alerts:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# for not_anomalous_metric in self.not_anomalous_metrics:
for not_anomalous_metric in mirage_not_anomalous_metrics:
NEGATE_ALERT_MATCH_PATTERN = negate_alert[0]
NOT_ANOMALOUS_METRIC_PATTERN = not_anomalous_metric[1]
# re.match anchors at the start of the metric name only.
alert_match_pattern = re.compile(NEGATE_ALERT_MATCH_PATTERN)
negate_pattern_match = alert_match_pattern.match(NOT_ANOMALOUS_METRIC_PATTERN)
if negate_pattern_match:
try:
logger.info('negate alert sent: For %s' % (not_anomalous_metric[1]))
# NOTE(review): second_order_resolution_seconds and
# metric_value are taken from the enclosing scope here
# and may be stale (set by an earlier loop) or unbound
# if no check was processed - confirm.
trigger_negater(negate_alert, not_anomalous_metric, second_order_resolution_seconds, metric_value)
except Exception as e:
logger.error('error :: could not send alert: %s' % e)
# @added 20220316 - Feature #4482: Test alerts
# Remove test_alert key if metric has been done and anything went wrong
# so that mirage does not loop continuosly on the metric failing
# The mirage.test_alerts.done.<metric> key marks a test as processed;
# when present, drop the corresponding hash entry from <app>.test_alerts.
if test_alerts:
for test_alert_timestamp in list(test_alerts.keys()):
try:
test_metric = test_alerts[test_alert_timestamp]['metric']
alert_tested_key = 'mirage.test_alerts.done.%s' % test_metric
alert_tested = None
try:
alert_tested = self.redis_conn_decoded.get(alert_tested_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis key %s - %s' % (
alert_tested_key, err))
if alert_tested:
test_alert_redis_key = '%s.test_alerts' % skyline_app
self.redis_conn_decoded.hdel(test_alert_redis_key, test_alert_timestamp)
logger.info('removed test_alert for %s from Redis key %s' % (
str(test_metric), str(test_alert_redis_key)))
except Exception as err:
logger.error(traceback.format_exc())
# NOTE(review): test_metric / test_alert_redis_key may be unbound
# here if the exception occurred before their assignment - this
# log line would then raise NameError; confirm and guard.
logger.error('error :: failed to remove test_alert for %s from Redis key %s - %s' % (
str(test_metric), str(test_alert_redis_key), err))
# Log progress
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# if len(self.anomalous_metrics) > 0:
if len(mirage_anomalous_metrics) > 0:
logger.info('seconds since last anomaly :: %.2f' % (time() - now))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# logger.info('total anomalies :: %d' % len(self.anomalous_metrics))
logger.info('total anomalies :: %d' % len(mirage_anomalous_metrics))
logger.info('exception stats :: %s' % str(exceptions))
logger.info('anomaly breakdown :: %s' % str(anomaly_breakdown))
# Log to Graphite
# Only report a run_time when check files were actually processed
# this cycle.
if process_metric_check_files:
run_time = time() - run_timestamp
logger.info('seconds to run :: %.2f' % run_time)
graphite_run_time = '%.2f' % run_time
send_metric_name = skyline_app_graphite_namespace + '.run_time'
send_graphite_metric(skyline_app, send_metric_name, graphite_run_time)
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Add the mirage metric and its EXPIRATION_TIME to
# the mirage.metrics_expiration_times so that Mirage
# can determine the metric EXPIRATION_TIME without
# having to create and iterate the all_alerts
# object in the Mirage analysis phase so that the
# reported anomaly timestamp can be used to determine
# whether the EXPIRATION_TIME should be applied to a
# batch metric in the alerting and Ionosphere contexts
# @modified 20201107 - Feature #3830: metrics_manager
# Use metrics_manager data, now managed there
# The superseded implementation is retained below, commented out,
# for historical reference only.
# mirage_metrics_expiration_times = []
# try:
# mirage_metrics_expiration_times = list(self.redis_conn_decoded.smembers('mirage.metrics_expiration_times'))
# if LOCAL_DEBUG:
# logger.info('debug :: fetched the mirage.metrics_expiration_times Redis set')
# except:
# logger.info('failed to fetch the mirage.metrics_expiration_times Redis set')
# mirage_metrics_expiration_times = []
# try:
# mirage_unique_metrics = list(self.redis_conn_decoded.smembers('mirage.unique_metrics'))
# mirage_unique_metrics_count = len(mirage_unique_metrics)
# logger.info('mirage.unique_metrics Redis set count - %s' % str(mirage_unique_metrics_count))
# if LOCAL_DEBUG:
# logger.info('debug :: fetched the mirage.unique_metrics Redis set')
# logger.info('debug :: %s' % str(mirage_unique_metrics))
# except:
# logger.info('failed to fetch the mirage.unique_metrics Redis set')
# mirage_unique_metrics == []
# for metric in mirage_unique_metrics:
# if metric.startswith(settings.FULL_NAMESPACE):
# base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
# else:
# base_name = metric
# mirage_alert_expiration_data = [base_name, int(alert[2])]
# if str(mirage_alert_expiration_data) not in mirage_metrics_expiration_times:
# try:
# self.redis_conn.sadd('mirage.metrics_expiration_times', str(mirage_alert_expiration_data))
# if LOCAL_DEBUG:
# logger.info('debug :: added %s to mirage.metrics_expiration_times' % str(mirage_alert_expiration_data))
# except:
# if LOCAL_DEBUG:
# logger.error('error :: failed to add %s to mirage.metrics_expiration_times set' % str(mirage_alert_expiration_data))
# Reset counters
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets instead of Manager().list()
# self.anomalous_metrics[:] = []
# self.not_anomalous_metrics[:] = []
# Reset metric_variables
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets instead of Manager().list()
# self.metric_variables[:] = []
# self.sent_to_crucible[:] = []
# self.sent_to_panorama[:] = []
# self.sent_to_ionosphere[:] = []
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets instead of Manager().list()
# Per-run working sets are deleted every cycle; the sent_to_* sets are
# handled once per minute further below instead.
delete_redis_sets = [
'mirage.anomalous_metrics',
'mirage.not_anomalous_metrics',
'mirage.metric_variables',
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Handle once per minute
# 'mirage.sent_to_crucible',
# 'mirage.sent_to_panorama',
# 'mirage.sent_to_ionosphere',
]
for i_redis_set in delete_redis_sets:
redis_set_to_delete = i_redis_set
try:
self.redis_conn.delete(redis_set_to_delete)
logger.info('deleted Redis set - %s' % redis_set_to_delete)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % redis_set_to_delete)
# DEVELOPMENT ONLY
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
# Diffs object counts captured in `before` (earlier in the run, outside
# this view) against `after` to surface leaked object types.
if ENABLE_MEMORY_PROFILING and garbage_collection_enabled:
if settings.ENABLE_DEBUG or LOCAL_DEBUG:
for i in get_objects():
after[type(i)] += 1
gc_results = [(k, after[k] - before[k]) for k in after if after[k] - before[k]]
for gc_result in gc_results:
logger.info('debug :: %s' % str(gc_result))
# @added 20160806 - Bug #1558: Memory leak in Analyzer
# Debug with garbage collection - http://code.activestate.com/recipes/65333/
# show the dirt ;-)
try:
logger.info('garbage collecting')
all_the_garbage = str(self.dump_garbage())
except:
logger.error('error :: during garbage collecting')
logger.error(traceback.format_exc())
all_the_garbage = 'gc errored'
if settings.ENABLE_DEBUG or LOCAL_DEBUG:
logger.info(all_the_garbage)
logger.info('garbage collected')
if LOCAL_DEBUG:
# ru_maxrss units differ by platform (kB on Linux, bytes on macOS).
logger.info('debug :: Memory usage end of run: %s (kb)' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
# Send checks.stale_discarded and checks.pending metrics
# Once-per-minute accounting: ship counters to Graphite, reset the
# per-minute Redis sets, and refresh the filesafe base_names map.
if int(time()) >= (last_sent_to_graphite + 60):
stale_check_discarded = []
try:
stale_check_discarded = list(self.redis_conn_decoded.smembers('mirage.stale_check_discarded'))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get mirage.stale_check_discarded set from Redis')
stale_check_discarded = []
stale_check_discarded_count = len(stale_check_discarded)
logger.info('checks.stale_discarded :: %s' % str(stale_check_discarded_count))
send_metric_name = '%s.checks.stale_discarded' % skyline_app_graphite_namespace
send_graphite_metric(skyline_app, send_metric_name, str(stale_check_discarded_count))
# Pending checks are the check files still sitting on disk in
# MIRAGE_CHECK_PATH.
checks_pending = [f_pending for f_pending in listdir(settings.MIRAGE_CHECK_PATH) if isfile(join(settings.MIRAGE_CHECK_PATH, f_pending))]
checks_pending_count = len(checks_pending)
logger.info('checks.pending :: %s' % str(checks_pending_count))
send_metric_name = '%s.checks.pending' % skyline_app_graphite_namespace
send_graphite_metric(skyline_app, send_metric_name, str(checks_pending_count))
# @modified 20210309 - Task #3730: Validate Mirage running multiple processes
# Reimplement mirage.checks.done count as increment key
# checks_done = []
# try:
# checks_done = list(self.redis_conn_decoded.smembers('mirage.checks.done'))
checks_done = 0
try:
# GETSET atomically reads and zeroes the counter so concurrent
# incrementers do not lose updates between read and reset.
checks_done_str = self.redis_conn_decoded.getset('mirage.checks.done', 0)
if checks_done_str:
checks_done = int(checks_done_str)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get mirage.checks.done key from Redis')
checks_done = 0
# checks_done_count = len(checks_done)
# logger.info('checks.done :: %s' % str(checks_done_count))
logger.info('checks.done :: %s' % str(checks_done))
send_metric_name = '%s.checks.done' % skyline_app_graphite_namespace
# send_graphite_metric(skyline_app, send_metric_name, str(checks_done_count))
send_graphite_metric(skyline_app, send_metric_name, str(checks_done))
# @modified 20200903 - Task #3730: Validate Mirage running multiple processes
# Only send panorama, ionosphere and crucible metrics once a minute
if settings.ENABLE_CRUCIBLE and settings.MIRAGE_CRUCIBLE_ENABLED:
try:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# sent_to_crucible = str(len(self.sent_to_crucible))#
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# sent_to_crucible = str(len(list(self.redis_conn.smembers('mirage.sent_to_crucible'))))
sent_to_crucible = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_crucible'))))
except:
sent_to_crucible = '0'
logger.info('sent_to_crucible :: %s' % sent_to_crucible)
send_metric_name = '%s.sent_to_crucible' % skyline_app_graphite_namespace
send_graphite_metric(skyline_app, send_metric_name, sent_to_crucible)
if settings.PANORAMA_ENABLED:
try:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# sent_to_panorama = str(len(self.sent_to_panorama))
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# sent_to_panorama = str(len(list(self.redis_conn.smembers('mirage.sent_to_panorama'))))
sent_to_panorama = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_panorama'))))
except:
sent_to_panorama = '0'
logger.info('sent_to_panorama :: %s' % sent_to_panorama)
send_metric_name = '%s.sent_to_panorama' % skyline_app_graphite_namespace
send_graphite_metric(skyline_app, send_metric_name, sent_to_panorama)
if settings.IONOSPHERE_ENABLED:
try:
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# sent_to_ionosphere = str(len(self.sent_to_ionosphere))
# @modified 20191022 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# sent_to_ionosphere = str(len(list(self.redis_conn.smembers('mirage.sent_to_ionosphere'))))
sent_to_ionosphere = str(len(list(self.redis_conn_decoded.smembers('mirage.sent_to_ionosphere'))))
except Exception as e:
logger.error('error :: could not determine sent_to_ionosphere: %s' % e)
sent_to_ionosphere = '0'
logger.info('sent_to_ionosphere :: %s' % sent_to_ionosphere)
send_metric_name = '%s.sent_to_ionosphere' % skyline_app_graphite_namespace
send_graphite_metric(skyline_app, send_metric_name, sent_to_ionosphere)
last_sent_to_graphite = int(time())
# Reset the per-minute sets now that their counts have been shipped.
delete_redis_sets = [
'mirage.sent_to_crucible',
'mirage.sent_to_panorama',
'mirage.sent_to_ionosphere',
'mirage.stale_check_discarded',
# @modified 20210309 - Task #3730: Validate Mirage running multiple processes
# Reimplement mirage.checks.done count as increment key
# 'mirage.checks.done',
# @added 20200916 - Branch #3068: SNAB
# Task #3744: POC matrixprofile
mirage_snab_only_checks_redis_set,
]
for i_redis_set in delete_redis_sets:
redis_set_to_delete = i_redis_set
try:
self.redis_conn.delete(redis_set_to_delete)
logger.info('deleted Redis set - %s' % redis_set_to_delete)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete Redis set - %s' % redis_set_to_delete)
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Refresh
# Re-fetch the metrics_manager filesafe name mapping each minute.
try:
filesafe_names_dict = self.redis_conn_decoded.hgetall('metrics_manager.filesafe_base_names')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: hgetall metrics_manager.filesafe_base_names failed - %s' % err)
# Sleep if it went too fast
# Throttle the loop to at most ~1 iteration per second.
if time() - now < 1:
logger.info('sleeping due to low run time...')
# sleep(10)
sleep(1)