"""
find_repetitive_patterns.py
"""
import logging
import traceback
import os
import copy
from time import time
from ast import literal_eval
from shutil import rmtree
from timeit import default_timer as timer
import settings
from skyline_functions import get_redis_conn_decoded
from functions.ionosphere.get_training_data import get_training_data
from functions.database.queries.query_anomalies import get_anomalies_for_period
from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
from functions.metrics.get_metric_ids_and_base_names import get_metric_ids_and_base_names
from skyline_functions import mkdir_p, write_data_to_file, get_graphite_metric
from features_profile import calculate_features_profile
from functions.numpy.percent_different import get_percent_different
from functions.victoriametrics.get_victoriametrics_metric import get_victoriametrics_metric
from functions.ionosphere.get_repetitive_patterns_metrics_list_to_evaluate import get_repetitive_patterns_metrics_list_to_evaluate
from functions.database.queries.get_apps import get_apps
# Module-level context and configuration for the ionosphere
# find_repetitive_patterns process.
skyline_app = 'ionosphere'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
this_host = str(os.uname()[1])
# Converting one settings variable into a local variable, just because it is a
# long string otherwise.
try:
    ENABLE_IONOSPHERE_DEBUG = settings.ENABLE_IONOSPHERE_DEBUG
except:
    logger.error('error :: learn :: cannot determine ENABLE_IONOSPHERE_DEBUG from settings')
    ENABLE_IONOSPHERE_DEBUG = False
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''
try:
    # Learn window in seconds, from days declared in settings
    learn_full_duration = int(settings.IONOSPHERE_LEARN_DEFAULT_FULL_DURATION_DAYS) * 86400
except:
    learn_full_duration = 86400 * 30  # 2592000
# SSL verification is disabled when running on docker or when globally
# disabled via settings.VERIFY_SSL
verify_ssl = True
try:
    running_on_docker = settings.DOCKER
except:
    running_on_docker = False
if running_on_docker:
    verify_ssl = False
try:
    overall_verify_ssl = settings.VERIFY_SSL
except:
    overall_verify_ssl = True
if not overall_verify_ssl:
    verify_ssl = False
# Webapp credentials used to fetch training data graphs via HTTP
user = None
password = None
if settings.WEBAPP_AUTH_ENABLED:
    user = str(settings.WEBAPP_AUTH_USER)
    password = str(settings.WEBAPP_AUTH_USER_PASSWORD)
# Module-level decoded Redis connection shared by the functions below
redis_conn_decoded = get_redis_conn_decoded(skyline_app)
def get_features_profile_sum(until_timestamp, use_metric):
    """
    Calculate the features profile for the training data at
    ``until_timestamp`` and return the features profile sum.

    :param until_timestamp: the anomaly/training data timestamp
    :type until_timestamp: int
    :param use_metric: the metric base_name (or ``labelled_metrics.<id>``)
    :type use_metric: str
    :return: the features profile sum or None
    :rtype: float or None
    """
    function_str = 'find_repetitive_patterns :: get_features_profile_sum'
    try:
        # Only the side effect is required here, calculate_features_profile
        # writes the <metric>.fp.details.txt file into the training data dir
        fp_csv, successful, fp_exists, fp_id, log_msg, traceback_format_exc, f_calc = calculate_features_profile(skyline_app, until_timestamp, use_metric, 'find_repetitive_patterns')
    except Exception as err:
        logger.error('error :: %s :: calculate_features_profile failed - %s' % (
            function_str, err))
    features_profile_details_file = None
    timeseries_dir = use_metric.replace('.', '/')
    training_data_dir = '%s/%s/%s' % (settings.SKYLINE_TMP_DIR, until_timestamp, timeseries_dir)
    dir_list = []
    try:
        # The dir may not exist if calculate_features_profile failed above,
        # do not allow an unhandled OSError to break the evaluation, simply
        # return None
        dir_list = os.listdir(training_data_dir)
    except Exception as err:
        logger.error('error :: %s :: failed to list dir %s - %s' % (
            function_str, training_data_dir, err))
    for f in dir_list:
        if f.endswith('.fp.details.txt'):
            features_profile_details_file = '%s/%s' % (training_data_dir, f)
    fp_details_str = None
    if features_profile_details_file:
        if os.path.isfile(features_profile_details_file):
            with open(features_profile_details_file, 'r') as fh:
                fp_details_str = fh.read()
    fp_details = None
    if fp_details_str:
        try:
            fp_details = literal_eval(fp_details_str)
        except Exception as err:
            logger.error('error :: %s :: literal_eval failed - %s' % (
                function_str, err))
            fp_details = None
    features_profile_sum = None
    if fp_details:
        try:
            # The features sum is the 5th element of the fp details list
            features_profile_sum = fp_details[4]
        except Exception as err:
            logger.error('error :: %s :: failed fp_details[4] - %s' % (
                function_str, err))
    return features_profile_sum
