"""
mirage_vortex.py
"""
import logging
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
from multiprocessing import Process, Queue
import traceback
import re
import json
import sys
import os
from ast import literal_eval
from math import ceil
import copy
import zlib
import gzip
import shutil
import settings
from skyline_functions import (
write_data_to_file, send_anomalous_metric_to, mkdir_p, filesafe_metricname,
get_redis_conn, get_redis_conn_decoded, nonNegativeDerivative,
)
from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
from functions.database.queries.insert_new_metric import insert_new_metric
from functions.database.queries.get_ionosphere_fp_ids_for_full_duration import get_ionosphere_fp_ids_for_full_duration
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.timeseries.downsample import downsample_timeseries
from functions.timeseries.strictly_increasing_monotonicity import strictly_increasing_monotonicity
# @added 20220504 - Feature #2580: illuminance
from functions.illuminance.add_illuminance_entries import add_illuminance_entries
from functions.graphite.send_graphite_metric import send_graphite_metric
from custom_algorithms import run_custom_algorithm_on_timeseries
# from custom_algorithm_sources.sigma.sigma import run_sigma_algorithms
skyline_app = 'mirage_vortex'
# mirage_vortex logs via the mirage logger rather than a '%sLog' % skyline_app logger
skyline_app_logger = 'mirageLog'
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
python_version = int(sys.version_info[0])
this_host = str(os.uname()[1])

# Optional settings, each falls back to a default when it is not declared
# (or cannot be read) in settings.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    SERVER_METRIC_PATH = ''
try:
    MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS = settings.MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS
except Exception:
    MIRAGE_CHECK_REPETITIVE_DAILY_PEAKS = True
# VORTEX_ALGORITHMS is required, mirage_vortex exits if it cannot be loaded
try:
    VORTEX_ALGORITHMS = copy.deepcopy(settings.VORTEX_ALGORITHMS)
except Exception as outer_err:
    logger.error('error :: mirage_vortex CRITICAL error failed to load VORTEX_ALGORITHMS from settings - %s' % outer_err)
    sys.exit(1)
try:
    MIRAGE_VORTEX_DEFAULT_ALGORITHMS = copy.deepcopy(settings.VORTEX_ALGORITHMS['default'])
except Exception:
    MIRAGE_VORTEX_DEFAULT_ALGORITHMS = {
        'sigma': {'sigma': 3, 'consensus': settings.MIRAGE_CONSENSUS},
        'spectral_residual': {},
        'consensus': [['sigma', 'spectral_residual']],
    }
try:
    MIRAGE_VORTEX_DEFAULT_CONSENSUS = settings.VORTEX_ALGORITHMS['default']['consensus']
except Exception:
    MIRAGE_VORTEX_DEFAULT_CONSENSUS = [['sigma', 'spectral_residual']]
try:
    VORTEX_TIMESERIES_JSON_TO_DISK = settings.VORTEX_TIMESERIES_JSON_TO_DISK
except Exception:
    VORTEX_TIMESERIES_JSON_TO_DISK = True
try:
    VORTEX_SAVE_RESULTS_FOR = settings.VORTEX_SAVE_RESULTS_FOR
except Exception:
    VORTEX_SAVE_RESULTS_FOR = 86400
# Force downsampling, which is required to ensure speed and that features
# profiles are effective. Presumably maps full duration (seconds) to
# resolution (seconds) - TODO confirm against the downsampling caller.
try:
    VORTEX_FULL_DURATION_RESOLUTIONS = settings.VORTEX_FULL_DURATION_RESOLUTIONS
except Exception:
    VORTEX_FULL_DURATION_RESOLUTIONS = {
        86400: 60,
        604800: 600,
    }
try:
    HORIZON_SHARDS = copy.deepcopy(settings.HORIZON_SHARDS)
except Exception:
    HORIZON_SHARDS = {}
try:
    HORIZON_SHARD_DEBUG = settings.HORIZON_SHARD_DEBUG
except Exception:
    HORIZON_SHARD_DEBUG = True
number_of_horizon_shards = 0
HORIZON_SHARD = 0
if HORIZON_SHARDS:
    number_of_horizon_shards = len(HORIZON_SHARDS)
    # NOTE(review): assuming HORIZON_SHARDS is a dict of host -> shard number,
    # this raises KeyError at import when this host is not declared in it.
    HORIZON_SHARD = HORIZON_SHARDS[this_host]
ALLOWED_ALGORITHMS = list(VORTEX_ALGORITHMS.keys())
skyline_app_graphite_namespace = 'skyline.mirage%s.vortex' % (SERVER_METRIC_PATH)
failed_checks_dir = '%s_vortex_failed' % settings.MIRAGE_CHECK_PATH
[docs]class MirageVortex(Thread):
"""
The MirageVortex thread
"""
def __init__(self, parent_pid):
    """
    Create the MirageVortex worker as a daemon thread.

    :param parent_pid: pid of the parent process, checked together with this
        process pid by check_if_parent_is_alive
    """
    super().__init__(daemon=True)
    self.parent_pid = parent_pid
    self.current_pid = os.getpid()
    # multiprocessing queues for exceptions and anomaly breakdowns
    self.mirage_vortex_exceptions_q = Queue()
    self.mirage_vortex_anomaly_breakdown_q = Queue()
    # A plain and a decoded Redis connection for this app
    self.redis_conn = get_redis_conn(skyline_app)
    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
def check_if_parent_is_alive(self):
    """
    Exit if this process or its parent process no longer exists.

    Signal 0 is sent to self.current_pid and then self.parent_pid, which
    only checks that the pid can be signalled. Any failure is treated as a
    dead process and sys.exit(0) is called.
    """
    try:
        for pid in (self.current_pid, self.parent_pid):
            os.kill(pid, 0)
    except:  # noqa: E722 - any failure at all is treated as a dead process
        logger.warning('warning :: parent or current process dead')
        sys.exit(0)
def check_valid_algorithms(self, consensus):
    """
    Verify that the algorithms referenced in a consensus are valid.

    :param consensus: list whose entries are either an algorithm name or a
        list of algorithm names
    :return: the names not in ALLOWED_ALGORITHMS, in the order encountered
        (duplicates kept), an empty list when all are valid
    """
    names = []
    for entry in consensus:
        names.extend(entry if isinstance(entry, list) else [entry])
    return [name for name in names if name not in ALLOWED_ALGORITHMS]
def check_consensus(self, consensus, triggered_algorithms, algorithms_run):
    """
    Check whether consensus is achieved and whether it is still possible.

    :param consensus: list of consensus groups. Each group is a list of
        algorithm names, or a bare algorithm name, which is treated as a
        single algorithm group (check_valid_algorithms accepts both forms).
    :param triggered_algorithms: container of the triggered algorithm names,
        tested with ``in``
    :param algorithms_run: mapping of algorithm name -> value, a falsy value
        for an algorithm rules out every group that contains it
    :return: tuple of (reached_group, consensus_impossible). reached_group is
        the first group whose algorithms have all triggered (as a list), or
        [] if none has. consensus_impossible is True only when no group has
        been reached and every group is ruled out by algorithms_run.
    """
    # A bare string would otherwise be iterated character by character
    groups = [[item] if isinstance(item, str) else item for item in consensus]
    for group in groups:
        if all(algo in triggered_algorithms for algo in group):
            return group, False
    consensus_possible = any(
        all(algo not in algorithms_run or algorithms_run[algo] for algo in group)
        for group in groups)
    return [], not consensus_possible
def create_echo_timeseries(self, vortex_metric_data, timeseries):
    """
    Save the last 24 hours of a timeseries as a mirage echo json file.

    The data points with a timestamp newer than metric_timestamp - 86400 are
    written to
    <IONOSPHERE_DATA_FOLDER>/<metric_timestamp>/<metric path>/<metric>.mirage.redis.24h.json,
    downsampled to 60 seconds (mean) when the resolution is below 60.

    :param vortex_metric_data: dict providing labelled_metric_name,
        metric_timestamp, resolution and request_id
    :param timeseries: iterable of data points whose first element is the
        timestamp
    :return: the path of the echo json file, or False on failure
    """
    use_base_name = vortex_metric_data['labelled_metric_name']
    metric_timestamp = vortex_metric_data['metric_timestamp']
    timeseries_dir = use_base_name.replace('.', '/')
    metric_data_dir = '%s/%s/%s' % (
        settings.IONOSPHERE_DATA_FOLDER, metric_timestamp,
        timeseries_dir)
    if not os.path.exists(metric_data_dir):
        try:
            mkdir_p(metric_data_dir)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to create dir - %s - %s' % (
                metric_data_dir, err))
            return False
    echo_json = '%s/%s.mirage.redis.24h.json' % (metric_data_dir, use_base_name)
    start_timestamp = metric_timestamp - 86400
    # Iterate timeseries exactly once, so a one-shot iterator is filtered
    # correctly rather than being consumed by a discarded copy first
    echo_timeseries = [item for item in timeseries if item[0] > start_timestamp]
    if vortex_metric_data['resolution'] < 60:
        try:
            echo_timeseries = downsample_timeseries('mirage', echo_timeseries, vortex_metric_data['resolution'], 60, 'mean', 'end')
        except Exception as err:
            logger.error('error :: mirage_vortex :: downsample_timeseries failed for echo data for request_id %s - %s' % (
                vortex_metric_data['request_id'], err))
            return False
    # Convert the timeseries to json and save
    try:
        with open(echo_json, 'w') as f:
            f.write(json.dumps(echo_timeseries))
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to dump echo_timeseries to echo_json %s for request_id %s - %s' % (
            echo_json, vortex_metric_data['request_id'], err))
        return False
    logger.info('mirage_vortex :: dumped echo_timeseries to echo_json %s for request_id %s' % (
        echo_json, vortex_metric_data['request_id']))
    return echo_json
