Source code for ionosphere.learn_repetitive_patterns

"""
learn_repetitive_patterns.py
"""
import logging
import traceback
import os
import copy
from time import time
from ast import literal_eval

import requests

import settings
from skyline_functions import get_redis_conn_decoded

from functions.ionosphere.get_training_data import get_training_data
from features_profile import calculate_features_profile
from functions.numpy.percent_different import get_percent_different
from ionosphere_functions import create_features_profile
# @added 20221215 - Feature #4658: ionosphere.learn_repetitive_patterns
from functions.ionosphere.get_repetitive_patterns_metrics_list_to_evaluate import get_repetitive_patterns_metrics_list_to_evaluate
from functions.metrics.get_base_names_and_metric_ids import get_base_names_and_metric_ids

skyline_app = 'ionosphere'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

this_host = str(os.uname()[1])

# Converting one settings variable into a local variable, just because it is a
# long string otherwise.
try:
    ENABLE_IONOSPHERE_DEBUG = settings.ENABLE_IONOSPHERE_DEBUG
except:
    logger.error('error :: learn :: cannot determine ENABLE_IONOSPHERE_DEBUG from settings')
    ENABLE_IONOSPHERE_DEBUG = False

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

try:
    learn_full_duration = int(settings.IONOSPHERE_LEARN_DEFAULT_FULL_DURATION_DAYS) * 86400
except:
    learn_full_duration = 86400 * 30  # 2592000

verify_ssl = True
try:
    running_on_docker = settings.DOCKER
except:
    running_on_docker = False
if running_on_docker:
    verify_ssl = False
try:
    overall_verify_ssl = settings.VERIFY_SSL
except:
    overall_verify_ssl = True
if not overall_verify_ssl:
    verify_ssl = False
user = None
password = None
if settings.WEBAPP_AUTH_ENABLED:
    user = str(settings.WEBAPP_AUTH_USER)
    password = str(settings.WEBAPP_AUTH_USER_PASSWORD)

function_str = 'learn_repetitive_patterns'

redis_conn_decoded = get_redis_conn_decoded(skyline_app)


def get_training_to_evaluate(metrics_to_evaluate):
    """
    Return a dictionary of training data to evaluate.

    :param metrics_to_evaluate: the dictionary of metrics_to_evaluate
    :type metrics_to_evaluate: dict
    :return: training_to_evaluate
    :rtype: dict

    """
    # Determine training data sets that are roughly aligned hourly or daily
    training_to_evaluate = {}
    hourly_tolerance = 180
    daily_tolerance = 900
    for metric in list(metrics_to_evaluate.keys()):
        # Sort timestamps oldest to latest
        timestamps = sorted(list(metrics_to_evaluate[metric].keys()))
        count = len(timestamps)
        aligned_count = 0
        # @added 20230105 - Feature #4658: ionosphere.learn_repetitive_patterns
        # Check that the timestamps are not within recent 1 hour periods of each
        # other.  This prevents a similar pattern occurring over a short period
        # from being identified as a repetitive pattern, which is undesired.  For
        # instance if there are training data sets for 16:05, 17:06 and 18:07 on
        # the same day we do not want to compare those.
        last_training_data_timestamp = None
        for index, t in enumerate(timestamps):
            # The last timestamp, we are done
            if index == (count - 1):
                break
            timestamp_difference = timestamps[(index + 1)] - t
            # @added 20230105 - Feature #4658: ionosphere.learn_repetitive_patterns
            # Check that the timestamps are not within recent 1 hour periods of
            # each other
            if last_training_data_timestamp and last_training_data_timestamp < t + (3600 * 3):
                last_training_data_timestamp = t
                continue
            last_training_data_timestamp = t
            if timestamp_difference in list(range((86400 - daily_tolerance), (86400 + daily_tolerance))):
                aligned_count += 1
                continue
            if timestamp_difference in list(range((3600 - hourly_tolerance), (3600 + hourly_tolerance))):
                aligned_count += 1
        if aligned_count == (count - 1):
            training_to_evaluate[metric] = copy.deepcopy(metrics_to_evaluate[metric])
    return training_to_evaluate
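
# Illustrative sketch (not part of Skyline): how the hourly/daily tolerance
# windows above classify the gap between two consecutive training data
# timestamps.  The function name is hypothetical, for clarity only; it uses
# the same range semantics as get_training_to_evaluate.
def _example_alignment(timestamp_difference):
    """Return 'daily', 'hourly' or None for the gap between two events."""
    hourly_tolerance = 180
    daily_tolerance = 900
    if timestamp_difference in range((86400 - daily_tolerance), (86400 + daily_tolerance)):
        return 'daily'
    if timestamp_difference in range((3600 - hourly_tolerance), (3600 + hourly_tolerance)):
        return 'hourly'
    return None

# _example_alignment(86390) -> 'daily', _example_alignment(3700) -> 'hourly',
# _example_alignment(7200) -> None, so a 7200 second gap breaks the
# aligned_count run and the metric is not evaluated.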
def get_metrics_to_evaluate(training_data_dict, last_evaluation_timestamp):
    """
    Return a dictionary of metrics_to_evaluate.

    :param training_data_dict: the dictionary of training data
    :param last_evaluation_timestamp: the last timestamp at which training data
        was evaluated
    :type training_data_dict: dict
    :type last_evaluation_timestamp: int
    :return: metrics_to_evaluate
    :rtype: dict

    """
    metrics_to_evaluate = {}
    training_data_dict_by_metric = {}
    training_to_evaluate = {}

    # @added 20221215 - Feature #4658: ionosphere.learn_repetitive_patterns
    # Added exclude_metrics list
    metrics = []
    base_names_with_ids = {}
    ids_with_base_names = {}
    try:
        base_names_with_ids = get_base_names_and_metric_ids(skyline_app)
        metrics = list(base_names_with_ids.keys())
    except Exception as err:
        logger.error('error :: %s :: get_base_names_and_metric_ids failed - %s' % (
            function_str, err))
    for metric in metrics:
        ids_with_base_names[base_names_with_ids[metric]] = metric
    include_metrics = []
    exclude_metrics = []
    try:
        include_metrics, exclude_metrics = get_repetitive_patterns_metrics_list_to_evaluate(skyline_app, metrics)
    except Exception as err:
        logger.error('error :: %s :: get_repetitive_patterns_metrics_list_to_evaluate failed - %s' % (
            function_str, err))

    for t in list(training_data_dict.keys()):
        for metric in training_data_dict[t]:
            # @added 20221215 - Feature #4658: ionosphere.learn_repetitive_patterns
            # Added exclude_metrics list
            base_name = str(metric)
            if metric.startswith('labelled_metrics.'):
                metric_id_str = metric.replace('labelled_metrics.', '')
                metric_id = int(metric_id_str)
                try:
                    base_name = ids_with_base_names[metric_id]
                except:
                    pass
            if base_name in exclude_metrics:
                continue
            if metric not in training_data_dict_by_metric:
                training_data_dict_by_metric[metric] = {}
            training_data_dict_by_metric[metric][t] = training_data_dict[t][metric]
            if t >= last_evaluation_timestamp:
                # Do not add if already trained
                if training_data_dict[t][metric]['trained']:
                    continue
                if metric not in training_to_evaluate:
                    training_to_evaluate[metric] = {}
                training_to_evaluate[metric][t] = training_data_dict[t][metric]
    logger.info('%s :: there are %s training data sets to consider for evaluation' % (
        function_str, str(len(training_to_evaluate))))

    # Only evaluate metrics that have more than 2 training data sets present
    training_to_evaluate_counts = {}
    for metric in list(training_to_evaluate.keys()):
        count = len(list(training_data_dict_by_metric[metric].keys()))
        if count > 2:
            training_to_evaluate_counts[metric] = count
            metrics_to_evaluate[metric] = copy.deepcopy(training_data_dict_by_metric[metric])
    logger.info('%s :: there are %s metrics with multiple training data sets to check if the events align periodically' % (
        function_str, str(len(training_to_evaluate_counts))))
    return metrics_to_evaluate
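
# Illustrative sketch (not part of Skyline): how the loop above resolves a
# Redis labelled_metrics.<id> name back to its base_name so that the
# exclude_metrics list can be applied.  The function name and the sample
# mapping in the comment are hypothetical.
def _example_resolve_base_name(metric, ids_with_base_names):
    """Return the base_name for a metric, resolving labelled_metrics ids."""
    base_name = str(metric)
    if metric.startswith('labelled_metrics.'):
        try:
            base_name = ids_with_base_names[int(metric.replace('labelled_metrics.', ''))]
        except (KeyError, ValueError):
            # Unknown or malformed id, fall back to the original name
            pass
    return base_name

# _example_resolve_base_name('labelled_metrics.42', {42: 'cpu_usage{host="a"}'})
# returns 'cpu_usage{host="a"}'; unknown ids fall back to the original name.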
def get_features_to_compare(training_to_evaluate):
    """
    Return a dictionary of features_to_compare.

    :param training_to_evaluate: the dictionary of training data to evaluate
    :type training_to_evaluate: dict
    :return: features_to_compare
    :rtype: dict

    """
    features_to_compare = {}
    for metric in list(training_to_evaluate.keys()):
        features_profile_sums = {}
        for t in list(training_to_evaluate[metric].keys()):
            features_profile_details_file = None
            fp_details = []
            training_data_dir = training_to_evaluate[metric][t]['training_data_dir']
            for f in training_to_evaluate[metric][t]['files']:
                if f.endswith('.fp.details.txt'):
                    features_profile_details_file = '%s/%s' % (training_data_dir, f)
                    break
            if not features_profile_details_file:
                try:
                    fp_csv, successful, fp_exists, fp_id, log_msg, traceback_format_exc, f_calc = calculate_features_profile(skyline_app, t, metric, 'training_data')
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: %s :: calculate_features_profile failed for metric: %s, timestamp: %s - %s' % (
                        function_str, metric, str(t), err))
                dir_list = os.listdir(training_data_dir)
                for f in dir_list:
                    if f.endswith('.fp.details.txt'):
                        features_profile_details_file = '%s/%s' % (training_data_dir, f)
            if features_profile_details_file and os.path.isfile(features_profile_details_file):
                fp_details_str = None
                with open(features_profile_details_file, 'r') as f:
                    fp_details_str = f.read()
                try:
                    fp_details = literal_eval(fp_details_str)
                except Exception as err:
                    logger.error('error :: %s :: literal_eval failed for fp_details for metric: %s - %s' % (
                        function_str, metric, err))
                    fp_details = None
            if fp_details:
                try:
                    features_profile_sums[t] = fp_details[4]
                except Exception as err:
                    logger.error('error :: %s :: failed to determine the features sum from fp_details for metric: %s - %s' % (
                        function_str, metric, err))
        if len(features_profile_sums) < 3:
            logger.info('%s :: %s has insufficient features sums to learn with, not evaluating' % (
                function_str, metric))
            continue
        features_to_compare[metric] = features_profile_sums
    return features_to_compare
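
# Illustrative sketch (not part of Skyline): the fp.details.txt file holds a
# literal_eval-able Python list and, as used above, index 4 carries the
# calculated features sum.  The function name is hypothetical; literal_eval
# is already imported at the top of this module.
def _example_read_features_sum(features_profile_details_file):
    """Return the features sum recorded in a .fp.details.txt file."""
    with open(features_profile_details_file, 'r') as fh:
        fp_details = literal_eval(fh.read())
    # Index 4 is the features sum, per the usage in get_features_to_compare
    return fp_details[4]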
def get_metrics_to_train(features_to_compare, timestamp):
    """
    Return a dictionary of metrics_to_train.

    :param features_to_compare: the dictionary of features_to_compare
    :param timestamp: the timestamp of the current evaluation run
    :type features_to_compare: dict
    :type timestamp: int
    :return: metrics_to_train
    :rtype: dict

    """
    metrics_to_train = {}
    metrics_to_train_key_data = {}
    for metric in list(features_to_compare.keys()):
        comparison_matrix = {
            'train': False,
            'training': {},
            'timestamps_to_train': [],
            'train_count': len(list(features_to_compare[metric].keys())),
        }
        training_timestamps = []
        for index, t in enumerate(list(features_to_compare[metric].keys())):
            comparison_matrix['training'][t] = {}
            features_sum = features_to_compare[metric][t]
            for i, it in enumerate(list(features_to_compare[metric].keys())):
                similar_count = 0
                if i == index:
                    continue
                other_features_sum = features_to_compare[metric][it]
                percent_different = 100
                try:
                    percent_different = get_percent_different(features_sum, other_features_sum, True)
                except Exception as err:
                    logger.error('error :: %s :: failed to calculate percent_different for metric: %s - %s' % (
                        function_str, metric, err))
                    continue
                comparison_matrix['training'][t][it] = {
                    'percent_different': percent_different,
                    'source_fp_sum': features_sum,
                    'fp_sum': other_features_sum,
                }
                if float(percent_different) <= float(2):
                    comparison_matrix['training'][t][it]['similar'] = True
                    similar_count += 1
                    training_timestamps.append(t)
                    training_timestamps.append(it)
                else:
                    comparison_matrix['training'][t][it]['similar'] = False
        training_timestamps = list(set(training_timestamps))
        comparison_matrix['train_on_avg'] = False
        # If the average of the percentages is less than 4 train on the
        # timestamps where the percent_different is less than 2
        percentages = []
        for t in list(comparison_matrix['training'].keys()):
            for tt in list(comparison_matrix['training'][t].keys()):
                percentages.append(comparison_matrix['training'][t][tt]['percent_different'])
        avg_percent_different = 100
        if percentages:
            avg_percent_different = sum(percentages) / len(percentages)
        comparison_matrix['avg_percent_different'] = avg_percent_different
        if len(training_timestamps) >= 3:
            comparison_matrix['train'] = True
        else:
            # If the average of the percentages is less than 4 train on the
            # timestamps where the percent_different is less than 2
            if len(training_timestamps) > 0:
                if avg_percent_different <= 4:
                    comparison_matrix['train'] = True
                    comparison_matrix['train_on_avg'] = True
        comparison_matrix['timestamps_to_train'] = training_timestamps
        metrics_to_train_key_data[metric] = comparison_matrix
        if comparison_matrix['train']:
            metrics_to_train[metric] = comparison_matrix
    try:
        key = 'ionosphere.learn_repetitive_patterns.metrics_to_train_evaluation.%s' % str(timestamp)
        redis_conn_decoded.hset(key, timestamp, str(metrics_to_train_key_data))
        redis_conn_decoded.expire(key, (86400 * 3))
    except Exception as err:
        logger.error('error :: %s :: failed to hset ionosphere.learn_repetitive_patterns.metrics_to_train_evaluation in Redis - %s' % (
            function_str, err))
    return metrics_to_train
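
# Illustrative sketch (not part of Skyline): the decision rule applied above.
# Two training events are 'similar' when their features sums differ by <= 2%,
# and a metric trains either on >= 3 similar timestamps or, failing that, on
# any similar pair when the average percent_different across all compared
# pairs is <= 4.  The function name is hypothetical.
def _example_should_train(training_timestamps, avg_percent_different):
    """Return True if the comparison_matrix would be marked to train."""
    if len(training_timestamps) >= 3:
        return True
    # The train_on_avg case
    return bool(training_timestamps) and avg_percent_different <= 4

# A single similar pair contributes two timestamps, so
# _example_should_train([1672531200, 1672617600], 3.5) -> True (train_on_avg)
# _example_should_train([], 1.0) -> False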
def ionosphere_learn_repetitive_patterns(timestamp):
    """
    Called by the Ionosphere learn process to evaluate recent training data
    sets and automatically create learnt features profiles for metrics whose
    anomalous events repeat periodically with similar features.

    :param timestamp: timestamp at which learn was called
    :type timestamp: int
    :return: True or False
    :rtype: boolean

    """
    child_process_pid = os.getpid()
    logger.info('%s :: child_process_pid - %s' % (function_str, str(child_process_pid)))

    # Get a list of all the training_data and check the following:
    # Are there multiple metrics firing at certain times?
    # Are there groups of metrics that are aligned at these times?
    # If so compare the fp sums of each training data set for each metric
    # that occurs multiple times and is aligned in a group
    last_evaluation_timestamp = 0
    try:
        last_evaluation_timestamp_data = redis_conn_decoded.get('ionosphere.learn_repetitive_patterns.last_evaluation_timestamp')
        if last_evaluation_timestamp_data:
            last_evaluation_timestamp = int(last_evaluation_timestamp_data)
    except Exception as err:
        logger.error('error :: %s :: failed to get ionosphere.learn_repetitive_patterns.last_evaluation_timestamp from Redis - %s' % (
            function_str, err))
    if last_evaluation_timestamp:
        next_evaluation = last_evaluation_timestamp + 3600
        if next_evaluation > timestamp:
            seconds_until = int(next_evaluation - timestamp)
            logger.info('%s :: next training_data evaluation scheduled to run in %s seconds, nothing to do' % (
                function_str, str(seconds_until)))
            return
    if not last_evaluation_timestamp:
        # Evaluate the last day's data if not set
        last_evaluation_timestamp = timestamp - settings.FULL_DURATION
    else:
        last_evaluation_timestamp = last_evaluation_timestamp - 3600
    try:
        redis_conn_decoded.set('ionosphere.learn_repetitive_patterns.last_evaluation_timestamp', timestamp)
    except Exception as err:
        logger.error('error :: %s :: failed to set ionosphere.learn_repetitive_patterns.last_evaluation_timestamp in Redis - %s' % (
            function_str, err))

    training_data_dict = {}
    try:
        training_data_dict = get_training_data(skyline_app, metrics=None, namespaces=None, key_by='timestamp')
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: get_training_data failed - %s' % (function_str, err))
    if not training_data_dict:
        logger.info('%s :: no training_data_dict to evaluate, nothing to do' % function_str)
        return

    metrics_to_evaluate = {}
    try:
        metrics_to_evaluate = get_metrics_to_evaluate(training_data_dict, last_evaluation_timestamp)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: get_metrics_to_evaluate failed - %s' % (function_str, err))
    if len(metrics_to_evaluate) == 0:
        logger.info('%s :: no metrics to evaluate' % function_str)
        return

    training_to_evaluate = {}
    try:
        training_to_evaluate = get_training_to_evaluate(metrics_to_evaluate)
    except Exception as err:
        logger.error('error :: %s :: get_training_to_evaluate failed - %s' % (
            function_str, err))
    logger.info('%s :: there are %s metrics with multiple training data sets that align periodically' % (
        function_str, str(len(training_to_evaluate))))
    if len(training_to_evaluate) == 0:
        logger.info('%s :: no metrics with periodically aligned events to evaluate' % function_str)
        return

    features_to_compare = {}
    try:
        features_to_compare = get_features_to_compare(training_to_evaluate)
    except Exception as err:
        logger.error('error :: %s :: get_features_to_compare failed - %s' % (
            function_str, err))
    if len(features_to_compare) == 0:
        logger.info('%s :: no metrics with features to compare' % function_str)
        return

    metrics_to_train = {}
    try:
        metrics_to_train = get_metrics_to_train(features_to_compare, timestamp)
    except Exception as err:
        logger.error('error :: %s :: get_metrics_to_train failed - %s' % (
            function_str, err))
    if len(metrics_to_train) == 0:
        logger.info('%s :: no metrics to train' % function_str)
        return
    logger.info('%s :: %s metrics to train based on repetitive patterns' % (
        function_str, str(len(metrics_to_train))))

    create_context = 'training_data'
    created_fp_ids = []
    for metric in list(metrics_to_train.keys()):
        # Do not hold up normal evaluation, run for 50 seconds and then set the
        # last evaluation time key appropriately and break, so that as soon as
        # Ionosphere is not busy with normal work the process will run again.
        now = int(time())
        if now > (timestamp + 50):
            new_last_evaluation_timestamp = last_evaluation_timestamp + 900
            logger.info('%s :: stopping due to run time and resetting ionosphere.learn_repetitive_patterns.last_evaluation_timestamp to %s' % (
                function_str, str(new_last_evaluation_timestamp)))
            try:
                redis_conn_decoded.set('ionosphere.learn_repetitive_patterns.last_evaluation_timestamp', new_last_evaluation_timestamp)
            except Exception as err:
                logger.error('error :: %s :: failed to set ionosphere.learn_repetitive_patterns.last_evaluation_timestamp in Redis - %s' % (
                    function_str, err))
            break
        logger.info('%s :: training metric: %s, training info: %s' % (
            function_str, metric, str(metrics_to_train[metric])))
        learn_parent_id = 0
        # Any generation of 0 or 1 is automatically set as validated,
        # however this is handled in create_features_profile when the
        # ionosphere_job is 'learn_repetitive_patterns'
        generation = 2
        for ts in sorted(metrics_to_train[metric]['timestamps_to_train']):
            fetch_graphs = True
            for f in training_data_dict[ts][metric]['files']:
                if 'graphite_now' in f:
                    fetch_graphs = False
                    break
            if fetch_graphs:
                url = '%s/ionosphere?timestamp=%s&metric=%s&requested_timestamp=%s' % (
                    settings.SKYLINE_URL, str(ts), metric, str(ts))
                try:
                    if user and password:
                        r = requests.get(url, timeout=30, auth=(user, password), verify=verify_ssl)
                    else:
                        r = requests.get(url, timeout=30, verify=verify_ssl)
                    if r.status_code == 200:
                        logger.info('%s :: fetched graphs for features profile' % function_str)
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: %s :: failed to get training data to save graphs from %s - %s' % (
                        function_str, str(url), err))
            fp_in_successful = None
            fp_exists = False
            try:
                fp_learn = False
                if learn_parent_id:
                    generation += 1
                ionosphere_job = 'learn_repetitive_patterns'
                slack_ionosphere_job = 'learn_repetitive_patterns'
                user_id = 1
                label = 'LEARNT - repetitive pattern'
                fp_id, fp_in_successful, fp_exists, fail_msg, traceback_format_exc = create_features_profile(skyline_app, ts, metric, create_context, ionosphere_job, learn_parent_id, generation, fp_learn, slack_ionosphere_job, user_id, label)
                if fp_exists:
                    logger.warning('warning :: %s :: failed to create a features profile for %s, %s as an fp already exists' % (
                        function_str, metric, str(ts)))
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: %s :: failed to create a features profile for %s, %s - %s' % (
                    function_str, metric, str(ts), err))
                continue
            if fp_in_successful is False:
                logger.error(traceback_format_exc)
                logger.error(fail_msg)
                logger.error('error :: %s :: failed to create a features profile for %s, %s' % (
                    function_str, metric, str(ts)))
                continue
            if not fp_exists:
                created_fp_ids.append(fp_id)
                logger.info('%s :: features profile with id %s created' % (function_str, str(fp_id)))
                learn_parent_id = int(fp_id)
    logger.info('%s :: created %s features profiles: %s' % (
        function_str, str(len(created_fp_ids)), str(created_fp_ids)))
    return
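
# Illustrative usage (not part of Skyline): the Ionosphere learn process is
# expected to call ionosphere_learn_repetitive_patterns periodically with the
# current timestamp; a run within 3600 seconds of the previous evaluation is
# a no-op via the ionosphere.learn_repetitive_patterns.last_evaluation_timestamp
# Redis key, so calling it more often than hourly is harmless.
if __name__ == '__main__':
    ionosphere_learn_repetitive_patterns(int(time()))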