def get_features_profile_sums(metric, use_metric, anomalies, graphite):
    """
    For each anomaly generate the temporary training data and calculate the
    features profile sum for each.

    :param metric: the metric base_name
    :type metric: str
    :param use_metric: the name used for training data files, either the
        base_name or ``labelled_metrics.<id>``
    :type use_metric: str
    :param anomalies: anomalies dict keyed by anomaly id
    :type anomalies: dict
    :param graphite: fetch the timeseries from Graphite (True) or
        victoriametrics (False)
    :type graphite: boolean
    :return: (features_profile_sums, training_data_dirs)
    :rtype: tuple
    """
    function_str = 'find_repetitive_patterns :: get_features_profile_sums'
    features_profile_sums = {}
    training_data_dirs = []
    timeseries_dir = use_metric.replace('.', '/')
    for a_id in list(anomalies.keys()):
        # Only evaluate anomalies that have (close to) a full week of data
        if anomalies[a_id]['full_duration'] < (604800 - 3600):
            continue
        start = timer()
        until_timestamp = anomalies[a_id]['anomaly_timestamp']
        from_timestamp = until_timestamp - (86400 * 7)
        training_data_dir = '%s/%s/%s' % (settings.SKYLINE_TMP_DIR, until_timestamp, timeseries_dir)
        try:
            mkdir_p(training_data_dir)
            # Append to the list of training dirs which is used to remove them at
            # the end of the process
            training_data_dirs.append(training_data_dir)
        except Exception as err:
            logger.error('error :: %s :: failed to create dir %s - %s' % (
                function_str, training_data_dir, err))
        anomaly_json = '%s/%s.json' % (training_data_dir, use_metric)
        metric_json_file_saved = None
        if graphite:
            metric_json_file_saved = get_graphite_metric(
                skyline_app, metric, from_timestamp,
                until_timestamp, 'json', anomaly_json)
        else:
            metric_data = {}
            metric_json_file_saved = get_victoriametrics_metric(
                skyline_app, use_metric, from_timestamp,
                until_timestamp, 'json', anomaly_json, metric_data)
        if not metric_json_file_saved:
            logger.error('error :: %s :: file not saved %s' % (
                function_str, anomaly_json))
            continue
        # NOTE(review): the original code re-read the saved json here and
        # attempted to resample timeseries longer than 2000 points via an
        # undefined downsample_timeseries (with undefined current_logger,
        # log_context, errors and labelled_metric names), and the downsampled
        # result was never used - that dead path could only raise NameError
        # and has been removed.
        anomaly_data = 'metric = \'%s\'\n' \
            'full_duration = \'604800\'\n' \
            % (metric)
        anomaly_file = '%s/%s.txt' % (training_data_dir, use_metric)
        write_data_to_file(skyline_app, anomaly_file, 'w', anomaly_data)
        # Initialise so the post-try check cannot hit an unbound name if
        # get_features_profile_sum raises
        features_profile_sum = None
        try:
            features_profile_sum = get_features_profile_sum(until_timestamp, use_metric)
        except Exception as err:
            logger.error('error :: %s :: get_features_profile_sum failed - %s' % (
                function_str, err))
        if not features_profile_sum:
            continue
        features_profile_sums[until_timestamp] = features_profile_sum
        # Only remove after training ... at the end
        # They need to be copied to the the IONOSPHERE_DATA_FOLDER so they
        # can be trained on
        # if os.path.exists(training_data_dir):
        #     try:
        #         rmtree(training_data_dir)
        #     except Exception as err:
        #         print('error :: failed to rmtree - %s - %s' % (training_data_dir, err))
        timer_end = timer()
        logger.info('%s :: %s: %s (took %.6f seconds)' % (
            function_str, str(until_timestamp), str(features_profile_sum), (timer_end - start)))
    return features_profile_sums, training_data_dirs