def add_training_data(self, request_id, vortex_metric_data, timeseries, ionosphere_enabled):
    """
    Save the training data for a vortex request to the Ionosphere data folder.

    Files are written under
    <IONOSPHERE_DATA_FOLDER>/<metric_timestamp>/<metric name with . as />:

    - <metric>.txt - the metric vars (check) file
    - undownsampled.<metric>.json.gz - a gzipped copy of the timeseries json
      file named by vortex_metric_data['timeseries'], only when
      VORTEX_TIMESERIES_JSON_TO_DISK is set and the data was downsampled
    - vortex.metric_data.<request_id>.json.gz - the gzipped vortex_metric_data
    - <metric>.json - the timeseries

    [metric, metric_timestamp, nearest_full_duration] is then added to the
    ionosphere.training_data Redis set.

    NOTE: a vortex_metric_data['timeseries'] value that is not a str (file
    path) is deleted from vortex_metric_data in place, so that it is not
    saved in the metric_data json.

    :param request_id: the vortex request id
    :param vortex_metric_data: the vortex metric_data dict
    :param timeseries: the json serialisable timeseries to save as <metric>.json
    :param ionosphere_enabled: not used
    :return: the metric_data_dir, or False if a required file could not be saved
    """
    use_base_name = vortex_metric_data['internal_metric_name']
    labelled_metric_name = vortex_metric_data['labelled_metric_name']
    if labelled_metric_name:
        use_base_name = str(labelled_metric_name)
    metric_timestamp = vortex_metric_data['metric_timestamp']
    timeseries_dir = use_base_name.replace('.', '/')
    metric_data_dir = '%s/%s/%s' % (
        settings.IONOSPHERE_DATA_FOLDER, metric_timestamp,
        timeseries_dir)
    anomaly_json = '%s/%s.json' % (metric_data_dir, use_base_name)
    check_file = '%s/%s.txt' % (metric_data_dir, use_base_name)
    logger.info('mirage_vortex :: saving data for request_id %s to %s' % (
        request_id, metric_data_dir))
    if not os.path.exists(metric_data_dir):
        try:
            mkdir_p(metric_data_dir)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to create dir - %s - %s' % (
                metric_data_dir, err))
            return False

    # Add a metric vars file
    try:
        value = vortex_metric_data['value']
        from_timestamp = vortex_metric_data['from_timestamp']
        algorithms = list(vortex_metric_data['algorithms'].keys())
        triggered_algorithms = [
            algo for algo, triggered in vortex_metric_data['results']['triggered_algorithms'].items()
            if triggered]
        full_duration = vortex_metric_data['nearest_full_duration']
        parent_id = 0
        algorithms_run = vortex_metric_data['results']['algorithms_run']
        anomaly_data = (
            "metric = '%s'\n"
            "value = '%s'\n"
            "from_timestamp = '%s'\n"
            "metric_timestamp = '%s'\n"
            "algorithms = %s\n"
            "triggered_algorithms = %s\n"
            "anomaly_dir = '%s'\n"
            "graphite_metric = False\n"
            "run_crucible_tests = False\n"
            "added_by = '%s'\n"
            "added_at = '%s'\n"
            "full_duration = '%s'\n"
            "ionosphere_parent_id = '%s'\n"
            "algorithms_run = %s\n"
        ) % (str(use_base_name), str(value), str(from_timestamp),
             str(vortex_metric_data['metric_timestamp']),
             str(algorithms), str(triggered_algorithms),
             metric_data_dir, 'mirage_vortex',
             str(int(time())), str(full_duration),
             str(parent_id), str(algorithms_run))
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: mirage_vortex :: failed to create anomaly_data - %s' % (
            err))
        return False
    try:
        write_data_to_file('mirage', check_file, 'w', anomaly_data)
        logger.info('mirage_vortex :: added check_file - %s' % check_file)
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to add check_file %s - %s' % (
            check_file, err))
        return False

    # Keep a gzipped copy of the undownsampled timeseries json file
    if VORTEX_TIMESERIES_JSON_TO_DISK and vortex_metric_data.get('downsampled'):
        gzip_jsonfile = '%s/undownsampled.%s.json.gz' % (metric_data_dir, use_base_name)
        jsonfile = None
        try:
            jsonfile = vortex_metric_data['timeseries']
        except KeyError as err:
            logger.error('error :: mirage_vortex :: no timeseries (jsonfile) in vortex_metric_data - %s' % (
                err))
        # os.path.isfile raises TypeError (it does not return False) for None
        # or an in memory list, so only str paths are checked
        if isinstance(jsonfile, str) and os.path.isfile(jsonfile):
            try:
                with open(jsonfile, 'rb') as f_in, gzip.open(gzip_jsonfile, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
                logger.info('mirage_vortex :: added undownsampled timeseries - %s' % gzip_jsonfile)
            except Exception as err:
                logger.error('error :: mirage_vortex :: failed to gzip jsonfile %s for request_id %s - %s' % (
                    jsonfile, request_id, err))

    # Do not save an in memory timeseries in the metric_data json
    if not isinstance(vortex_metric_data.get('timeseries'), str):
        vortex_metric_data.pop('timeseries', None)

    jsonfile = '%s/vortex.metric_data.%s.json' % (metric_data_dir, request_id)
    gzip_jsonfile = '%s.gz' % jsonfile
    try:
        with open(jsonfile, 'w') as fw:
            json.dump(vortex_metric_data, fw)
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to save metric_data for request_id %s to %s - %s' % (
            request_id, jsonfile, err))
        return False
    if os.path.isfile(jsonfile):
        try:
            with open(jsonfile, 'rb') as f_in, gzip.open(gzip_jsonfile, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to gzip jsonfile %s for request_id %s - %s' % (
                jsonfile, request_id, err))
        else:
            # Only remove the json once it has been compressed successfully
            try:
                os.remove(jsonfile)
            except OSError:
                logger.error(
                    'error :: mirage_vortex :: failed to remove file - %s' % jsonfile)

    # Convert the timeseries to json and save
    try:
        with open(anomaly_json, 'w') as f:
            f.write(json.dumps(timeseries))
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to dump timeseries to anomaly_json %s for request_id %s - %s' % (
            anomaly_json, request_id, err))
        return False

    redis_set = 'ionosphere.training_data'
    data = [use_base_name, int(metric_timestamp), vortex_metric_data['nearest_full_duration']]
    try:
        logger.info('adding to Redis set %s - %s' % (
            redis_set, str(data)))
        self.redis_conn.sadd(redis_set, str(data))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: mirage_vortex :: failed to add %s to %s Redis set' % (str(data), redis_set))
    return metric_data_dir
def add_results(self, request_id, metric_data, analysis_start_time):
    """
    Save the results of a vortex request and add them to the mirage.vortex Redis hash.

    - When VORTEX_SAVE_RESULTS_FOR is set, metric_data is saved as a gzipped
      json file under
      <SKYLINE_DIR>/flux/vortex/results/<hour aligned timestamp>/<request time>/
      and the relative save path is recorded in the mirage.vortex_saved hash.
    - When VORTEX_TIMESERIES_JSON_TO_DISK is set, the timeseries json file
      named by metric_data['timeseries'] is removed.
    - The timeseries is dropped from metric_data, total_analysis_runtime is
      recorded, and per algorithm anomalies and scores are dropped unless the
      algorithm declares return_results.
    - Results are only added to the mirage.vortex hash when source_app is flux.

    NOTE: metric_data is modified in place.

    :param request_id: the request id. Its first '.' separated element is
        used as a unix timestamp and its first two as the request time
        (assumed format - TODO confirm).
    :param metric_data: the request metric_data dict
    :param analysis_start_time: unix time at which the analysis began
    :return: the value returned by the mirage.vortex hset, or 0 if the
        results were not added
    """
    added_results = 0
    request_id_elements = request_id.split('.')
    request_id_timestamp = request_id_elements[0]
    request_id_time = '.'.join(request_id_elements[0:2])
    request_id_timestamp_aligned = int(request_id_timestamp) // 3600 * 3600

    if VORTEX_SAVE_RESULTS_FOR:
        save_path = '%s/%s' % (str(request_id_timestamp_aligned), str(request_id_time))
        vortex_save_path = '%s/flux/vortex/results/%s' % (settings.SKYLINE_DIR, save_path)
        logger.info('mirage_vortex :: saving data for request_id %s to %s' % (
            request_id, vortex_save_path))
        if not os.path.exists(vortex_save_path):
            try:
                mkdir_p(vortex_save_path)
            except Exception as err:
                logger.error('error :: mirage_vortex :: failed to create dir - %s - %s' % (
                    vortex_save_path, err))
        jsonfile = '%s/vortex.%s.json' % (vortex_save_path, request_id)
        gzip_jsonfile = '%s.gz' % jsonfile
        try:
            with open(jsonfile, 'w') as fw:
                json.dump(metric_data, fw)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to save metric_data for request_id %s to %s - %s' % (
                request_id, jsonfile, err))
        if os.path.isfile(jsonfile):
            try:
                with open(jsonfile, 'rb') as f_in, gzip.open(gzip_jsonfile, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            except Exception as err:
                logger.error('error :: mirage_vortex :: failed to gzip jsonfile %s for request_id %s - %s' % (
                    jsonfile, request_id, err))
            else:
                # Only remove the json once it has been compressed successfully
                try:
                    os.remove(jsonfile)
                except OSError:
                    logger.error('error :: mirage_vortex - failed to remove file - %s' % jsonfile)
        try:
            added_save = self.redis_conn_decoded.hset('mirage.vortex_saved', request_id, save_path)
            if added_save:
                logger.info('mirage_vortex :: added key for %s to mirage.vortex_saved' % (
                    request_id))
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to hset key for %s in mirage.vortex_saved - %s' % (
                request_id, err))

    if VORTEX_TIMESERIES_JSON_TO_DISK:
        timeseries_file = metric_data.get('timeseries')
        if timeseries_file is None:
            logger.error('error :: mirage_vortex :: no timeseries in metric_data for %s' % (
                request_id))
        # os.path.isfile raises TypeError (it does not return False) for an
        # in memory list, so only str paths are checked
        elif isinstance(timeseries_file, str) and os.path.isfile(timeseries_file):
            try:
                os.remove(timeseries_file)
            except OSError:
                logger.error('error :: mirage_vortex - failed to remove file - %s' % timeseries_file)
    metric_data.pop('timeseries', None)

    analysis_runtime = time() - analysis_start_time
    metric_data['results']['total_analysis_runtime'] = analysis_runtime
    # Remove results AFTER saving if return_results is not declared
    for algorithm in list(metric_data['algorithms'].keys()):
        try:
            return_results = metric_data['algorithms'][algorithm]['algorithm_parameters']['return_results']
        except (KeyError, TypeError):
            return_results = False
        if return_results:
            continue
        for result_key in ('anomalies', 'scores'):
            try:
                del metric_data['results']['algorithms'][algorithm][result_key]
            except (KeyError, TypeError):
                pass

    if metric_data['source_app'] == 'flux':
        try:
            added_results = self.redis_conn_decoded.hset('mirage.vortex', request_id, str(metric_data))
            if added_results:
                logger.info('mirage_vortex :: added results for %s to mirage.vortex, analysis_runtime: %s' % (
                    request_id, str(analysis_runtime)))
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to hset results for %s to mirage.vortex - %s' % (
                request_id, err))
    return added_results
def return_shard_test(self, request_id, metric_data, analysis_start_time):
    """
    Mark a shard test request as successful and add its results.

    Sets metric_data['anomalous'] to None and metric_data['results']['success']
    to True (in place) before passing it to add_results.

    :return: the value returned by add_results
    """
    logger.info('mirage_vortex :: SHARD_TEST returning results')
    metric_data['anomalous'] = None
    metric_data['results']['success'] = True
    added = self.add_results(request_id, metric_data, analysis_start_time)
    if not added:
        return added
    logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
    return added
def shard_host(self, metric, check_horizon_shards):
    """
    Determine the shard host to which a metric belongs.

    The adler32 checksum of the metric name, modulo the number of shards, is
    matched against the shard numbers in check_horizon_shards.

    :param metric: the metric name
    :param check_horizon_shards: dict of host -> shard number
    :return: the matching host; HORIZON_SHARD when check_horizon_shards is
        empty; None when no host has the computed shard number
    """
    if not check_horizon_shards:
        return HORIZON_SHARD
    shard_number = zlib.adler32(str(metric).encode()) % len(check_horizon_shards)
    return next(
        (host for host, number in check_horizon_shards.items() if shard_number == number),
        None)
def is_labelled_metric(self, metric):
    """
    Check whether a metric name is a labelled metric.

    :param metric: the metric name
    :return: True if the name contains '{' and ends with '}', otherwise False
    """
    return '{' in metric and metric.endswith('}')
def generate_labelled_metric_name(self, metric, metric_namespace_prefix, server_id=None):
    """
    Insert the skyline _tenant_id and _server_id labels into a labelled metric name.

    :param metric: a labelled metric name, e.g. 'cpu{host="a"}'
    :param metric_namespace_prefix: the tenant id. When falsy, no labels are
        added and metric is returned unchanged.
    :param server_id: the server id, '1' is used when it is falsy
    :return: the metric name with the skyline labels as its first labels
    """
    if not metric_namespace_prefix:
        return metric
    server_id_str = str(server_id) if server_id else '1'
    skyline_labels = '{_tenant_id="%s",_server_id="%s",' % (str(metric_namespace_prefix), server_id_str)
    # Only replace the brace that opens the label set, a '{' may also occur
    # inside a label value
    return metric.replace('{', skyline_labels, 1)
def get_metric_data_from_archive(self, request_id, metric, metric_timestamp):
    """
    Load the gzipped vortex metric_data json archived for a request.

    Reads
    <IONOSPHERE_DATA_FOLDER>/<metric_timestamp>/<metric path>/vortex.metric_data.<request_id>.json.gz

    :param request_id: the vortex request id
    :param metric: the metric name used for the archive directory
    :param metric_timestamp: the metric timestamp used for the archive directory
    :return: the loaded metric_data, or {} if it could not be loaded
    """
    metric_data = {}
    # Defined before the try so the error log can always reference it
    metric_data_archive = None
    try:
        timeseries_dir = metric.replace('.', '/')
        metric_data_dir = '%s/%s/%s' % (
            settings.IONOSPHERE_DATA_FOLDER, metric_timestamp,
            timeseries_dir)
        metric_data_archive = '%s/vortex.metric_data.%s.json.gz' % (metric_data_dir, request_id)
        with gzip.open(metric_data_archive, 'rb') as f:
            metric_data = json.loads(f.read())
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed ungzip and load metric_data from %s - %s' % (
            metric_data_archive, err))
    return metric_data
def process_ionosphere_results(self, run_timestamp, ionosphere_results):
    """
    Process the results that Ionosphere has returned for vortex checks.

    For each item in ionosphere_results:

    - the result is read from, and removed from, the ionosphere.vortex_results
      Redis hash
    - the originating request is found in mirage_vortex.sent_to_ionosphere by
      metric and timestamp
    - the archived metric_data is updated with the Ionosphere anomalous and
      matched values, re-archived and passed to add_results
    - anomalous results are added to illuminance and to the
      mirage_vortex.anomalous_metrics Redis set

    :param run_timestamp: the run timestamp passed to add_to_illuminance
    :param ionosphere_results: keys of the ionosphere.vortex_results hash
    :return: the ionosphere_results items that were processed
    """
    processed_results = []
    all_ionosphere_results = {}
    try:
        all_ionosphere_results = self.redis_conn_decoded.hgetall('ionosphere.vortex_results')
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to hgetall ionosphere.vortex_results - %s' % (
            err))
    logger.info('mirage_vortex :: ionosphere.vortex_results has %s items' % (
        str(len(all_ionosphere_results))))
    mirage_vortex_sent_to_ionosphere = {}
    try:
        mirage_vortex_sent_to_ionosphere = self.redis_conn_decoded.hgetall('mirage_vortex.sent_to_ionosphere')
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to hgetall mirage_vortex.sent_to_ionosphere - %s' % (
            err))

    for i_result in ionosphere_results:
        results_dict = {}
        try:
            results_dict_str = all_ionosphere_results[i_result]
            if results_dict_str:
                results_dict = literal_eval(results_dict_str)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to literal_eval %s in all_ionosphere_results - %s' % (
                str(i_result), err))
        # The Ionosphere result is consumed whether or not it could be parsed
        try:
            self.redis_conn_decoded.hdel('ionosphere.vortex_results', i_result)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to hdel %s from ionosphere.vortex_results - %s' % (
                str(i_result), err))
        if not results_dict:
            logger.error('error :: mirage_vortex :: not results_dict for %s' % (
                str(i_result)))
            continue
        try:
            metric = results_dict['metric']
            anomalous = results_dict['anomalous']
            metric_timestamp = results_dict['metric_timestamp']
            matched = results_dict['matched']
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to interpolate data from %s results_dict - %s' % (
                str(i_result), err))
            continue

        # Find the request that sent this metric and timestamp to Ionosphere
        sent_dict = {}
        for sent_dict_str in mirage_vortex_sent_to_ionosphere.values():
            try:
                if sent_dict_str:
                    i_sent_dict = literal_eval(sent_dict_str)
                    if metric == i_sent_dict['metric'] and int(metric_timestamp) == int(i_sent_dict['timestamp']):
                        sent_dict = copy.deepcopy(i_sent_dict)
                        break
            except Exception as err:
                logger.error('error :: mirage_vortex :: error iterating %s from mirage_vortex_sent_to_ionosphere - %s' % (
                    str(i_result), err))
        if not sent_dict:
            logger.error('error :: mirage_vortex :: failed to find metric: %s and metric_timestamp: %s in any item in mirage_vortex.sent_to_ionosphere, cannot reconcile' % (
                str(metric), str(metric_timestamp)))
            continue
        try:
            request_id = sent_dict['request_id']
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to determine request_id for %s from sent_dict - %s' % (
                str(i_result), err))
            continue

        metric_data = {}
        try:
            metric_data = self.get_metric_data_from_archive(request_id, metric, metric_timestamp)
        except Exception as err:
            logger.error('error :: mirage_vortex :: get_metric_data_from_archive failed for %s - %s' % (
                request_id, err))
            continue

        added_results = False
        if metric_data:
            metric_data['matched'] = matched
            metric_data['anomalous'] = anomalous
            metric_data['results']['anomalous'] = anomalous
            metric_data['results']['matched'] = matched
            # Resave the metric_data_archive with the Ionosphere results
            timeseries_dir = metric.replace('.', '/')
            metric_data_dir = '%s/%s/%s' % (
                settings.IONOSPHERE_DATA_FOLDER, metric_timestamp,
                timeseries_dir)
            jsonfile = '%s/vortex.metric_data.%s.json' % (metric_data_dir, request_id)
            gzip_jsonfile = '%s.gz' % jsonfile
            try:
                with open(jsonfile, 'w') as fw:
                    json.dump(metric_data, fw)
            except Exception as err:
                logger.error('error :: mirage_vortex :: process_ionosphere_results - failed to save metric_data for request_id %s to %s - %s' % (
                    request_id, jsonfile, err))
            if os.path.isfile(jsonfile):
                try:
                    with open(jsonfile, 'rb') as f_in, gzip.open(gzip_jsonfile, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                except Exception as err:
                    logger.error('error :: mirage_vortex :: process_ionosphere_results - failed to gzip jsonfile %s for request_id %s - %s' % (
                        jsonfile, request_id, err))
                else:
                    # Only remove the json once it has been compressed successfully
                    try:
                        os.remove(jsonfile)
                    except OSError:
                        logger.error('error :: mirage_vortex - process_ionosphere_results - failed to remove file - %s' % jsonfile)
            try:
                analysis_start_time = metric_data['begin_analysis']
                added_results = self.add_results(request_id, metric_data, analysis_start_time)
            except Exception as err:
                logger.error('error :: mirage_vortex :: add_results failed for request_id: %s - %s' % (
                    request_id, err))

        if added_results:
            logger.info('mirage_vortex :: results added with Ionosphere results of anomalous: %s, matched: %s and removed %s from mirage_vortex.sent_to_ionosphere' % (
                str(anomalous), str(matched), request_id))
            try:
                self.redis_conn_decoded.hdel('mirage_vortex.sent_to_ionosphere', request_id)
            except Exception as err:
                logger.error('error :: mirage_vortex :: failed to hdel %s from mirage_vortex.sent_to_ionosphere - %s' % (
                    request_id, err))

        if anomalous:
            logger.info('mirage_vortex :: anomaly detected - %s' % metric)
            # Initialised so a failure below cannot leave it unbound
            current_illuminance_dict = {}
            try:
                value = metric_data['value']
                triggered_algorithms = [
                    algo for algo, triggered in metric_data['results']['triggered_algorithms'].items()
                    if triggered]
                current_illuminance_dict = self.add_to_illuminance(run_timestamp, metric, metric_timestamp, value, triggered_algorithms)
            except Exception as err:
                logger.error('error :: mirage_vortex :: add_illuminance_entries failed - %s' % (
                    err))
            if current_illuminance_dict:
                logger.info('mirage_vortex :: added %s anomaly to illuminance' % metric)
            logger.info('mirage_vortex :: adding %s to mirage_vortex.anomalous_metrics to be alerted on by mirage' % metric)
            added_to_mirage_vortex_anomalous_metrics = False
            try:
                added_to_mirage_vortex_anomalous_metrics = self.add_to_mirage_vortex_anomalous_metrics(metric_data)
            except Exception as err:
                logger.error('error :: mirage_vortex :: add_to_mirage_vortex_anomalous_metrics failed for %s - %s' % (
                    request_id, err))
            logger.info('mirage_vortex :: added mirage_vortex.anomalous_metrics: %s' % str(added_to_mirage_vortex_anomalous_metrics))
        processed_results.append(i_result)
    return processed_results
def return_request_results(self, run_timestamp, return_results_for):
    """
    Return the results for requests sent to Ionosphere without an Ionosphere result.

    Each request is removed from the mirage_vortex.sent_to_ionosphere Redis
    hash, its archived metric_data is loaded, matched is set to None and the
    results are passed to add_results. For anomalous results, a Panorama
    anomaly is sent (when PANORAMA_ENABLED), the anomaly is added to
    illuminance, and it is added to the mirage_vortex.anomalous_metrics Redis set.
    NOTE(review): presumably these are requests whose Ionosphere result was
    not received - confirm against the spin_process caller.

    :param run_timestamp: the run timestamp passed to add_to_illuminance
    :param return_results_for: dict of request_id -> str of the sent_dict
        (with request_id, metric and timestamp keys)
    :return: the request ids that were processed
    """
    processed_results = []
    for request_id in list(return_results_for.keys()):
        sent_dict = {}
        try:
            sent_dict_str = return_results_for[request_id]
            if sent_dict_str:
                sent_dict = literal_eval(sent_dict_str)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to literal_eval %s in return_results_for - %s' % (
                request_id, err))
        try:
            self.redis_conn_decoded.hdel('mirage_vortex.sent_to_ionosphere', request_id)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to hdel %s from mirage_vortex.sent_to_ionosphere - %s' % (
                request_id, err))
        if not sent_dict:
            logger.error('error :: mirage_vortex :: no sent_dict for %s' % (
                str(request_id)))
            continue
        try:
            request_id = sent_dict['request_id']
            metric = sent_dict['metric']
            metric_timestamp = sent_dict['timestamp']
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to interpolate data from %s sent_dict - %s' % (
                str(request_id), err))
            continue

        metric_data = {}
        try:
            metric_data = self.get_metric_data_from_archive(request_id, metric, metric_timestamp)
        except Exception as err:
            logger.error('error :: mirage_vortex :: get_metric_data_from_archive failed for %s - %s' % (
                request_id, err))
            continue
        if not metric_data:
            logger.error('error :: mirage_vortex :: no metric_data for request_id %s, cannot return results' % (
                str(request_id)))
            continue

        anomalous = False
        try:
            anomalous = metric_data['results']['anomalous']
        except KeyError:
            # Fall back to the top level anomalous value, if there is one
            anomalous = metric_data.get('anomalous', False)
        except Exception as err:
            logger.error('error :: mirage_vortex :: failed to determine if %s is anomalous from metric_data - %s' % (
                str(metric), err))

        added_results = False
        metric_data['matched'] = None
        metric_data.setdefault('results', {})['matched'] = None
        try:
            analysis_start_time = metric_data['begin_analysis']
            added_results = self.add_results(request_id, metric_data, analysis_start_time)
        except Exception as err:
            logger.error('error :: mirage_vortex :: add_results failed for request_id: %s - %s' % (
                request_id, err))
        if added_results:
            logger.info('mirage_vortex :: results added with anomalous: %s, matched: None' % (
                str(anomalous)))

        if anomalous:
            logger.info('mirage_vortex :: anomaly detected - %s' % metric)
            # The anomaly details are needed by both Panorama and illuminance,
            # so they are determined regardless of PANORAMA_ENABLED
            anomaly_details_determined = False
            try:
                value = metric_data['value']
                from_timestamp = metric_data['from_timestamp']
                algorithms_run = metric_data['results']['algorithms_run']
                triggered_algorithms = [
                    algo for algo, triggered in metric_data['results']['triggered_algorithms'].items()
                    if triggered]
                anomaly_details_determined = True
            except Exception as err:
                logger.error('error :: mirage_vortex :: failed to determine anomaly details from metric_data for request_id %s - %s' % (
                    request_id, err))
            if anomaly_details_determined:
                if settings.PANORAMA_ENABLED:
                    sent_to_panorama = False
                    try:
                        sent_to_panorama = self.send_anomaly_to_panorama(
                            metric, value, from_timestamp,
                            metric_timestamp, algorithms_run, triggered_algorithms)
                    except Exception as err:
                        logger.error('error :: mirage_vortex :: send_anomaly_to_panorama failed - %s' % (
                            err))
                    if sent_to_panorama:
                        logger.info('mirage_vortex :: sent %s anomaly to panorama' % metric)
                current_illuminance_dict = {}
                try:
                    current_illuminance_dict = self.add_to_illuminance(run_timestamp, metric, metric_timestamp, value, triggered_algorithms)
                except Exception as err:
                    logger.error('error :: mirage_vortex :: add_illuminance_entries failed - %s' % (
                        err))
                if current_illuminance_dict:
                    logger.info('mirage_vortex :: added %s anomaly to illuminance' % metric)
            logger.info('mirage_vortex :: adding %s to mirage_vortex.anomalous_metrics to be alerted on by mirage' % metric)
            added_to_mirage_vortex_anomalous_metrics = False
            try:
                added_to_mirage_vortex_anomalous_metrics = self.add_to_mirage_vortex_anomalous_metrics(metric_data)
            except Exception as err:
                logger.error('error :: mirage_vortex :: add_to_mirage_vortex_anomalous_metrics failed for %s - %s' % (
                    request_id, err))
            logger.info('mirage_vortex :: added mirage_vortex.anomalous_metrics: %s' % str(added_to_mirage_vortex_anomalous_metrics))
        processed_results.append(request_id)
    return processed_results