def get_metrics_to_train(features_to_compare, metric):
    """
    Compare the features profile sums of the anomaly timestamps of each
    metric and determine which metrics, and which of their timestamps, are
    similar enough to be trained on.

    :param features_to_compare: dict of {metric: {timestamp: features_sum}}
    :type features_to_compare: dict
    :param metric: the metric base_name (used in error logging only)
    :type metric: str
    :return: (metrics_to_train, found_training)
    :rtype: tuple
    """
    function_str = 'find_repetitive_patterns :: get_metrics_to_train'
    metrics_to_train = {}
    found_training = False
    for i_metric in list(features_to_compare.keys()):
        timestamps = list(features_to_compare[i_metric].keys())
        comparison_matrix = {
            'train': False,
            'training': {},
            'timestamps_to_train': [],
            'train_count': len(timestamps),
        }
        training_timestamps = []
        for t in timestamps:
            comparison_matrix['training'][t] = {}
            features_sum = features_to_compare[i_metric][t]
            similar_count = 0
            t_training_timestamps = []
            for it in timestamps:
                # Do not compare a timestamp with itself
                if t == it:
                    continue
                # Skip comparisons between anomalies within the same week as
                # their 7 day training data windows would overlap
                skip = False
                if it > t:
                    if (t + (86400 * 7)) > it:
                        skip = True
                if it < t:
                    if it > (t - (86400 * 7)):
                        skip = True
                if skip:
                    continue
                other_features_sum = features_to_compare[i_metric][it]
                percent_different = 100
                try:
                    percent_different = get_percent_different(features_sum, other_features_sum, True)
                except Exception as err:
                    logger.error('error :: %s :: failed to calculate percent_different for metric: %s - %s' % (
                        function_str, metric, err))
                    continue
                comparison_matrix['training'][t][it] = {'percent_different': percent_different, 'source_fp_sum': features_sum, 'fp_sum': other_features_sum}
                # Within 2 percent is considered similar
                if float(percent_different) <= float(2):
                    comparison_matrix['training'][t][it]['similar'] = True
                    similar_count += 1
                    t_training_timestamps.append(t)
                    t_training_timestamps.append(it)
                else:
                    comparison_matrix['training'][t][it]['similar'] = False
            # If 2 or more other timestamps are similar, record them to train
            if similar_count >= 2:
                training_timestamps = training_timestamps + t_training_timestamps
                logger.info('%s :: %s similar to %s with %s' % (
                    function_str, str(t), str(similar_count), str(features_sum)))
        training_timestamps = list(set(training_timestamps))
        comparison_matrix['train_on_avg'] = False
        # Average all the recorded percentage differences for the metric
        percentages = []
        for t in list(comparison_matrix['training'].keys()):
            for tt in list(comparison_matrix['training'][t].keys()):
                percentages.append(comparison_matrix['training'][t][tt]['percent_different'])
        if percentages:
            avg_percent_different = (sum(percentages) / len(percentages))
            comparison_matrix['avg_percent_different'] = avg_percent_different
        else:
            avg_percent_different = 100
        if len(training_timestamps) >= 3:
            comparison_matrix['train'] = True
            found_training = True
        else:
            # If the average of the percentages is less than 4 train on
            # the timestamps where the percent_difference is less than 2
            if len(training_timestamps) > 0:
                if avg_percent_different <= 4:
                    comparison_matrix['train'] = True
                    comparison_matrix['train_on_avg'] = True
                    found_training = True
        comparison_matrix['timestamps_to_train'] = training_timestamps
        metrics_to_train[i_metric] = comparison_matrix
    return metrics_to_train, found_training