def add_to_mirage_vortex_anomalous_metrics(self, metric_data):
    """
    Add an anomalous vortex result to the mirage_vortex.anomalous_metrics Redis set.

    The set member is the str() of the list
    [value, internal_metric_name, metric_timestamp, nearest_full_duration,
     triggered_algorithms, algorithms_run].

    :param metric_data: the vortex metric_data dict
    :return: True if the sadd call succeeded, otherwise False
    """
    try:
        triggered = metric_data['results']['triggered_algorithms']
        fired = [algo for algo, did_trigger in triggered.items() if did_trigger]
        member = str([
            float(metric_data['value']),
            str(metric_data['internal_metric_name']),
            metric_data['metric_timestamp'],
            metric_data['nearest_full_duration'],
            fired,
            metric_data['results']['algorithms_run'],
        ])
    except Exception as err:
        logger.error('error :: failed to interpolate metric_data to add to mirage_vortex.anomalous_metrics - %s' % (
            err))
        return False
    try:
        self.redis_conn_decoded.sadd('mirage_vortex.anomalous_metrics', member)
    except Exception as err:
        logger.error('error :: failed to add %s to mirage_vortex.anomalous_metrics Redis set - %s' % (
            str(member), err))
        return False
    return True
def add_to_illuminance(self, run_timestamp, labelled_metric_name, metric_timestamp, value, triggered_algorithms):
    """
    Record an anomaly in illuminance.

    A single entry keyed by labelled_metric_name, with the timestamp, the
    value and the number of triggered algorithms, is passed to
    add_illuminance_entries.

    :return: the dict returned by add_illuminance_entries (logged as the
        illuminance Redis hash entries), or {} on failure
    """
    illuminance_now = {}
    try:
        entry = {
            'timestamp': int(metric_timestamp),
            'value': float(value),
            'triggered_algorithms_count': len(triggered_algorithms),
        }
        entries = {labelled_metric_name: entry}
        logger.info('mirage_vortex :: calling add_illuminance_entries with %s entries to add' % (
            str(len(entries))))
        try:
            illuminance_now = add_illuminance_entries(self, skyline_app, int(run_timestamp), entries)
        except Exception as err:
            logger.error('error :: mirage_vortex :: add_illuminance_entries failed - %s' % (
                err))
        logger.info('mirage_vortex :: illuminance Redis hash now has %s entries' % (
            str(len(illuminance_now))))
    except Exception as err:
        logger.error('error :: mirage_vortex :: add_to_illuminance failed - %s' % (
            err))
    return illuminance_now
def send_anomaly_to_panorama(
        self, labelled_metric_name, value, from_timestamp, metric_timestamp, algorithms_run,
        triggered_algorithms):
    """
    Write a Panorama anomaly check file for an anomaly.

    Nothing is done when settings.PANORAMA_ENABLED is not set. Otherwise the
    file <PANORAMA_CHECK_PATH>/<added_at>.<filesafe metric name>.txt is
    written and the metric is added to the mirage.sent_to_panorama Redis set.

    :return: True if the Panorama check file was written, otherwise False
    """
    if not settings.PANORAMA_ENABLED:
        return False
    if not os.path.exists(settings.PANORAMA_CHECK_PATH):
        mkdir_p(settings.PANORAMA_CHECK_PATH)
    # The values are intentionally enclosed in single quotes, because the
    # imp.load_source used to read them shifts the decimal position when
    # they are double quoted, e.g. value = "5622.0" is imported as 562.2,
    # whereas single quoting results in the desired 5622.0
    source = 'vortex'
    added_at = int(time())
    panorama_anomaly_data = (
        "metric = '%s'\n"
        "value = '%s'\n"
        "from_timestamp = '%s'\n"
        "metric_timestamp = '%s'\n"
        "algorithms = %s\n"
        "triggered_algorithms = %s\n"
        "app = '%s'\n"
        "source = '%s'\n"
        "added_by = '%s'\n"
        "added_at = '%s'\n"
    ) % (labelled_metric_name, str(value), str(from_timestamp),
         str(metric_timestamp), str(algorithms_run),
         triggered_algorithms, 'vortex', source, this_host,
         str(added_at))
    # Create an anomaly file with details about the anomaly
    panorama_anomaly_file = '%s/%s.%s.txt' % (
        settings.PANORAMA_CHECK_PATH, added_at,
        filesafe_metricname(str(labelled_metric_name)))
    sent = False
    try:
        write_data_to_file(
            'mirage', panorama_anomaly_file, 'w',
            panorama_anomaly_data)
        logger.info('mirage_vortex :: added panorama anomaly file :: %s' % (panorama_anomaly_file))
        sent = True
    except Exception as err:
        logger.error('error :: mirage_vortex :: failed to add panorama anomaly file - %s - %s' % (
            panorama_anomaly_file, err))
    redis_set = 'mirage.sent_to_panorama'
    member = str(labelled_metric_name)
    try:
        self.redis_conn.sadd(redis_set, member)
    except:  # noqa: E722
        logger.error(traceback.format_exc())
        logger.error('error :: mirage_vortex :: failed to add %s to Redis set %s' % (
            str(member), str(redis_set)))
    return sent