def get_anomalies_to_evaluate(include_metrics, metric_ids_with_base_names, anomalies_in_period):
    """
    Return the anomalies that are for metrics in include_metrics, excluding
    anomalies that were recorded by the boundary app.

    :param include_metrics: the base_names to include
    :type include_metrics: list
    :param metric_ids_with_base_names: dict of {metric_id: base_name}
    :type metric_ids_with_base_names: dict
    :param anomalies_in_period: anomalies dict keyed by anomaly id
    :type anomalies_in_period: dict
    :return: anomalies_to_evalute
    :rtype: dict
    """
    function_str = 'find_repetitive_patterns :: get_anomalies_to_evaluate'
    anomalies_to_evalute = {}
    apps = {}
    try:
        apps = get_apps(skyline_app)
    except Exception as err:
        logger.error('error :: %s :: get_apps failed - %s' % (
            function_str, err))
    boundary_id = 0
    try:
        boundary_id = apps['boundary']
    except KeyError as err:
        # If the boundary app id cannot be determined no anomalies will be
        # excluded on app_id (0 is not a valid app id)
        logger.warning('warning :: %s :: failed to determine boundary app id - %s' % (
            function_str, err))
    for anomaly_id in list(anomalies_in_period.keys()):
        metric_id = anomalies_in_period[anomaly_id]['metric_id']
        base_name = None
        try:
            base_name = metric_ids_with_base_names[metric_id]
        except KeyError:
            base_name = None
        if base_name in include_metrics:
            # Do not include boundary anomalies
            if anomalies_in_period[anomaly_id]['app_id'] == boundary_id:
                continue
            # deepcopy so mutating the result cannot change the source dict
            anomalies_to_evalute[anomaly_id] = copy.deepcopy(anomalies_in_period[anomaly_id])
    return anomalies_to_evalute
def find_repetitive_patterns(metric, metric_id, anomaly_timestamp, plot=False):
    """
    Evaluate the anomalies of the last 30 days for the metric and determine
    whether there are anomaly timestamps with similar features profile sums
    that can be trained on.

    :param metric: the metric base_name
    :type metric: str
    :param metric_id: the metric id
    :type metric_id: int
    :param anomaly_timestamp: the timestamp of the anomaly being evaluated
    :type anomaly_timestamp: int
    :param plot: plot the matching timeseries (interactive/debug use only)
    :type plot: boolean
    :return: (found_training, features_profile_sums, metrics_to_train,
        training_data_dirs)
    :rtype: tuple
    """
    function_str = 'find_repetitive_patterns'
    found_training = False
    features_profile_sums = {}
    training_data_dirs = []
    metrics_to_train = {}
    use_metric = str(metric)
    graphite = True
    # Labelled metrics are fetched from victoriametrics using their id based
    # labelled_metrics name
    if '_tenant_id="' in metric:
        use_metric = 'labelled_metrics.%s' % str(metric_id)
        graphite = False
    until_timestamp = int(anomaly_timestamp)
    from_timestamp = until_timestamp - (86400 * 30)
    metric_ids = [metric_id]
    anomalies = get_anomalies_for_period(skyline_app, metric_ids, from_timestamp, until_timestamp)
    if not anomalies:
        logger.info('%s :: no anomalies found for %s' % (function_str, metric))
        return found_training, features_profile_sums, metrics_to_train, training_data_dirs
    logger.info('%s :: %s anomalies found for %s' % (
        function_str, str(len(anomalies)), metric))
    # A repetitive pattern needs at least 3 anomalies to be determined
    if len(anomalies) < 3:
        logger.info('%s :: insufficient anomalies data to evaluate for %s' % (function_str, metric))
        return found_training, features_profile_sums, metrics_to_train, training_data_dirs
    timeseries_dir = use_metric.replace('.', '/')
    try:
        features_profile_sums, training_data_dirs = get_features_profile_sums(metric, use_metric, anomalies, graphite)
    except Exception as err:
        logger.error('error :: %s :: get_features_profile_sums failed - %s' % (
            function_str, err))
    if not features_profile_sums:
        logger.info('%s :: no features_profile_sums calculated for %s' % (
            function_str, metric))
        return found_training, features_profile_sums, metrics_to_train, training_data_dirs
    logger.info('%s :: %s features_profile_sums calculated' % (
        function_str, str(len(features_profile_sums))))
    features_to_compare = {}
    features_to_compare[use_metric] = features_profile_sums
    metrics_to_train, found_training = get_metrics_to_train(features_to_compare, metric)
    if found_training:
        logger.info('%s :: found %s similar patterns to train' % (
            function_str, str(len(metrics_to_train[use_metric]['timestamps_to_train']))))
        for t in sorted(metrics_to_train[use_metric]['timestamps_to_train']):
            print(metric, t, features_profile_sums[t])
            if plot:
                try:
                    # NOTE(review): plt (matplotlib.pyplot) is not imported in
                    # this module, the plot branch can only raise NameError
                    # (caught and printed below) unless the caller's context
                    # provides plt - TODO confirm and import where intended
                    raw_timeseries = []
                    training_data_dir = '%s/%s/%s' % (settings.SKYLINE_TMP_DIR, t, timeseries_dir)
                    anomaly_json = '%s/%s.json' % (training_data_dir, use_metric)
                    with open(anomaly_json, 'r') as f:
                        raw_timeseries = f.read()
                    # The json is saved with tuples, convert to lists for
                    # literal_eval
                    timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']')
                    del raw_timeseries
                    timeseries = literal_eval(timeseries_array_str)
                    plt.plot([item[0] for item in timeseries], [item[1] for item in timeseries])
                    plt.show()
                except Exception as err:
                    print('error :: %s :: failed to plot metric: %s - %s' % (
                        function_str, metric, err))
    else:
        print('No training found')
    return found_training, features_profile_sums, metrics_to_train, training_data_dirs