[docs] def spin_process(self, i, run_timestamp, assigned_checks, ionosphere_results, return_results_for):
"""
Assign a metrics or results to process.
"""
if not assigned_checks and not ionosphere_results and not return_results_for:
logger.info('mirage_vortex :: no checks to assign or results to process, nothing to do')
return
process_start_timestamp = int(time())
now = int(process_start_timestamp)
if return_results_for:
logger.info('mirage_vortex :: returning %s results' % str(len(return_results_for)))
processed_results = []
try:
processed_results = self.return_request_results(run_timestamp, return_results_for)
except Exception as err:
logger.error('error :: mirage_vortex :: return_request_results failed - %s' % (
err))
logger.info('mirage_vortex :: returned %s results' % str(len(processed_results)))
return
if ionosphere_results:
logger.info('mirage_vortex :: %s results from ionosphere to process' % str(len(ionosphere_results)))
processed_results = []
try:
processed_results = self.process_ionosphere_results(run_timestamp, ionosphere_results)
except Exception as err:
logger.error('error :: mirage_vortex :: process_ionosphere_results failed - %s' % (
err))
logger.info('mirage_vortex :: processed and returned %s results from ionosphere' % str(len(processed_results)))
return
redis_metrics_processed_key = 'mirage_vortex.%s.metrics_processed' % str(i)
try:
exists = self.redis_conn_decoded.exists(redis_metrics_processed_key)
if exists:
last_redis_metrics_processed_key = 'mirage_vortex.%s.metrics_processed.last' % str(i)
self.redis_conn_decoded.rename(redis_metrics_processed_key, last_redis_metrics_processed_key)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to rename %s Redis hash - %s' % (
redis_metrics_processed_key, err))
checks_processed = 0
ionosphere_unique_metrics = []
for check_item in assigned_checks:
if int(time()) >= (process_start_timestamp + 50):
logger.info('mirage_vortex :: run time limit reached - stopping')
break
checks_processed += 1
analysis_start_time = time()
results = {
'success': False,
'algorithms': {}
}
check_str = None
try:
check_str = self.redis_conn_decoded.hget('flux.vortex', check_item)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to hget %s from flux.vortex - %s' % (
check_item, err))
if check_str:
try:
self.redis_conn_decoded.hdel('flux.vortex', check_item)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to hdel %s from flux.vortex - %s' % (
check_item, err))
metric_data = {}
if check_str:
try:
metric_data = literal_eval(check_str)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to literal_eval data for %s - %s' % (
check_item, err))
if not metric_data:
logger.error('error :: mirage_vortex :: no metric_data for %s' % (
check_item))
continue
request_id = check_item
try:
del metric_data['key']
except:
pass
# The source_app can be flux, mirage or luminosity
try:
source_app = metric_data['source_app']
except:
source_app = 'flux'
metric_data['begin_analysis'] = time()
metric_data['status_code'] = 200
logger.info('mirage_vortex :: processing request_id: %s' % str(request_id))
consensus = list(MIRAGE_VORTEX_DEFAULT_CONSENSUS)
algorithms = {'default': {}}
try:
algorithms = metric_data['algorithms']
except:
algorithms = {'default': {}}
if len(list(algorithms.keys())) == 1:
if list(algorithms.keys())[0] == 'default':
logger.info('mirage_vortex :: using default algorithms for request_id: %s' % str(request_id))
algorithms = copy.deepcopy(MIRAGE_VORTEX_DEFAULT_ALGORITHMS)
algorithms_to_be_run = list(algorithms.keys())
if 'test_anomaly' in list(metric_data.keys()):
if metric_data['test_anomaly']:
logger.info('mirage_vortex :: test_anomaly request for request_id: %s' % str(request_id))
metric_data['anomalous'] = True
results = {
'success': True,
'anomalous': True,
'additional info': 'TEST ANOMALY ONLY - no analysis done',
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = True
metric_data['results'] = results
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed test_anomaly request_id: %s' % str(request_id))
continue
try:
consensus = list(metric_data['consensus'])
except:
consensus = list(MIRAGE_VORTEX_DEFAULT_CONSENSUS)
for item in consensus:
if isinstance(item, str):
consensus.remove(item)
consensus.append([item])
# @added 20230616 - Feature #4952: vortex - consensus_count
try:
consensus_count = int(metric_data['consensus_count'])
except:
consensus_count = 0
triggered_algorithms = []
unknown_algorithms = []
try:
unknown_algorithms = self.check_valid_algorithms(consensus)
except:
unknown_algorithms = []
if unknown_algorithms:
metric_data['anomalous'] = None
additional_info = 'unknown algorithm values passed - %s' % str(unknown_algorithms)
logger.warning('warning :: mirage_vortex :: not processing request_id: %s, %s' % (
str(request_id), additional_info))
results = {
'success': False,
'anomalous': None,
'additional info': additional_info,
'algorithms': {}
}
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
unknown_algorithms = []
try:
unknown_algorithms = self.check_valid_algorithms(algorithms_to_be_run)
except:
unknown_algorithms = []
if unknown_algorithms:
metric_data['anomalous'] = None
additional_info = 'unknown algorithm values passed - %s' % str(unknown_algorithms)
logger.warning('warning :: mirage_vortex :: not processing request_id: %s, %s' % (
str(request_id), additional_info))
results = {
'success': False,
'anomalous': None,
'additional info': additional_info,
'algorithms': {}
}
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
# Check that the algorithms_to_be_run are in consensus
in_consensus = False
for consensus_item in consensus:
for item in consensus_item:
if item in algorithms_to_be_run:
in_consensus = True
if not in_consensus:
metric_data['anomalous'] = None
additional_info = 'input error - algorithms passed do not match any algorithms in consensus no consensus could ever be achieved'
logger.warning('warning :: mirage_vortex :: not processing request_id: %s, %s' % (
str(request_id), additional_info))
results = {
'success': False,
'anomalous': None,
'additional info': additional_info,
'algorithms': {}
}
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
try:
metric = metric_data['metric']
except Exception as err:
logger.error('error :: mirage_vortex :: no metric in metric_data for %s' % (
check_item))
metric_data['anomalous'] = False
additional_info = 'no metric name passed'
results = {
'success': False,
'anomalous': False,
'additional info': additional_info,
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = None
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
timeseries = []
if VORTEX_TIMESERIES_JSON_TO_DISK:
jsonfile = None
try:
jsonfile = metric_data['timeseries']
except Exception as err:
logger.error('error :: mirage_vortex :: no timeseries in metric_data for %s' % (
check_item))
file_metric_data = {}
try:
with open(jsonfile) as fh:
file_metric_data = json.load(fh)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to load file_metric_data from %s for %s - %s' % (
jsonfile, request_id, err))
try:
timeseries = file_metric_data['timeseries']
logger.info('mirage_vortex :: loaded timeseries from %s' % jsonfile)
except Exception as err:
logger.error('error :: mirage_vortex :: no timeseries in file_metric_data for %s- %s' % (
check_item, err))
else:
try:
timeseries = metric_data['timeseries']
except Exception as err:
logger.error('error :: mirage_vortex :: no timeseries in metric_data for %s' % (
check_item))
if isinstance(timeseries, dict):
new_timeseries = []
for ts in list(timeseries.keys()):
value = None
try:
value = float(timeseries[ts])
except:
continue
if isinstance(value, float):
try:
new_timeseries.append([int(ts), timeseries[ts]])
except:
continue
timeseries = list(new_timeseries)
del new_timeseries
if not timeseries:
logger.warning('warning :: mirage_vortex :: no timeseries in metric_data for %s' % (
check_item))
metric_data['anomalous'] = False
additional_info = 'no timeseries data found'
results = {
'success': False,
'anomalous': False,
'additional info': additional_info,
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = None
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
shard_test = False
try:
shard_test = metric_data['shard_test']
except:
shard_test = False
if shard_test:
try:
TEST_HORIZON_SHARDS = {
this_host: 0,
'another-test-node-1': 1,
'another-test-node-2': 2,
}
shost = self.shard_host(metric, TEST_HORIZON_SHARDS)
metric_data['results'] = {}
metric_data['results']['shard_test'] = shard_test
metric_data['results']['shard_host'] = shost
metric_data['results']['processing_shard'] = TEST_HORIZON_SHARDS[shost]
shard_test_done = self.return_shard_test(request_id, metric_data, analysis_start_time)
if shard_test_done:
logger.info('mirage_vortex :: SHARD_TEST processed request_id: %s' % str(request_id))
continue
except Exception as err:
logger.error('error :: mirage_vortex :: shard_test failed - %s' % (
err))
metric_timestamp = None
try:
metric_timestamp = int(timeseries[-1][0])
except Exception as err:
logger.error('error :: mirage_vortex :: failed to determine last timestamp for %s' % (
request_id))
if not metric_timestamp:
logger.error('error :: mirage_vortex :: failed no metric_timestamp for %s' % request_id)
metric_data['anomalous'] = False
additional_info = 'no valid timeseries data found'
results = {
'success': False,
'anomalous': False,
'additional info': additional_info,
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = None
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
is_unix_timestamp = False
if isinstance(metric_timestamp, int):
is_unix_timestamp = True
if isinstance(metric_timestamp, float):
is_unix_timestamp = True
if not is_unix_timestamp:
logger.error('error :: mirage_vortex :: not unix timestamps for %s' % request_id)
metric_data['anomalous'] = False
additional_info = 'date format incorrect, unix timestamps are required'
results = {
'success': False,
'anomalous': False,
'additional info': additional_info,
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = None
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
self.check_if_parent_is_alive()
# Transform the timeseries data to ensure all non values are removed
# and that timestamps are coerced into int and values into floats
use_timeseries = []
coerce_errors = []
for index, item in enumerate(timeseries):
try:
use_timeseries.append([int(item[0]), float(item[1])])
except Exception as err:
coerce_errors.append([index, err])
if coerce_errors:
logger.info('mirage_vortex :: request_id: %s encountered %s coerce_errors, last err: %s' % (
str(request_id), str(len(coerce_errors)), str(coerce_errors[-1])))
if not use_timeseries:
logger.error('error :: mirage_vortex :: no use_timeseries for request_id: %s' % request_id)
metric_data['anomalous'] = None
additional_info = 'no data to analyse after preprocessing, invalid timestamps and values - hint: %s' % str(coerce_errors[-1])
results = {
'success': False,
'anomalous': False,
'additional info': additional_info,
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = None
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
try:
return_image_urls = metric_data['return_image_urls']
except:
return_image_urls = False
try:
trigger_anomaly = metric_data['trigger_anomaly']
except:
trigger_anomaly = False
try:
algorithms_test_only = metric_data['algorithms_test_only']
except:
algorithms_test_only = False
full_duration = int(use_timeseries[-1][0] - use_timeseries[0][0])
metric_data['full_duration'] = full_duration
hours_to_resolve = int(full_duration / 3600)
nearest_full_duration = 86400 * round(full_duration / 86400)
metric_data['nearest_full_duration'] = nearest_full_duration
timeseries_length = len(timeseries)
metric_data['timeseries_length'] = timeseries_length
from_timestamp = int(use_timeseries[0][0])
metric_data['from_timestamp'] = from_timestamp
value = use_timeseries[-1][1]
metric_data['value'] = value
resolution = determine_data_frequency('mirage', timeseries, False)
metric_data['resolution'] = resolution
# Set whether the timeseries can be trained on. It is not valid for
# training if it has negative values or large gaps in the data.
trainable = True
# Determine whether the appropriate analysis period and whether the
# timeseries needs to be downsampled
downsample = False
downsample_resolution = int(resolution)
analysis_period = int(full_duration)
# Analyse the data at 1 or 7 days
if full_duration < ((86400 * 7) - 7200):
analysis_period = 86400
if full_duration > (86400 * 7):
analysis_period = 86400 * 7
# @added 20230129
try:
override_7_day_limit = metric_data['override_7_day_limit']
except:
override_7_day_limit = False
if override_7_day_limit:
analysis_period = timeseries[-1][0] - timeseries[0][0]
metric_data['analysis_period'] = analysis_period
# Remove any data before the start of the analysis period
analysis_start_timestamp = metric_timestamp - analysis_period
metric_data['analysis_period_start_timestamp'] = analysis_start_timestamp
if timeseries[0][0] < analysis_start_timestamp:
use_timeseries = [item for item in timeseries if item[0] >= analysis_start_timestamp]
# Determine the downsample resolution
if resolution < 60:
downsample = True
downsample_resolution = 60
if resolution < 600 and analysis_period > 86400:
downsample = True
downsample_resolution = 600
downsampled_timeseries = None
downsampled = False
no_downsample = False
try:
no_downsample = metric_data['no_downsample']
except:
no_downsample = False
if no_downsample and downsample:
logger.info('mirage_vortex :: no_downsample passed for request_id: %s NOT downsampling timeseries from %s data points at %s to %s at %s' % (
str(request_id), str(timeseries_length), str(resolution),
str(int(full_duration / downsample_resolution)),
str(downsample_resolution)))
downsample = False
# Not trainable if timeseries should be downsample and is not
# trainable = False
check_all_consensuses = False
try:
check_all_consensuses = metric_data['check_all_consensuses']
except:
check_all_consensuses = False
# @added 20230616 - Feature #4952: vortex - consensus_count
if consensus_count:
check_all_consensuses = True
consensus_count_results = {}
consensus_count_results['algorithms'] = {}
downsample_method = 'mean'
is_strictly_increasing_monotonicity = False
try:
is_strictly_increasing_monotonicity = strictly_increasing_monotonicity(use_timeseries)
except Exception as err:
logger.error('error :: mirage_vortex :: is_strictly_increasing_monotonicity failed for request_id: %s - %s' % (
request_id, err))
metric_data['monotonic'] = is_strictly_increasing_monotonicity
if is_strictly_increasing_monotonicity:
downsample_method = 'sum'
if downsample:
logger.info('mirage_vortex :: downsampling request_id: %s timeseries from %s data points at %s to %s at %s using the %s' % (
str(request_id), str(timeseries_length), str(resolution),
str(int(full_duration / downsample_resolution)),
str(downsample_resolution), downsample_method))
try:
downsampled_timeseries = downsample_timeseries('mirage', use_timeseries, resolution, 600, downsample_method, 'end')
except Exception as err:
logger.error('error :: mirage_vortex :: downsample_timeseries failed for request_id %s - %s' % (
request_id, err))
if downsampled_timeseries:
# Coerce timestamps to ints
downsampled_timeseries = [[int(t), v] for t, v in downsampled_timeseries]
downsampled = True
metric_data['downsampled_resolution'] = downsample_resolution
metric_data['downsampled_length'] = len(downsampled_timeseries)
use_timeseries = downsampled_timeseries
metric_data['downsample_resolution'] = downsample_resolution
metric_data['downsampled'] = downsampled
expected_timeseries_length = (86400 * 7) / 600
if analysis_period == 86400:
expected_timeseries_length = 86400 / 60
use_timeseries_length = len(use_timeseries)
metric_data['use_timeseries_length'] = use_timeseries_length
if is_strictly_increasing_monotonicity:
# Calculate the derivate AFTER downsampling
try:
use_timeseries = nonNegativeDerivative(use_timeseries)
except Exception as err:
logger.error('error :: mirage_vortex :: nonNegativeDerivative failed for request_id: %s - %s' % (
request_id, err))
metric_data_no_timeseries = copy.deepcopy(metric_data)
try:
del metric_data_no_timeseries['timeseries']
except:
pass
logger.info('mirage_vortex :: request_id: %s metric_data (excl timeseries): %s' % (
str(request_id), str(metric_data_no_timeseries)))
del metric_data_no_timeseries
# Return if there is insufficient data
if int(100 * (len(use_timeseries) / expected_timeseries_length)) < 90:
logger.info('mirage_vortex :: after preprocessing there is insufficient data for analysis - request_id: %s, len(use_timeseries): %s, expected length: %s' % (
request_id, str(len(use_timeseries)), str(expected_timeseries_length)))
metric_data['anomalous'] = False
additional_info = 'after preprocessing there insufficient data for analysis'
results = {
'success': False,
'anomalous': False,
'additional info': additional_info,
'algorithms': {}
}
for algorithm in algorithms:
results['algorithms'][algorithm] = None
metric_data['results'] = results
metric_data['error'] = additional_info
metric_data['success'] = False
metric_data['status_code'] = 400
added_results = self.add_results(request_id, metric_data, analysis_start_time)
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
continue
# Determine if there are any negatives values
negative_values = [v for t, v in use_timeseries if v < 0]
if negative_values:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
trainable = False
run_negatives_present = False
anomalous = None
triggered_algorithms = []
algorithms_run = {}
metric_data['results'] = {}
metric_data['results']['success'] = False
metric_data['results']['anomalous'] = None
metric_data['results']['algorithms'] = {}
metric_data['results']['triggered_algorithms'] = {}
metric_data['metric_timestamp'] = int(timeseries[-1][0])
added_results = None
sent_to_ionosphere = False
last_algorithm = algorithms_to_be_run[-1]
# for algorithm in algorithms:
for algorithm in algorithms_to_be_run:
try:
if added_results or sent_to_ionosphere:
break
custom_algorithm = algorithm
if algorithm == 'prophet':
custom_algorithm = 'skyline_prophet'
if algorithm == 'matrixprofile':
custom_algorithm = 'skyline_matrixprofile'
if algorithm == 'sigma':
logger.info('mirage_vortex :: running sigma for request_id: %s' % str(request_id))
algorithm_parameters = {}
try:
algorithm_parameters = VORTEX_ALGORITHMS[algorithm]['algorithm_parameters']
except:
algorithm_parameters = {}
return_results = False
anomalous = None
anomalyScore = None
custom_algorithm = algorithm
try:
if 'algorithm_parameters' in list(algorithms['sigma'].keys()):
algorithm_parameters = copy.deepcopy(algorithms['sigma']['algorithm_parameters'])
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: error determining algorithm_parameters - %s' % err)
try:
return_results = algorithm_parameters['return_results']
except:
return_results = False
try:
return_anomalies_only = algorithm_parameters['return_anomalies_only']
except:
return_anomalies_only = False
try:
anomaly_window = algorithm_parameters['anomaly_window']
except:
anomaly_window = 1
algorithm_parameters['anomaly_window'] = anomaly_window
algorithm_parameters['base_name'] = metric
algorithm_parameters['debug_print'] = False
algorithm_parameters['debug_logging'] = True
algorithm_parameters['return_results'] = True
algorithm_parameters['return_anomalies'] = True
custom_algorithms_to_run = {
'sigma': {
'algorithm_source': '/opt/skyline/github/skyline/skyline/custom_algorithms/sigma.py',
'max_execution_time': 30.0,
'algorithm_parameters': algorithm_parameters,
},
}
sigma_analysis_start_time = time()
sigma_anomalous = anomalyScore = None
success = False
anomalies = []
try:
use_debug_logging = False
sigma_anomalous, anomalyScore, anomalies = run_custom_algorithm_on_timeseries('mirage', os.getpid(), metric, use_timeseries, custom_algorithm, custom_algorithms_to_run[custom_algorithm], use_debug_logging)
# Try direct
# sigma_anomalous, anomalies = run_sigma_algorithms('mirage', timeseries, algorithm_parameters['sigma_value'], algorithm_parameters['consensus'], anomaly_window)
if sigma_anomalous:
anomalyScore = 1.0
else:
anomalyScore = 0.0
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: unhandled error - %s' % err)
algorithms_run['sigma'] = sigma_anomalous
if sigma_anomalous:
triggered_algorithms.append('sigma')
sigma_completed_at = time()
sigma_run_time = sigma_completed_at - sigma_analysis_start_time
logger.info('mirage_vortex :: sigma analysis on %s completed in %.2f seconds with anomalous: %s' % (
request_id, sigma_run_time, str(sigma_anomalous)))
anomalies_in_window = len(anomalies)
if sigma_anomalous is not None:
success = True
metric_data['results']['algorithms']['sigma'] = {
'success': success,
'anomalous': sigma_anomalous,
'anomalyScore': anomalyScore,
'anomalies_in_window': anomalies_in_window,
'analysis_runtime': sigma_run_time,
}
if return_results:
metric_data['results']['algorithms']['sigma']['anomalies'] = anomalies
if not success:
metric_data['results']['algorithms']['sigma']['error'] = 'an error occurred during sigma analysis'
# @added 20230616 - Feature #4952: vortex - consensus_count
if consensus_count:
consensus_count_results['algorithms']['sigma'] = {}
consensus_count_results['algorithms']['sigma']['anomalies'] = anomalies
anomalyScore_list = []
anomalies_timestamps = [int(ts) for ts in list(anomalies.keys())]
for ts, v in timeseries:
score = 0
if int(ts) in anomalies_timestamps:
score = 1
anomalyScore_list.append(score)
consensus_count_results['algorithms']['sigma']['scores'] = anomalyScore_list
else:
try:
if algorithm == 'prophet':
custom_algorithm = 'skyline_prophet'
if algorithm == 'matrixprofile':
custom_algorithm = 'skyline_matrixprofile'
logger.info('mirage_vortex :: running %s for request_id: %s' % (
custom_algorithm, str(request_id)))
algorithm_parameters = {}
try:
algorithm_parameters = VORTEX_ALGORITHMS[algorithm]['algorithm_parameters']
except:
algorithm_parameters = {}
outlier_value = VORTEX_ALGORITHMS[algorithm]['outlier_value']
return_results = False
anomalous = None
anomalyScore = None
custom_algorithm = algorithm
if algorithm == 'prophet':
custom_algorithm = 'skyline_prophet'
if algorithm == 'matrixprofile':
custom_algorithm = 'skyline_matrixprofile'
try:
if 'algorithm_parameters' in list(algorithms[algorithm].keys()):
algorithm_parameters = copy.deepcopy(algorithms[algorithm]['algorithm_parameters'])
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: error determining algorithm_parameters - %s' % err)
try:
return_results = algorithm_parameters['return_results']
if 'return_results' in metric_data:
return_results = metric_data['return_results']
except:
return_results = False
try:
return_anomalies_only = algorithm_parameters['return_anomalies_only']
except:
return_anomalies_only = False
try:
anomaly_window = algorithm_parameters['anomaly_window']
except:
anomaly_window = 1
if algorithm == 'pca':
try:
outlier_value = algorithm_parameters['threshold']
except:
outlier_value = VORTEX_ALGORITHMS[algorithm]['outlier_value']
algorithm_parameters['anomaly_window'] = anomaly_window
algorithm_parameters['base_name'] = metric
algorithm_parameters['debug_print'] = False
algorithm_parameters['debug_logging'] = True
algorithm_parameters['return_results'] = True
algorithm_parameters['return_anomalies'] = True
if algorithm == 'prophet':
custom_algorithm = 'skyline_prophet'
if algorithm == 'matrixprofile':
custom_algorithm = 'skyline_matrixprofile'
algorithm_source = '/opt/skyline/github/skyline/skyline/custom_algorithms/%s.py' % custom_algorithm
custom_algorithms_to_run = {
custom_algorithm: {
'algorithm_source': algorithm_source,
'max_execution_time': 30.0,
'algorithm_parameters': algorithm_parameters,
},
}
if algorithm == 'mstl':
try:
custom_algorithms_to_run[custom_algorithm]['max_execution_time'] = algorithm_parameters['max_execution_time']
except:
custom_algorithms_to_run[custom_algorithm]['max_execution_time'] = 180
ca_analysis_start_time = time()
anomalous = anomalyScore = None
success = False
results = {'anomalies': {}, 'anomalyScore_list': [], 'scores': []}
anomalyScore_list = []
try:
use_debug_logging = False
anomalous, anomalyScore, results = run_custom_algorithm_on_timeseries('mirage', os.getpid(), metric, use_timeseries, custom_algorithm, custom_algorithms_to_run[custom_algorithm], use_debug_logging)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: %s unhandled error - %s' % (custom_algorithm, err))
anomalous = anomalyScore = None
success = False
results = {'anomalies': {}, 'anomalyScore_list': [], 'scores': []}
algorithms_run[algorithm] = anomalous
if anomalous:
triggered_algorithms.append(algorithm)
ca_run_time = time() - ca_analysis_start_time
logger.info('mirage_vortex :: %s analysis on %s completed in %.2f seconds with anomalous: %s' % (
custom_algorithm, request_id, ca_run_time, str(anomalous)))
try:
anomalyScore_list = results['anomalyScore_list']
except:
try:
anomalyScore_list = results['scores']
except:
pass
total_anomalies = len([x for x in anomalyScore_list if x == 1])
anomalies_in_window = len([x for x in anomalyScore_list[-anomaly_window:] if x == 1])
if anomalous is not None:
success = True
unreliable = False
if algorithm == 'dbscan':
if anomalous is not None:
success = True
else:
try:
if 'unreliable' in results['error']:
success = True
unreliable = True
except:
pass
metric_data['results']['algorithms'][algorithm] = {
'success': success,
'anomalous': anomalous,
'anomalyScore': anomalyScore,
'anomalies_in_window': anomalies_in_window,
'total_anomalies': total_anomalies,
'analysis_runtime': ca_run_time,
'unreliable': unreliable,
}
if return_results and not return_anomalies_only:
metric_data['results']['algorithms'][algorithm]['anomalies'] = results['anomalies']
metric_data['results']['algorithms'][algorithm]['scores'] = results['scores']
if return_anomalies_only:
anomalies = {}
for index, item in enumerate(timeseries):
try:
if results['scores'][index]:
anomalies[item[0]] = {'value': item[1], 'index': index, 'score': results['scores'][index]}
except:
pass
metric_data['results']['algorithms'][algorithm]['anomalies'] = anomalies
if not success:
metric_data['results']['algorithms'][algorithm]['error'] = 'an error occurred during %s analysis' % algorithm
if algorithm == 'dbscan':
if unreliable:
metric_data['results']['algorithms'][algorithm]['error'] = results['error']
metric_data['results']['algorithms'][algorithm]['unreliable'] = True
if algorithm == 'spectral_entropy':
metric_data['results']['algorithms'][algorithm]['low_entropy_value'] = results['low_entropy_value']
# @added 20230616 - Feature #4952: vortex - consensus_count
if consensus_count:
consensus_count_results['algorithms'][algorithm] = {}
consensus_count_results['algorithms'][algorithm]['anomalies'] = results['anomalies']
consensus_count_results['algorithms'][algorithm]['scores'] = anomalyScore_list
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: %s unhandled error - %s' % (custom_algorithm, err))
# Determine consensus status
try:
consensus_reached, consensus_impossible = self.check_consensus(consensus, triggered_algorithms, algorithms_run)
except Exception as err:
logger.error('error :: mirage_vortex :: check_consensus failed for request_id: %s - %s' % (
request_id, err))
if check_all_consensuses:
if algorithm != last_algorithm:
consensus_reached = []
consensus_impossible = False
if consensus_reached or consensus_impossible:
if consensus_reached:
anomalous = True
metric_data['results']['consensus_achieved'] = True
logger.info('mirage_vortex :: consensus reached with %s for request_id: %s' % (
str(consensus_reached), str(request_id)))
if consensus_impossible:
anomalous = False
metric_data['results']['consensus_achieved'] = False
logger.info('mirage_vortex :: consensus cannot be reached for request_id: %s' % (
str(request_id)))
save_training_data = False
try:
metric_data['results']['consensus_reached'] = consensus_reached
metric_data['results']['success'] = True
metric_data['results']['anomalous'] = anomalous
metric_data['results']['algorithms_run'] = list(algorithms_run.keys())
for i_algorithm in algorithms:
triggered = i_algorithm in triggered_algorithms
if i_algorithm not in algorithms_run:
triggered = None
if triggered is not None:
metric_data['results']['triggered_algorithms'][i_algorithm] = triggered
if i_algorithm == 'dbscan':
try:
if anomalous is None:
metric_data['results']['triggered_algorithms'][i_algorithm] = None
except:
pass
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: check_consensus failed for request_id: %s - %s' % (
request_id, err))
consensus_anomalies = {}
consensus_scores = []
if return_results and not return_anomalies_only:
anomalies = {}
algorithms_with_results = list(metric_data['results']['algorithms'].keys())
try:
for index, item in enumerate(timeseries):
ts = int(item[0])
value = item[1]
score = 0
consensus_anomalies[ts] = {'value': value, 'triggered': [], 'score': 0}
for i_algorithm in algorithms_with_results:
algo_anomaly = None
try:
algo_anomaly = metric_data['results']['algorithms'][i_algorithm]['anomalies'][ts]
except:
pass
if algo_anomaly:
consensus_anomalies[ts]['triggered'].append(i_algorithm)
if consensus_anomalies[ts]['triggered']:
try:
i_consensus_reached, i_consensus_impossible = self.check_consensus(consensus, consensus_anomalies[ts]['triggered'], [])
except Exception as err:
logger.error('error :: mirage_vortex :: check_consensus failed for request_id: %s - %s' % (
request_id, err))
if i_consensus_reached:
score = len(i_consensus_reached)
consensus_anomalies[ts]['score'] = score
anomalies[str(ts)] = {'value': value, 'index': index, 'score': score, 'triggered': list(consensus_anomalies[ts]['triggered'])}
consensus_scores.append(score)
except Exception as err:
logger.error('error :: mirage_vortex :: failed building consensus_anomalies for request_id: %s - %s' % (
request_id, err))
if consensus_anomalies:
metric_data['results']['consensus_results'] = {}
metric_data['results']['consensus_results']['anomalous'] = True
metric_data['results']['consensus_results']['anomaly_window'] = anomaly_window
anomalies_in_window = len([x for x in consensus_scores[-anomaly_window:] if x >= 1])
metric_data['results']['consensus_results']['anomalies_in_window'] = anomalies_in_window
metric_data['results']['consensus_results']['unreliable'] = False
metric_data['results']['consensus_results']['anomalies'] = copy.deepcopy(anomalies)
metric_data['results']['consensus_results']['scores'] = list(consensus_scores)
# metric_data['results']['consensus_results']['consensus_anomalies'] = copy.deepcopy(consensus_anomalies)
ionosphere_enabled = False
labelled_metric_name = str(metric)
metric_data['labelled_metric_name'] = labelled_metric_name
save_training_data = False
if 'save_training_data_on_false' in list(metric_data.keys()):
if metric_data['save_training_data_on_false']:
logger.info('mirage_vortex :: save_training_data_on_false pased for request_id: %s, saving' % (
str(request_id)))
save_training_data = True
if trigger_anomaly:
logger.info('mirage_vortex :: trigger_anomaly was passed for request_id: %s, setting to anomalous' % (
str(request_id)))
save_training_data = True
anomalous = True
metric_data['results']['anomalous'] = anomalous
metric_data['anomalous'] = anomalous
if algorithms_test_only:
logger.info('mirage_vortex :: algorithms_test_only was passed for request_id: %s, saving training_data' % (
str(request_id)))
save_training_data = True
if anomalous or save_training_data:
save_training_data = True
metric_namespace_prefix = metric_data['metric_namespace_prefix']
skyline_metric = None
labelled_metric = False
prefixed_metric = '%s' % str(metric)
possible_metric_names = [str(metric)]
try:
labelled_metric = self.is_labelled_metric(metric)
except:
labelled_metric = False
if labelled_metric:
if '_tenant_id="' not in metric:
try:
server_id = metric_data['server_id']
except:
server_id = 1
try:
labelled_metric_name = self.generate_labelled_metric_name(metric, metric_namespace_prefix, server_id=server_id)
except Exception as err:
logger.error('error :: mirage_vortex :: generate_labelled_metric_name failed for request_id: %s metric: %s - %s' % (
request_id, str(metric), err))
possible_metric_names.append(labelled_metric_name)
if not labelled_metric:
if metric_namespace_prefix:
if not metric.startswith(metric_namespace_prefix):
prefixed_metric = '%s.%s' % (metric_namespace_prefix, metric)
possible_metric_names.append(prefixed_metric)
metric_id = 0
for possible_metric_name in possible_metric_names:
try:
metric_id = get_metric_id_from_base_name('mirage', possible_metric_name)
except:
metric_id = 0
if metric_id:
skyline_metric = possible_metric_name
break
if not metric_id:
if labelled_metric:
skyline_metric = labelled_metric_name
labelled_metric_name = 'labelled_metrics.%s' % str(metric_id)
else:
skyline_metric = prefixed_metric
labelled_metric_name = None
logger.info('mirage_vortex :: inserting new metric into DB for request_id: %s' % str(request_id))
try:
metric_id = insert_new_metric('mirage', skyline_metric)
if metric_id:
logger.info('mirage_vortex :: inserted new metric with id %s for request_id: %s, metric: %s' % (
str(metric_id), str(request_id), str(metric)))
except Exception as err:
logger.error('error :: mirage_vortex :: insert_new_metric failed for request_id: %s metric: %s - %s' % (
request_id, str(skyline_metric), err))
if metric_id and skyline_metric:
if labelled_metric:
labelled_metric_name = 'labelled_metrics.%s' % str(metric_id)
metric_data['internal_metric_name'] = skyline_metric
metric_data['labelled_metric_name'] = labelled_metric_name
metric_data['metric_id'] = metric_id
try:
ionosphere_enabled = get_ionosphere_fp_ids_for_full_duration('mirage', metric_id, full_duration=nearest_full_duration, enabled=True)
except Exception as err:
logger.error('error :: mirage_vortex :: get_ionosphere_fp_ids_for_full_duration failed for metric_id: %s request_id: %s - %s' % (
str(metric_id), request_id, err))
save_training_data = True
logger.info('mirage_vortex :: adding results for for request_id: %s and saving training_data' % str(request_id))
else:
logger.error('error :: mirage_vortex :: failed to determine a metric_id or skyline_metric for request_id: %s metric: %s, training_data will not be saved' % (
request_id, str(metric)))
saved_training_data = False
if save_training_data:
if not trainable:
logger.info('mirage_vortex :: not saving training_data for request_id: %s, trainable: %s' % (
str(request_id), str(trainable)))
save_training_data = False
send_to_ionosphere = False
if 'send_to_ionosphere' in list(metric_data.keys()):
send_to_ionosphere = metric_data['send_to_ionosphere']
if send_to_ionosphere:
save_training_data = True
# @added 20230616 - Feature #4952: vortex - consensus_count
if consensus_count:
logger.info('mirage_vortex :: adding consensus_count results for request_id: %s' % (
str(request_id)))
consensus_count_anomalies = {}
consensus_count_scores = []
consensus_count_algos = list(consensus_count_results['algorithms'].keys())
for index, item in enumerate(timeseries):
score = 0
ts = int(item[0])
triggered = []
for algo in consensus_count_algos:
try:
if consensus_count_results['algorithms'][algorithm]['scores'][index] == 1:
triggered.append(algo)
except:
pass
if len(triggered) >= consensus_count:
consensus_count_anomalies[ts] = {'value': item[1], 'index': index, 'score': len(triggered), 'triggered': triggered}
score = len(triggered)
consensus_count_scores.append(score)
anomalies_in_window = len([x for x in consensus_count_scores[-anomaly_window:] if x >= 1])
anomalous = False
if anomalies_in_window:
anomalous = True
metric_data['results']['consensus_count_results'] = {}
metric_data['results']['consensus_count_results']['anomalous'] = anomalous
metric_data['results']['consensus_count_results']['anomaly_window'] = anomaly_window
metric_data['results']['consensus_count_results']['anomalies_in_window'] = anomalies_in_window
metric_data['results']['consensus_count_results']['unreliable'] = False
metric_data['results']['consensus_count_results']['anomalies'] = copy.deepcopy(consensus_count_anomalies)
metric_data['results']['consensus_count_results']['scores'] = list(consensus_count_scores)
if return_image_urls:
image_urls = []
full_duration_in_hours = int(metric_data['nearest_full_duration'] / 3600)
use_base_name = str(metric_data['labelled_metric_name'])
timeseries_dir = use_base_name.replace('.', '/')
metric_data_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, metric_timestamp,
timeseries_dir)
for i_algorithm in metric_data['results']['algorithms_run']:
image_file = '%s/vortex.algorithm.%s.%s.%sh.png' % (metric_data_dir, i_algorithm, metric_data['labelled_metric_name'], str(full_duration_in_hours))
image_url = '%s/ionosphere_images?image=%s' % (settings.SKYLINE_URL, image_file)
image_urls.append(image_url)
if consensus_anomalies:
image_file = '%s/vortex.algorithm.consensus.%s.%sh.png' % (metric_data_dir, metric_data['labelled_metric_name'], str(full_duration_in_hours))
image_url = '%s/ionosphere_images?image=%s' % (settings.SKYLINE_URL, image_file)
image_urls.append(image_url)
# @added 20230616 - Feature #4952: vortex - consensus_count
if consensus_count:
image_file = '%s/vortex.algorithm.consensus_count.%s.%sh.png' % (metric_data_dir, metric_data['labelled_metric_name'], str(full_duration_in_hours))
image_url = '%s/ionosphere_images?image=%s' % (settings.SKYLINE_URL, image_file)
image_urls.append(image_url)
metric_data['results']['image_urls'] = image_urls
if save_training_data:
create_echo_data = False
if metric_data['resolution'] <= 60 and metric_data['full_duration'] >= 86400:
if metric_data['analysis_period'] > 86400:
create_echo_data = True
if create_echo_data:
try:
created_echo_data = self.create_echo_timeseries(metric_data, timeseries)
except Exception as err:
logger.error('error :: mirage_vortex :: create_echo_timeseries failed for request_id: %s - %s' % (
request_id, err))
if not created_echo_data:
logger.warning('warning :: mirage_vortex :: create_echo_timeseries failed for request_id: %s' % (
request_id))
try:
vortex_metric_data = copy.deepcopy(metric_data)
saved_training_data = self.add_training_data(request_id, vortex_metric_data, use_timeseries, ionosphere_enabled)
if saved_training_data:
logger.info('mirage_vortex :: saved training_data for request_id: %s to %s' % (
str(request_id), saved_training_data))
else:
logger.info('mirage_vortex :: failed to save training_data for request_id: %s' % (
str(request_id)))
trainable = False
try:
del vortex_metric_data
except:
pass
except Exception as err:
logger.error('error :: mirage_vortex :: add_training_data failed for request_id: %s - %s' % (
request_id, err))
if saved_training_data and return_image_urls:
image_urls = []
full_duration_in_hours = int(metric_data['nearest_full_duration'] / 3600)
for i_algorithm in metric_data['results']['algorithms_run']:
image_file = '%s/vortex.algorithm.%s.%s.%sh.png' % (saved_training_data, i_algorithm, metric_data['labelled_metric_name'], str(full_duration_in_hours))
image_url = '%s/ionosphere_images?image=%s' % (settings.SKYLINE_URL, image_file)
image_urls.append(image_url)
if consensus_anomalies:
image_file = '%s/vortex.algorithm.consensus.%s.%sh.png' % (saved_training_data, metric_data['labelled_metric_name'], str(full_duration_in_hours))
image_url = '%s/ionosphere_images?image=%s' % (settings.SKYLINE_URL, image_file)
image_urls.append(image_url)
metric_data['results']['image_urls'] = image_urls
metric_data['training_data'] = saved_training_data
metric_data['results']['trainable'] = False
add_results = True
if trigger_anomaly:
logger.info('mirage_vortex :: trigger_anomaly was passed for request_id: %s, setting to send_to_ionosphere and ionosphere_enabled to False' % (
str(request_id)))
ionosphere_enabled = send_to_ionosphere = False
add_results = True
if algorithms_test_only:
logger.info('mirage_vortex :: algorithms_test_only was passed for request_id: %s, setting to anomalous, send_to_ionosphere and ionosphere_enabled to False' % (
str(request_id)))
anomalous = ionosphere_enabled = send_to_ionosphere = False
add_results = True
if ionosphere_enabled or send_to_ionosphere:
add_results = False
# Send to ionosphere
ionosphere_parent_id = 0
try:
timeseries_dir = labelled_metric_name.replace('.', '/')
send_anomalous_metric_to(
'mirage_vortex', 'ionosphere', timeseries_dir,
str(metric_timestamp), labelled_metric_name,
str(timeseries[-1][1]), str(int(timeseries[0][0])),
triggered_algorithms, timeseries,
nearest_full_duration, str(ionosphere_parent_id),
list(algorithms_run.keys()))
sent_to_ionosphere = True
logger.info('mirage_vortex :: sent %s to ionosphere for request_id: %s' % (
str(labelled_metric_name), request_id))
except Exception as err:
logger.error('error :: mirage_vortex :: add_training_data failed for request_id: %s - %s' % (
request_id, err))
if sent_to_ionosphere:
metric_data_file = '%s/vortex.metric_data.%s.json' % (saved_training_data, request_id)
key_data = {
'request_id': request_id,
'metric': labelled_metric_name,
'timestamp': metric_timestamp,
'training_data': saved_training_data,
'metric_data_file': metric_data_file,
'timeout': metric_data['timeout'],
}
try:
self.redis_conn_decoded.hset('mirage_vortex.sent_to_ionosphere', request_id, str(key_data))
except Exception as err:
logger.error('error :: mirage_vortex :: failed to add request_id: %s to mirage_vortex.sent_to_ionosphere - %s' % (
request_id, err))
continue
if add_results:
try:
added_results = self.add_results(request_id, metric_data, analysis_start_time)
except Exception as err:
logger.error('error :: mirage_vortex :: add_results failed for request_id: %s - %s' % (
request_id, err))
break
if added_results:
logger.info('mirage_vortex :: processed request_id: %s' % str(request_id))
if anomalous:
logger.info('mirage_vortex :: anomaly detected - %s' % labelled_metric_name)
# If the metric has no features profiles and is anomalous
# add a panorama anomaly because Ionosphere will not add
# one.
send_to_panorama = False
if not ionosphere_enabled:
if settings.PANORAMA_ENABLED:
send_to_panorama = True
if send_to_panorama:
try:
sent_to_panorama = self.send_anomaly_to_panorama(
labelled_metric_name, value, from_timestamp,
metric_timestamp, algorithms_run, triggered_algorithms)
except Exception as err:
logger.error('error :: mirage_vortex :: send_anomaly_to_panorama failed - %s' % (
err))
if sent_to_panorama:
logger.info('mirage_vortex :: sent %s anomaly to panorama' % labelled_metric_name)
current_illuminance_dict = {}
try:
current_illuminance_dict = self.add_to_illuminance(run_timestamp, labelled_metric_name, metric_timestamp, value, triggered_algorithms)
except Exception as err:
logger.error('error :: mirage_vortex :: add_illuminance_entries failed - %s' % (
err))
if current_illuminance_dict:
logger.info('mirage_vortex :: added %s anomaly to illuminance' % labelled_metric_name)
logger.info('mirage_vortex :: adding %s to mirage_vortex.anomalous_metrics to be alerted on by mirage' % labelled_metric_name)
added_to_mirage_vortex_anomalous_metrics = False
try:
added_to_mirage_vortex_anomalous_metrics = self.add_to_mirage_vortex_anomalous_metrics(metric_data)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to hdel %s from mirage_vortex.sent_to_ionosphere - %s' % (
request_id, err))
logger.info('mirage_vortex :: added mirage_vortex.anomalous_metrics: %s' % str(added_to_mirage_vortex_anomalous_metrics))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: algorithm %s failed - %s' % str(err))
try:
self.redis_conn.incr('mirage_vortex.checks.done')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to increment mirage_vortex.checks.done Redis key - %s' % str(err))
# @added 20220420 - Feature #4530: namespace.analysed_events
parent_namespace = metric_data['metric_namespace_prefix']
if not parent_namespace:
try:
parent_namespace = metric.split('.')[0]
except:
pass
date_string = str(strftime('%Y-%m-%d', gmtime()))
namespace_analysed_events_hash = 'namespace.analysed_events.mirage.%s' % (date_string)
try:
self.redis_conn.hincrby(namespace_analysed_events_hash, parent_namespace, 1)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to increment %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
try:
self.redis_conn.expire(namespace_analysed_events_hash, (86400 * 15))
logger.info('mirage_vortex :: updated %s Redis hash' % namespace_analysed_events_hash)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to set expire %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
if added_results or sent_to_ionosphere:
continue
completed_at = time()
analysis_run_time = completed_at - process_start_timestamp
logger.info('mirage_vortex :: %s checks processed in in %.2f seconds' % (
str(checks_processed), analysis_run_time))
[docs] def run(self):
"""
Called when the process intializes.
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
if os.path.isfile(skyline_app_logwait):
try:
os.remove(skyline_app_logwait)
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('mirage_vortex :: starting %s run' % skyline_app)
last_sent_to_graphite = int(time())
filesafe_names_dict = {}
last_sleep_log = int(time())
while 1:
now = time()
# Make sure Redis is up
try:
self.redis_conn.ping()
except:
logger.error('error :: mirage_vortex :: skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
logger.info('mirage_vortex :: attempting to connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
try:
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as err:
logger.error('error :: mirage_vortex :: failed to connect to Redis - %s' % err)
try:
self.redis_conn.ping()
logger.info('mirage_vortex :: connected to redis')
except Exception as err:
logger.error('error :: mirage_vortex :: failed to ping Redis - %s' % err)
# Determine if any metric to analyze or Ionosphere alerts to be sent
while True:
# Report app up
try:
redis_is_up = self.redis_conn.setex(skyline_app, 120, now)
if redis_is_up:
try:
self.redis_conn.setex('redis', 120, now)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: could not update the Redis redis key - %s' % (
e))
except Exception as e:
logger.error('error :: mirage_vortex :: failed to update Redis key for %s up - %s' % (skyline_app, e))
items_to_analyse = []
ionosphere_results = []
return_results_for = {}
if len(items_to_analyse) == 0 and len(ionosphere_results) == 0 and len(return_results_for) == 0:
sleep_for = 1
next_send_to_graphite = last_sent_to_graphite + 60
seconds_to_next_send_to_graphite = next_send_to_graphite - int(time())
if seconds_to_next_send_to_graphite < 10:
if seconds_to_next_send_to_graphite > 1:
sleep_for = seconds_to_next_send_to_graphite
else:
break
if int(time()) > last_sleep_log + 10:
logger.info('mirage_vortex :: sleeping no metrics...')
last_sleep_log = int(time())
sleep(sleep_for)
# Return results that are about to timeout
return_results_for = {}
mirage_vortex_sent_to_ionosphere = {}
c_time = int(time())
try:
mirage_vortex_sent_to_ionosphere = self.redis_conn_decoded.hgetall('mirage_vortex.sent_to_ionosphere')
except Exception as err:
logger.error('mirage_vortex :: hkeys failed on ionosphere.vortex_results - %s' % err)
for request_id in list(mirage_vortex_sent_to_ionosphere.keys()):
try:
timeout = int(mirage_vortex_sent_to_ionosphere['request_id']['timeout'])
except:
timeout = 55
try:
sent_timestamp_str = request_id.split('.')[0]
except:
sent_timestamp_str = '60'
if c_time > (int(sent_timestamp_str) + ((timeout - 3) * 3)):
return_results_for[request_id] = copy.deepcopy(mirage_vortex_sent_to_ionosphere[request_id])
# Send ionosphere results
ionosphere_results = []
if not return_results_for:
try:
ionosphere_results = self.redis_conn_decoded.hkeys('ionosphere.vortex_results')
except Exception as err:
logger.error('mirage_vortex :: hkeys failed on failed ionosphere.vortex_results - %s' % err)
# Get added checks
items_to_analyse = []
if not return_results_for and not ionosphere_results:
try:
items_to_analyse = self.redis_conn_decoded.hkeys('flux.vortex')
except Exception as err:
logger.error('mirage_vortex :: hkeys failed on failed flux.vortex - %s' % err)
if len(items_to_analyse) > 0 or len(ionosphere_results) > 0 or len(return_results_for) > 0:
break
process_metric_checks = False
if items_to_analyse:
items_to_analyse_sorted = sorted(items_to_analyse)
if items_to_analyse_sorted:
process_metric_checks = True
if process_metric_checks or ionosphere_results or return_results_for:
# @added 20200903 - Task #3730: Validate Mirage running multiple processes
checks_to_process = len(items_to_analyse_sorted)
if items_to_analyse:
logger.info('mirage_vortex :: %s checks to process' % str(checks_to_process))
if ionosphere_results:
logger.info('mirage_vortex :: %s ionosphere_results to process' % str(len(ionosphere_results)))
if return_results_for:
logger.info('mirage_vortex :: returning results for %s request_ids about to timeout' % str(len(return_results_for)))
# Remove any existing algorithm.error files from any previous runs
# that did not cleanup for any reason
pattern = '%s.*.algorithm.error' % skyline_app
try:
for f in os.listdir(settings.SKYLINE_TMP_DIR):
if re.search(pattern, f):
try:
os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
logger.info('mirage_vortex :: cleaning up old error file - %s' % (str(f)))
except OSError:
pass
except:
logger.error('failed to cleanup mirage_algorithm.error files - %s' % (traceback.format_exc()))
# Spawn processes
pids = []
spawned_pids = []
pid_count = 0
if len(items_to_analyse) > 1:
try:
MIRAGE_PROCESSES = int(settings.MIRAGE_PROCESSES)
if len(items_to_analyse) < MIRAGE_PROCESSES:
MIRAGE_PROCESSES = len(items_to_analyse)
except:
MIRAGE_PROCESSES = 1
else:
MIRAGE_PROCESSES = 1
MIRAGE_PROCESSES = 1
run_timestamp = int(time())
for i in range(1, MIRAGE_PROCESSES + 1):
assigned_checks = []
if items_to_analyse:
checks_per_processor = int(ceil(float(len(items_to_analyse_sorted)) / float(MIRAGE_PROCESSES)))
if i == MIRAGE_PROCESSES:
assigned_max = len(items_to_analyse_sorted)
else:
assigned_max = min(len(items_to_analyse_sorted), i * checks_per_processor)
assigned_min = (i - 1) * checks_per_processor
assigned_keys = range(assigned_min, assigned_max)
# Compile assigned metrics
assigned_checks = [items_to_analyse_sorted[index] for index in assigned_keys]
logger.info('mirage_vortex :: processing %s checks' % str(len(assigned_checks)))
if ionosphere_results:
logger.info('mirage_vortex :: processing %s ionosphere_results' % str(len(ionosphere_results)))
if return_results_for:
logger.info('mirage_vortex :: returning %s results' % str(len(return_results_for)))
p = Process(target=self.spin_process, args=(i, run_timestamp, assigned_checks, ionosphere_results, return_results_for))
pids.append(p)
pid_count += 1
logger.info('mirage_vortex :: starting %s of %s spin_process/es' % (
str(pid_count), str(MIRAGE_PROCESSES)))
p.start()
spawned_pids.append([p.pid, i])
logger.info('mirage_vortex :: started spin_process %s with pid %s' % (str(pid_count), str(p.pid)))
# Self monitor processes and terminate if any spin_process has run
# for longer than 180 seconds - 20160512 @earthgecko
p_starts = time()
while time() - p_starts <= (settings.MAX_ANALYZER_PROCESS_RUNTIME * 3):
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('mirage_vortex :: %s :: %s spin_process/es completed in %.2f seconds' % (
skyline_app, str(MIRAGE_PROCESSES), time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('mirage_vortex :: %s :: timed out, killing all spin_process processes' % (skyline_app))
for p in pids:
p.terminate()
# p.join()
for p in pids:
if p.is_alive():
logger.info('mirage_vortex :: %s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
p.join()
# @added 20200607 - Feature #3508: ionosphere.untrainable_metrics
# Check to non 3sigma algorithm errors too
check_algorithm_errors = ['negatives_present']
for algorithm in list(settings.MIRAGE_ALGORITHMS):
check_algorithm_errors.append(algorithm)
for completed_pid, mirage_process in spawned_pids:
logger.info('mirage_vortex :: spin_process with pid %s completed' % (str(completed_pid)))
# Check to non 3sigma algorithm errors too and wrapped in try
try:
# for algorithm in settings.MIRAGE_ALGORITHMS:
for algorithm in check_algorithm_errors:
algorithm_error_file = '%s/%s.%s.%s.algorithm.error' % (
settings.SKYLINE_TMP_DIR, skyline_app,
str(completed_pid), algorithm)
if os.path.isfile(algorithm_error_file):
logger.info(
'error :: mirage_vortex :: spin_process with pid %s has reported an error with the %s algorithm' % (
str(completed_pid), algorithm))
try:
with open(algorithm_error_file, 'r') as f:
error_string = f.read()
logger.error('%s' % str(error_string))
except:
logger.error('error :: mirage_vortex :: failed to read %s error file' % algorithm)
try:
os.remove(algorithm_error_file)
except OSError:
pass
except:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to check algorithm errors')
redis_metrics_processed_key = 'mirage_vortex.%s.metrics_processed' % str(mirage_process)
redis_metrics_processed = {}
try:
redis_metrics_processed = self.redis_conn_decoded.hgetall(redis_metrics_processed_key)
# if redis_metrics_processed:
# self.redis_conn_decoded.delete(redis_metrics_processed_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: %s Redis hash operation failed - %s' % (redis_metrics_processed_key, err))
if items_to_analyse:
logger.info('mirage_vortex :: process %s checked %s metrics' % (
str(mirage_process), str(len(redis_metrics_processed))))
# Remove checks from flux.vortex
if redis_metrics_processed:
try:
self.redis_conn.hdel('flux.vortex', *set(list(redis_metrics_processed.keys())))
except Exception as err:
logger.error('error :: mirage_vortex :: hdel on flux.vortex failed - %s' % err)
mirage_vortex_anomalous_metrics = []
try:
literal_mirage_vortex_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage_vortex.anomalous_metrics'))
for metric_list_string in literal_mirage_vortex_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_vortex_anomalous_metrics.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to determine list from mirage_vortex.anomalous_metrics Redis set')
mirage_vortex_anomalous_metrics = []
mirage_not_anomalous_metrics = []
try:
literal_mirage_not_anomalous_metrics = list(self.redis_conn_decoded.smembers('mirage_vortex.not_anomalous_metrics'))
for metric_list_string in literal_mirage_not_anomalous_metrics:
metric = literal_eval(metric_list_string)
mirage_not_anomalous_metrics.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to determine list from mirage_vortex.not_anomalous_metrics Redis set')
mirage_not_anomalous_metrics = []
# Log progress
logger.info('mirage_vortex :: total anomalies :: %d' % len(mirage_vortex_anomalous_metrics))
# Log to Graphite
if process_metric_checks:
run_time = time() - run_timestamp
logger.info('mirage_vortex :: seconds to run :: %.2f' % run_time)
graphite_run_time = '%.2f' % run_time
send_metric_name = skyline_app_graphite_namespace + '.run_time'
send_graphite_metric(self, skyline_app, send_metric_name, graphite_run_time)
if int(time()) >= (last_sent_to_graphite + 60):
checks_done = 0
try:
# @modified 20230205 - Task #4844: Replace Redis getset with set with get
# As of Redis version 6.2.0, this command is regarded as deprecated.
# It can be replaced by SET with the GET argument when migrating or writing new code.
# checks_done_str = self.redis_conn_decoded.getset('mirage_vortex.checks.done', 0)
checks_done_str = self.redis_conn_decoded.set('mirage_vortex.checks.done', 0, get=True)
if checks_done_str:
checks_done = int(checks_done_str)
except:
logger.error(traceback.format_exc())
logger.error('error :: mirage_vortex :: failed to get mirage_vortex.checks.done key from Redis')
checks_done = 0
logger.info('mirage_vortex :: checks.done :: %s' % str(checks_done))
send_metric_name = '%s.checks.done' % skyline_app_graphite_namespace
send_graphite_metric(self, skyline_app, send_metric_name, str(checks_done))
last_sent_to_graphite = int(time())
# Sleep if it went too fast
if time() - now < 59:
logger.info('mirage_vortex :: sleeping due to low run time...')
sleep(1)