def ionosphere_find_repetitive_patterns(timestamp):
    """
    Called by :class:`~skyline.skyline.Ionosphere.spawn_learn_process` to
    re-evaluate anomalies and such, like creating learning features profiles,
    when a human makes a features profile and the automated creation of learnt
    features profiles and learn features profiles.

    :param timestamp: timestamp at which learn was called
    :type timestamp: int
    :return: True or False
    :rtype: boolean
    """
    function_str = 'ionosphere_find_repetitive_patterns'
    child_process_pid = os.getpid()
    logger.info('%s :: child_process_pid - %s' % (function_str, str(child_process_pid)))
    # Get the list of all metrics to find repetitive patterns on
    # Get a list of anomalies that are ready to check for the metrics which have find repetitive patterns enabled
    # Check the following:
    # Are there multiple metrics firing at certain times?
    # Are there groups of metrics that are aligned at these times?
    # If so compare the fp sums of each training data set for each metric that occurs multiple times and is aligned in a group
    last_evaluated_anomaly_timestamp = 0
    try:
        # Resume from the last evaluated anomaly timestamp recorded in Redis
        last_evaluated_anomaly_timestamp_data = redis_conn_decoded.get('ionosphere.find_repetitive_patterns.last_evaluated_anomaly_timestamp')
        if last_evaluated_anomaly_timestamp_data:
            last_evaluated_anomaly_timestamp = int(last_evaluated_anomaly_timestamp_data)
    except Exception as err:
        logger.error('error :: %s :: failed to get ionosphere.find_repetitive_patterns.last_evaluated_anomaly_timestamp from Redis - %s' % (
            function_str, err))
    if not last_evaluated_anomaly_timestamp:
        # Evaluate the last days data if not set
        last_evaluated_anomaly_timestamp = timestamp - settings.FULL_DURATION
    # Determine which anomalies need to be evaluated
    from_timestamp = int(last_evaluated_anomaly_timestamp)
    # Period after the normal ionosphere learn after period
    until_timestamp = int(time()) - (3600 + 900)
    anomalies_in_period = {}
    try:
        # An empty metric_ids list queries anomalies for all metrics
        metric_ids = []
        anomalies_in_period = get_anomalies_for_period(skyline_app, metric_ids, from_timestamp, until_timestamp)
    except Exception as err:
        logger.error('error :: %s :: get_anomalies_for_period failed - %s' % (
            function_str, err))
    if not anomalies_in_period:
        logger.info('%s :: no anomalies in period from %s, until %s, nothing to do' % (
            function_str, str(from_timestamp), str(until_timestamp)))
        return
    # Get a list of all the metrics
    metrics = []
    try:
        metrics = list(redis_conn_decoded.smembers('aet.analyzer.metrics_manager.db.metric_names'))
    except Exception as err:
        logger.error('error :: %s :: smembers failed on aet.analyzer.metrics_manager.db.metric_names - %s' % (
            function_str, err))
    # Create the list of metrics to evalute based on IONOSPHERE_REPETITIVE_PATTERNS_INCLUDE
    # and IONOSPHERE_REPETITIVE_PATTERNS_EXCLUDE
    include_metrics, exclude_metrics = [], []
    try:
        include_metrics, exclude_metrics = get_repetitive_patterns_metrics_list_to_evaluate('ionosphere', metrics)
    except Exception as err:
        logger.error('error :: %s :: get_repetitive_patterns_metrics_list_to_evaluate failed - %s' % (
            function_str, err))
    if not include_metrics:
        logger.info('%s :: no metrics in include_metrics, nothing to do' % function_str)
        return
    logger.info('%s :: %s metrics found to include in find_repetitive_patterns, specifically excluding %s' % (
        function_str, str(len(include_metrics)), str(len(exclude_metrics))))
    metric_ids_with_base_names = {}
    try:
        metric_ids_with_base_names = get_metric_ids_and_base_names(skyline_app)
    except Exception as err:
        logger.error('error :: %s :: get_metric_ids_with_base_names failed - %s' % (
            function_str, str(err)))
    # Filter anomalies that are for include_metrics
    anomalies_to_evalute = {}
    try:
        anomalies_to_evalute = get_anomalies_to_evaluate(include_metrics, metric_ids_with_base_names, anomalies_in_period)
    except Exception as err:
        logger.error('error :: %s :: get_anomalies_to_evaluate failed - %s' % (
            function_str, str(err)))
    if not anomalies_to_evalute:
        logger.info('%s :: no anomalies to evaluate, nothing to do' % function_str)
        return
    # Iterate the anomalies and find repetitive patterns
    for anomaly_id in list(anomalies_to_evalute.keys()):
        # Time-box the loop to 30 seconds from the passed timestamp
        if int(time()) > timestamp + 30:
            break
        found_training, features_profile_sums, metrics_to_train, training_data_dirs = False, {}, {}, []
        try:
            metric_id = anomalies_to_evalute[anomaly_id]['metric_id']
            anomaly_timestamp = anomalies_to_evalute[anomaly_id]['anomaly_timestamp']
            try:
                metric = metric_ids_with_base_names[metric_id]
            except:
                metric = None
            logger.info('%s :: processing %s for %s' % (
                function_str, str(anomaly_timestamp), metric))
            try:
                found_training, features_profile_sums, metrics_to_train, training_data_dirs = find_repetitive_patterns(metric, metric_id, anomaly_timestamp)
            except Exception as err:
                logger.error('error :: %s :: find_repetitive_patterns failed - %s' % (
                    function_str, str(err)))
        except Exception as err:
            logger.error('error :: %s :: find_repetitive_patterns failed - %s' % (
                function_str, str(err)))
    features_to_compare = {}
    try:
        # NOTE(review): get_features_to_compare and training_to_evaluate are
        # not defined or imported in this module - this call can only raise
        # NameError (caught below), so features_to_compare always remains
        # empty and the function returns here - TODO confirm intent
        features_to_compare = get_features_to_compare(training_to_evaluate)
    except Exception as err:
        logger.error('error :: %s :: get_training_to_evaluate failed - %s' % (
            function_str, err))
    if len(features_to_compare) == 0:
        logger.info('%s :: no metrics with features to compare' % function_str)
        return
    metrics_to_train = {}
    try:
        # NOTE(review): get_metrics_to_train returns a
        # (metrics_to_train, found_training) tuple and expects a metric name
        # as its second argument, but here the result is assigned to a single
        # name and timestamp is passed - the len() checks below would operate
        # on the tuple - TODO confirm and unpack
        metrics_to_train = get_metrics_to_train(features_to_compare, timestamp)
    except Exception as err:
        logger.error('error :: %s :: get_training_to_evaluate failed - %s' % (
            function_str, err))
    if len(metrics_to_train) == 0:
        logger.info('%s :: no metrics to train' % function_str)
        return
    logger.info('%s :: %s metrics to train based on repetitive patterns' % (
        function_str, str(len(metrics_to_train))))
    create_context = 'training_data'
    created_fp_ids = []
    for metric in list(metrics_to_train.keys()):
        # Do not hold up normal evaluation, run for 50 seconds and then set the
        # last evaluation time key appropriately and break so that as so as
        # Ionosphere is not busy with normal work the process will run again.
        now = int(time())
        if now > (timestamp + 50):
            # NOTE(review): last_evaluation_timestamp is not defined anywhere
            # in this function - this would raise NameError here, possibly
            # last_evaluated_anomaly_timestamp was intended - TODO confirm
            new_last_evaluation_timestamp = last_evaluation_timestamp + 900
            logger.info('%s :: stopping due to run time and resetting ionosphere.learn_repetitive_patterns.last_evaluation_timestamp to %s' % (
                function_str, str(new_last_evaluation_timestamp)))
            try:
                redis_conn_decoded.set('ionosphere.learn_repetitive_patterns.last_evaluation_timestamp', new_last_evaluation_timestamp)
            except Exception as err:
                logger.error('error :: %s :: failed to set ionosphere.learn_repetitive_patterns.last_evaluation_timestamp in Redis - %s' % (
                    function_str, err))
            break
        logger.info('%s :: training metric: %s, training info: %s' % (
            function_str, metric, str(metrics_to_train[metric])))
        learn_parent_id = 0
        # Any generation of 0 or 1 is automatically set as validated
        # however this is handled in create_features_profile when the
        # ionosphere_job is 'learn_repetitive_patterns'
        generation = 2
        for ts in sorted(metrics_to_train[metric]['timestamps_to_train']):
            fetch_graphs = True
            # NOTE(review): training_data_dict is not defined in this
            # function - presumably the training data file listing was meant
            # to come from get_training_data - TODO confirm
            for f in training_data_dict[ts][metric]['files']:
                if 'graphite_now' in f:
                    fetch_graphs = False
                    break
            if fetch_graphs:
                # Hit the webapp training data page so it fetches and saves
                # the graph images for the features profile
                url = '%s/ionosphere?timestamp=%s&metric=%s&requested_timestamp=%s' % (
                    settings.SKYLINE_URL, str(ts), metric, str(ts))
                try:
                    # NOTE(review): requests is not imported in this module -
                    # TODO confirm and add the import
                    if user and password:
                        r = requests.get(url, timeout=30, auth=(user, password), verify=verify_ssl)
                    else:
                        r = requests.get(url, timeout=30, verify=verify_ssl)
                    if r.status_code == 200:
                        logger.info('%s :: fetch graphs for features profile' % function_str)
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: %s :: failed get training to save graphs from %s - %s' % (
                        function_str, str(url), err))
            fp_in_successful = None
            try:
                fp_learn = False
                if learn_parent_id:
                    generation += 1
                ionosphere_job = 'learn_repetitive_patterns'
                slack_ionosphere_job = 'learn_repetitive_patterns'
                user_id = 1
                label = 'LEARNT - repetitive pattern'
                # NOTE(review): create_features_profile is not imported in
                # this module; also fp_id, fp_exists, fail_msg and
                # traceback_format_exc are unbound below if this call raises
                # - TODO confirm
                fp_id, fp_in_successful, fp_exists, fail_msg, traceback_format_exc = create_features_profile(skyline_app, ts, metric, create_context, ionosphere_job, learn_parent_id, generation, fp_learn, slack_ionosphere_job, user_id, label)
                if fp_exists:
                    logger.warning('warning :: %s :: failed to create a features profile for %s, %s as an fp already exists' % (
                        function_str, metric, str(ts)))
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: %s :: failed to create a features profile for %s, %s' % (
                    function_str, metric, str(timestamp)))
            if fp_in_successful is False:
                logger.error(traceback_format_exc)
                logger.error(fail_msg)
                logger.error('error :: %s :: failed to create a features profile for %s, %s' % (
                    function_str, metric, str(ts)))
                continue
            if not fp_exists:
                created_fp_ids.append(fp_id)
                logger.info('%s :: features profile with id %s created' % (function_str, str(fp_id)))
                # Chain subsequent profiles off the first created fp
                learn_parent_id = int(fp_id)
    logger.info('%s :: created %s features profiles: %s' % (
        function_str, str(len(created_fp_ids)), str(created_fp_ids)))
    return