Source code for ionosphere_functions

from __future__ import division
import logging
from os import path
import time
# import string
# import operator
import re
import csv
# import datetime
import shutil
import glob
from ast import literal_eval
import traceback
from datetime import datetime

from redis import StrictRedis
from sqlalchemy import (
    create_engine, Column, Table, Integer, String, MetaData, DateTime)
from sqlalchemy.dialects.mysql import DOUBLE, TINYINT
from sqlalchemy.sql import select
# import json
from tsfresh import __version__ as tsfresh_version

import settings
import skyline_version
from skyline_functions import (
    RepresentsInt, mkdir_p, write_data_to_file)
from tsfresh_feature_names import TSFRESH_FEATURES

from database import (
    get_engine, ionosphere_table_meta, metrics_table_meta,
    # @added 20180414 - Branch #2270: luminosity
    luminosity_table_meta)

# Rebind the imported ``skyline_version`` module name to the module's
# ``__absolute_version__`` attribute; after this line ``skyline_version``
# no longer refers to the module itself.
skyline_version = skyline_version.__absolute_version__

# Determine the configured full duration in seconds, falling back to one day
# (86400 seconds) when settings.FULL_DURATION is missing or cannot be converted
# to an int.  Only Exception subclasses are handled so that SystemExit and
# KeyboardInterrupt are not swallowed while the module is being imported.
try:
    full_duration_seconds = int(settings.FULL_DURATION)
except Exception:
    full_duration_seconds = 86400

# The full duration expressed in hours.
full_duration_in_hours = full_duration_seconds / 60 / 60

# @created 20170114 - Feature #1854: Ionosphere learn
# This function was moved in its entirety from webapp/ionosphere_backend.py
# so as to decouple the creation of features profiles from the webapp as
# ionosphere/learn.py now requires the ability to create features profiles too.
# The only things modified were that the current_skyline_app, current_logger and
# generations parameters were added to the function:
# ionosphere_job, parent_id, generation


def fp_create_get_an_engine(current_skyline_app):
    """
    Get a MySQL engine for the calling Skyline app via ``get_engine``.

    Any exception raised while calling ``get_engine`` (or while unpacking its
    result) is logged to the app's logger and reported to the caller through
    the returned ``fail_msg`` and ``trace`` rather than being propagated.

    :param current_skyline_app: the Skyline app name calling the function,
        also used to derive the logger name (``<app>Log``)
    :type current_skyline_app: str
    :return: ``(engine, fail_msg, trace)`` exactly as returned by
        ``get_engine``, or ``(None, fail_msg, trace)`` if the call failed
    :rtype: tuple

    """
    current_skyline_app_logger = current_skyline_app + 'Log'
    current_logger = logging.getLogger(current_skyline_app_logger)

    try:
        engine, fail_msg, trace = get_engine(current_skyline_app)
    except Exception:
        # Report the failure to the caller instead of raising so that the
        # caller can decide how to handle a missing engine.
        trace = traceback.format_exc()
        current_logger.error('%s' % trace)
        fail_msg = 'error :: fp_create_get_an_engine :: failed to get MySQL engine'
        current_logger.error('%s' % fail_msg)
        return None, fail_msg, trace
    return engine, fail_msg, trace
def fp_create_engine_disposal(current_skyline_app, engine):
    """
    Dispose of a MySQL engine, logging instead of raising if disposal fails.

    Nothing is done when ``engine`` is falsy (e.g. ``None``).

    :param current_skyline_app: the Skyline app name calling the function,
        also used to derive the logger name (``<app>Log``)
    :param engine: the engine object to call ``dispose()`` on
    :type current_skyline_app: str
    :type engine: object
    :return: None

    """
    current_skyline_app_logger = current_skyline_app + 'Log'
    current_logger = logging.getLogger(current_skyline_app_logger)

    if not engine:
        return

    # This is a routine progress message, not a failure, so it is logged at
    # info level; only a failed dispose() is reported as an error.
    current_logger.info('fp_create_engine_disposal :: calling engine.dispose()')
    try:
        engine.dispose()
    except Exception:
        current_logger.error(traceback.format_exc())
        current_logger.error('error :: fp_create_engine_disposal :: calling engine.dispose()')
    return
# @added 20170115 - Feature #1854: Ionosphere learn - generations
# Added determination of the learn related variables so that Panorama,
# webapp/ionosphere and learn can access this function to determine what the
# default IONOSPHERE_LEARN_DEFAULT_ values are, or whether a namespace has
# specific values in settings.IONOSPHERE_LEARN_NAMESPACE_CONFIG, for the
# learn_full_duration_days, learn_valid_ts_older_than,
# max_generations and max_percent_diff_from_origin values to add to the
# insert statement if the table is the metrics table.
# Set the learn generations variables with the IONOSPHERE_LEARN_DEFAULT_ and any
# settings.IONOSPHERE_LEARN_NAMESPACE_CONFIG values. These will later be
# overridden by any database values determined for the specific metric if
# they exist.
def _learn_values_from_namespace_config(namespace_config):
    """
    Parse the learn values out of one IONOSPHERE_LEARN_NAMESPACE_CONFIG entry.

    Every value is converted before anything is returned, so an invalid entry
    can never be partially applied by the caller.

    :param namespace_config: the config entry, indexed as ``[1]`` full duration
        days, ``[2]`` valid learning duration, ``[3]`` max generations and
        ``[4]`` max percent diff from origin (``[0]`` is the namespace)
    :type namespace_config: tuple or list
    :return: (use_full_duration, valid_learning_duration,
        use_full_duration_days, max_generations, max_percent_diff_from_origin)
    :rtype: (int, int, int, int, float)
    :raises IndexError: if the entry has too few items
    :raises TypeError: if an item cannot be converted
    :raises ValueError: if an item cannot be converted

    """
    full_duration_days = int(namespace_config[1])
    return (
        full_duration_days * 86400,
        int(namespace_config[2]),
        full_duration_days,
        int(namespace_config[3]),
        float(namespace_config[4]))


def get_ionosphere_learn_details(current_skyline_app, base_name):
    """
    Determines what the default ``IONOSPHERE_LEARN_DEFAULT_`` values are and
    what the specific override values are if the metric matches a pattern
    defined in :mod:`settings.IONOSPHERE_LEARN_NAMESPACE_CONFIG`.

    Each namespace is first tried as a regex matched against the start of
    ``base_name`` and then as a plain substring of ``base_name``.  The first
    matching entry whose values are all valid wins; an entry with invalid
    values is logged and skipped without changing any value.  If the settings
    cannot be read at all the failure is logged and whatever was determined up
    to that point is returned (``None`` for anything not yet determined).

    :param current_skyline_app: the Skyline app name calling the function
    :param base_name: the base_name of the metric
    :type current_skyline_app: str
    :type base_name: str
    :return: tuple
    :return: (use_full_duration, valid_learning_duration,
        use_full_duration_days, max_generations, max_percent_diff_from_origin)
    :rtype: (int, int, int, int, float)

    """
    current_skyline_app_logger = current_skyline_app + 'Log'
    current_logger = logging.getLogger(current_skyline_app_logger)

    use_full_duration = None
    valid_learning_duration = None
    use_full_duration_days = None
    max_generations = None
    max_percent_diff_from_origin = None

    try:
        use_full_duration = int(settings.IONOSPHERE_LEARN_DEFAULT_FULL_DURATION_DAYS * 86400)
        valid_learning_duration = int(settings.IONOSPHERE_LEARN_DEFAULT_VALID_TIMESERIES_OLDER_THAN_SECONDS)
        use_full_duration_days = int(settings.IONOSPHERE_LEARN_DEFAULT_FULL_DURATION_DAYS)
        max_generations = int(settings.IONOSPHERE_LEARN_DEFAULT_MAX_GENERATIONS)
        max_percent_diff_from_origin = float(settings.IONOSPHERE_LEARN_DEFAULT_MAX_PERCENT_DIFF_FROM_ORIGIN)

        # Tracked explicitly (and initialised before the loop) so that the
        # summary log line below is correct for regex matches, substring
        # matches and an empty namespace config alike.
        namespace_matched = False
        for namespace_config in settings.IONOSPHERE_LEARN_NAMESPACE_CONFIG:
            namespace = str(namespace_config[0])

            # Match by regex
            try:
                matches = bool(re.match(namespace, base_name))
            except Exception:
                # Not every namespace is a valid regex, fall through to the
                # substring match.
                matches = False
            if not matches:
                # Match by substring
                matches = namespace in base_name
            if not matches:
                continue

            try:
                learn_values = _learn_values_from_namespace_config(namespace_config)
            except (IndexError, TypeError, ValueError):
                current_logger.error(
                    'error :: get_ionosphere_learn_details :: invalid namespace config skipped - %s' % (
                        str(namespace_config)))
                continue

            (use_full_duration, valid_learning_duration,
             use_full_duration_days, max_generations,
             max_percent_diff_from_origin) = learn_values
            current_logger.info('get_ionosphere_learn_details :: %s matches %s' % (base_name, str(namespace_config)))
            namespace_matched = True
            break

        if namespace_matched:
            current_logger.info('get_ionosphere_learn_details :: found namespace config match settings')
        else:
            current_logger.info('get_ionosphere_learn_details :: no specific namespace matches found, using default settings')
    except Exception:
        current_logger.error(traceback.format_exc())
        current_logger.error('error :: get_ionosphere_learn_details :: failed to check namespace config settings matches')

    current_logger.info('get_ionosphere_learn_details :: use_full_duration_days :: %s days' % (str(use_full_duration_days)))
    current_logger.info('get_ionosphere_learn_details :: use_full_duration :: %s seconds' % (str(use_full_duration)))
    current_logger.info('get_ionosphere_learn_details :: valid_learning_duration :: %s seconds' % (str(valid_learning_duration)))
    current_logger.info('get_ionosphere_learn_details :: max_generations :: %s' % (str(max_generations)))
    current_logger.info('get_ionosphere_learn_details :: max_percent_diff_from_origin :: %s' % (str(max_percent_diff_from_origin)))
    return use_full_duration, valid_learning_duration, use_full_duration_days, max_generations, max_percent_diff_from_origin
[docs]def create_features_profile(current_skyline_app, requested_timestamp, data_for_metric, context, ionosphere_job, fp_parent_id, fp_generation, fp_learn): """ Add a features_profile to the Skyline ionosphere database table. :param current_skyline_app: Skyline app name :param requested_timestamp: The timestamp of the dir that the features profile data is in :param data_for_metric: The base_name of the metric :param context: The context of the caller :param ionosphere_job: The ionosphere_job name related to creation request valid jobs are ``learn_fp_human``, ``learn_fp_generation``, ``learn_fp_learnt`` and ``learn_fp_automatic``. :param fp_parent_id: The id of the parent features profile that this was learnt from, 0 being an original human generated features profile :param fp_generation: The number of generations away for the original human generated features profile, 0 being an original human generated features profile. :param fp_learn: Whether Ionosphere should learn at use_full_duration_days :type current_skyline_app: str :type requested_timestamp: int :type data_for_metric: str :type context: str :type ionosphere_job: str :type fp_parent_id: int :type fp_generation: int :type fp_learn: boolean :return: fp_id, fp_in_successful, fp_exists, fail_msg, traceback_format_exc :rtype: str, boolean, boolean, str, str """ current_skyline_app_logger = current_skyline_app + 'Log' current_logger = logging.getLogger(current_skyline_app_logger) base_name = data_for_metric.replace(settings.FULL_NAMESPACE, '', 1) if context == 'training_data': ionosphere_job = 'learn_fp_human' current_logger.info('create_features_profile :: %s :: requested for %s at %s' % ( context, str(base_name), str(requested_timestamp))) metric_timeseries_dir = base_name.replace('.', '/') # @modified 20190327 - Feature #2484: FULL_DURATION feature profiles # Added context ionosphere_echo if context == 'training_data' or context == 'ionosphere_echo' or context == 'ionosphere_echo_check': 
metric_training_data_dir = '%s/%s/%s' % ( settings.IONOSPHERE_DATA_FOLDER, str(requested_timestamp), metric_timeseries_dir) if context == 'features_profiles': metric_training_data_dir = '%s/%s/%s' % ( settings.IONOSPHERE_PROFILES_FOLDER, metric_timeseries_dir, str(requested_timestamp)) # @added 20170113 - Feature #1854: Ionosphere learn if context == 'ionosphere_learn': # @modified 20170116 - Feature #1854: Ionosphere learn # Allowing ionosphere_learn to create a features profile for a training # data set that it has learnt is not anomalous if ionosphere_job != 'learn_fp_automatic': metric_training_data_dir = '%s/%s/%s' % ( settings.IONOSPHERE_LEARN_FOLDER, str(requested_timestamp), metric_timeseries_dir) else: metric_training_data_dir = '%s/%s/%s' % ( settings.IONOSPHERE_DATA_FOLDER, str(requested_timestamp), metric_timeseries_dir) features_file = '%s/%s.tsfresh.input.csv.features.transposed.csv' % ( metric_training_data_dir, base_name) features_profile_dir = '%s/%s' % ( settings.IONOSPHERE_PROFILES_FOLDER, metric_timeseries_dir) ts_features_profile_dir = '%s/%s' % ( features_profile_dir, str(requested_timestamp)) features_profile_created_file = '%s/%s.%s.fp.created.txt' % ( metric_training_data_dir, str(requested_timestamp), base_name) features_profile_details_file = '%s/%s.%s.fp.details.txt' % ( metric_training_data_dir, str(requested_timestamp), base_name) anomaly_check_file = '%s/%s.txt' % (metric_training_data_dir, base_name) trace = 'none' fail_msg = 'none' new_fp_id = False calculated_time = False fcount = None fsum = None # @added 20170104 - Feature #1842: Ionosphere - Graphite now graphs # Added the ts_full_duration parameter so that the appropriate graphs can be # embedded for the user in the training data page ts_full_duration = '0' if context == 'ionosphere_learn': if not path.isfile(features_profile_details_file): current_logger.error('error :: create_features_profile :: no features_profile_details_file - %s' % features_profile_details_file) return 
'none', False, False, fail_msg, trace if path.isfile(features_profile_details_file): current_logger.info('create_features_profile :: getting features profile details from - %s' % features_profile_details_file) # Read the details file with open(features_profile_details_file, 'r') as f: fp_details_str = f.read() fp_details = literal_eval(fp_details_str) calculated_time = str(fp_details[2]) fcount = str(fp_details[3]) fsum = str(fp_details[4]) try: ts_full_duration = str(fp_details[5]) except: current_logger.error('error :: create_features_profile :: could not determine the full duration from - %s' % features_profile_details_file) ts_full_duration = '0' if context != 'ionosphere_learn': if ts_full_duration == '0': if path.isfile(anomaly_check_file): current_logger.info('create_features_profile :: determining the full duration from anomaly_check_file - %s' % anomaly_check_file) # Read the details file with open(anomaly_check_file, 'r') as f: anomaly_details = f.readlines() for i, line in enumerate(anomaly_details): if 'full_duration' in line: _ts_full_duration = '%s' % str(line).split("'", 2) full_duration_array = literal_eval(_ts_full_duration) ts_full_duration = str(int(full_duration_array[1])) current_logger.info('create_features_profile :: determined the full duration as - %s' % str(ts_full_duration)) if path.isfile(features_profile_created_file): # Read the created file with open(features_profile_created_file, 'r') as f: fp_created_str = f.read() fp_created = literal_eval(fp_created_str) new_fp_id = fp_created[0] return str(new_fp_id), True, True, fail_msg, trace # Have data if path.isfile(features_file): current_logger.info('create_features_profile :: features_file exists: %s' % features_file) else: trace = traceback.format_exc() current_logger.error(trace) fail_msg = 'error :: create_features_profile :: features_file does not exist: %s' % features_file current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # Raise to 
webbapp I believe to provide traceback to user in UI raise else: return False, False, False, fail_msg, trace features_data = [] with open(features_file, 'rb') as fr: reader = csv.reader(fr, delimiter=',') for i, line in enumerate(reader): feature_name_item = False fname_id = False f_value = False feature_name = str(line[0]) feature_name_item = filter( lambda x: x[1] == feature_name, TSFRESH_FEATURES) if feature_name_item: feature_name_list = feature_name_item[0] fname_id = int(feature_name_list[0]) f_value = str(line[1]) if fname_id and f_value: features_data.append([fname_id, f_value]) # @added 20170113 - Feature #1854: Ionosphere learn - generations # Set the learn generations variables with the IONOSPHERE_LEARN_DEFAULT_ and any # settings.IONOSPHERE_LEARN_NAMESPACE_CONFIG values. These will later be # overridden by any database values determined for the specific metric if # they exist. # Set defaults use_full_duration_days = int(settings.IONOSPHERE_LEARN_DEFAULT_FULL_DURATION_DAYS) valid_learning_duration = int(settings.IONOSPHERE_LEARN_DEFAULT_VALID_TIMESERIES_OLDER_THAN_SECONDS) max_generations = int(settings.IONOSPHERE_LEARN_DEFAULT_MAX_GENERATIONS) max_percent_diff_from_origin = float(settings.IONOSPHERE_LEARN_DEFAULT_MAX_PERCENT_DIFF_FROM_ORIGIN) try: use_full_duration, valid_learning_duration, use_full_duration_days, max_generations, max_percent_diff_from_origin = get_ionosphere_learn_details(current_skyline_app, base_name) learn_full_duration_days = use_full_duration_days except: current_logger.error(traceback.format_exc()) current_logger.error('error :: create_features_profile :: failed to get_ionosphere_learn_details') current_logger.info('create_features_profile :: learn_full_duration_days :: %s days' % (str(learn_full_duration_days))) current_logger.info('create_features_profile :: valid_learning_duration :: %s seconds' % (str(valid_learning_duration))) current_logger.info('create_features_profile :: max_generations :: %s' % (str(max_generations))) 
current_logger.info('create_features_profile :: max_percent_diff_from_origin :: %s' % (str(max_percent_diff_from_origin))) current_logger.info('create_features_profile :: getting MySQL engine') try: engine, fail_msg, trace = fp_create_get_an_engine(current_skyline_app) current_logger.info(fail_msg) except: trace = traceback.format_exc() current_logger.error(trace) fail_msg = 'error :: create_features_profile :: could not get a MySQL engine' current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # Raise to webbapp I believe to provide traceback to user in UI raise else: return False, False, False, fail_msg, trace if not engine: trace = 'none' fail_msg = 'error :: create_features_profile :: engine not obtained' current_logger.error(fail_msg) if context == 'training' or context == 'features_profile': # Raise to webbapp I believe to provide traceback to user in UI raise else: return False, False, False, fail_msg, trace # Get metric details from the database metrics_id = False # Use the learn details as per config metric_learn_full_duration_days = int(use_full_duration_days) metric_learn_valid_ts_older_than = int(valid_learning_duration) metric_max_generations = int(max_generations) metric_max_percent_diff_from_origin = int(max_percent_diff_from_origin) metrics_table = None try: metrics_table, fail_msg, trace = metrics_table_meta(current_skyline_app, engine) current_logger.info(fail_msg) except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to get metrics_table meta for %s' % base_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: disposing 
of any engine') fp_create_engine_disposal(current_skyline_app, engine) return False, False, False, fail_msg, trace current_logger.info('create_features_profile :: metrics_table OK') try: connection = engine.connect() # @modified 20161209 - - Branch #922: ionosphere # Task #1658: Patterning Skyline Ionosphere # result = connection.execute('select id from metrics where metric=\'%s\'' % base_name) # for row in result: # while not metrics_id: # metrics_id = row['id'] stmt = select([metrics_table]).where(metrics_table.c.metric == base_name) result = connection.execute(stmt) for row in result: metrics_id = row['id'] # @added 20170113 - Feature #1854: Ionosphere learn - generations # Added Ionosphere LEARN generation related variables try: metric_learn_full_duration_days = int(row['learn_full_duration_days']) metric_learn_valid_ts_older_than = int(row['learn_valid_ts_older_than']) metric_max_generations = int(row['max_generations']) metric_max_percent_diff_from_origin = float(row['max_percent_diff_from_origin']) except: current_logger.error('error :: create_features_profile :: failed to determine learn related values from DB for %s' % base_name) row = result.fetchone() # metric_db_object = row connection.close() current_logger.info('create_features_profile :: determined db metric id: %s' % str(metrics_id)) current_logger.info('create_features_profile :: determined db metric learn_full_duration_days: %s' % str(metric_learn_full_duration_days)) current_logger.info('create_features_profile :: determined db metric learn_valid_ts_older_than: %s' % str(metric_learn_valid_ts_older_than)) current_logger.info('create_features_profile :: determined db metric max_generations: %s' % str(metric_max_generations)) current_logger.info('create_features_profile :: determined db metric max_percent_diff_from_origin: %s' % str(metric_max_percent_diff_from_origin)) except: trace = traceback.format_exc() current_logger.error(trace) fail_msg = 'error :: create_features_profile :: could not 
determine id of metric from DB: %s' % base_name current_logger.error('%s' % fail_msg) if metric_learn_full_duration_days: learn_full_duration_days = metric_learn_full_duration_days # learn_full_duration = int(learn_full_duration_days) * 86400 if metric_learn_valid_ts_older_than: learn_valid_ts_older_than = metric_learn_valid_ts_older_than if metric_max_generations: max_generations = metric_max_generations if metric_max_percent_diff_from_origin: max_percent_diff_from_origin = metric_max_percent_diff_from_origin current_logger.info('create_features_profile :: generation info - learn_full_duration_days :: %s' % (str(learn_full_duration_days))) current_logger.info('create_features_profile :: generation info - learn_valid_ts_older_than :: %s' % (str(learn_valid_ts_older_than))) current_logger.info('create_features_profile :: generation info - max_generations :: %s' % (str(max_generations))) current_logger.info('create_features_profile :: generation info - max_percent_diff_from_origin :: %s' % (str(max_percent_diff_from_origin))) # @added 20170120 - Feature #1854: Ionosphere learn # Always use the timestamp from the anomaly file use_anomaly_timestamp = int(requested_timestamp) # @modified 20190327 - Feature #2484: FULL_DURATION feature profiles # Added ionosphere_echo if context == 'ionosphere_learn' or context == 'ionosphere_echo' or context == 'ionosphere_echo_check': if path.isfile(anomaly_check_file): current_logger.info('create_features_profile :: determining the metric_timestamp from anomaly_check_file - %s' % anomaly_check_file) # Read the details file with open(anomaly_check_file, 'r') as f: anomaly_details = f.readlines() for i, line in enumerate(anomaly_details): if 'metric_timestamp' in line: _metric_timestamp = '%s' % str(line).split("'", 2) metric_timestamp_array = literal_eval(_metric_timestamp) use_anomaly_timestamp = (int(metric_timestamp_array[1])) current_logger.info('create_features_profile :: determined the anomaly metric_timestamp as - %s' % 
str(use_anomaly_timestamp)) ionosphere_table = None try: ionosphere_table, fail_msg, trace = ionosphere_table_meta(current_skyline_app, engine) current_logger.info(fail_msg) except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to get ionosphere_table meta for %s' % base_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # Raise to webbapp I believe to provide traceback to user in UI # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) raise else: current_logger.info('create_features_profile :: disposing of any engine') fp_create_engine_disposal(current_skyline_app, engine) return False, False, False, fail_msg, trace current_logger.info('create_features_profile :: ionosphere_table OK') # @added 20170403 - Feature #2000: Ionosphere - validated # Set all learn_fp_human features profiles to validated. 
fp_validated = 0 if ionosphere_job == 'learn_fp_human': fp_validated = 1 # @added 20170424 - Feature #2000: Ionosphere - validated # Set all generation 0 and 1 as validated if int(fp_generation) <= 1: fp_validated = 1 # @modified 20190327 - Feature #2484: FULL_DURATION feature profiles # Added ionosphere_echo if context == 'ionosphere_echo' or context == 'ionosphere_echo_check': echo_fp_value = 1 else: echo_fp_value = 0 new_fp_id = False try: connection = engine.connect() # @added 20170113 - Feature #1854: Ionosphere learn # Added learn values parent_id, generation # @modified 20170120 - Feature #1854: Ionosphere learn # Added anomaly_timestamp # @modified 20170403 - Feature #2000: Ionosphere - validated # @modified 20190327 - Feature #2484: FULL_DURATION feature profiles # Added ionosphere_echo echo_fp # @modified 20190327 - Feature #2484: FULL_DURATION feature profiles # Handle ionosphere_echo change in timestamp to the next second and a mismatch # of 1 second between the features profile directory timestamp and the DB # created_timestamp if context == 'ionosphere_echo' or context == 'ionosphere_echo_check': ts_for_db = int(requested_timestamp) db_created_timestamp = datetime.utcfromtimestamp(ts_for_db).strftime('%Y-%m-%d %H:%M:%S') ins = ionosphere_table.insert().values( metric_id=int(metrics_id), full_duration=int(ts_full_duration), anomaly_timestamp=int(use_anomaly_timestamp), enabled=1, tsfresh_version=str(tsfresh_version), calc_time=calculated_time, features_count=fcount, features_sum=fsum, parent_id=fp_parent_id, generation=fp_generation, validated=fp_validated, echo_fp=echo_fp_value, created_timestamp=db_created_timestamp) else: ins = ionosphere_table.insert().values( metric_id=int(metrics_id), full_duration=int(ts_full_duration), anomaly_timestamp=int(use_anomaly_timestamp), enabled=1, tsfresh_version=str(tsfresh_version), calc_time=calculated_time, features_count=fcount, features_sum=fsum, parent_id=fp_parent_id, generation=fp_generation, 
validated=fp_validated, echo_fp=echo_fp_value) result = connection.execute(ins) connection.close() new_fp_id = result.inserted_primary_key[0] current_logger.info('create_features_profile :: new ionosphere fp_id: %s' % str(new_fp_id)) except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to insert a new record into the ionosphere table for %s' % base_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: disposing of any engine') fp_create_engine_disposal(current_skyline_app, engine) return False, False, False, fail_msg, trace if not RepresentsInt(new_fp_id): trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: unknown new ionosphere new_fp_id for %s' % base_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: disposing of any engine') fp_create_engine_disposal(current_skyline_app, engine) return False, False, False, fail_msg, trace # Create z_fp_<metric_id> table fp_table_created = False fp_table_name = 'z_fp_%s' % str(metrics_id) try: fp_meta = MetaData() # @modified 20161222 - Task #1812: z_fp table type # Changed to InnoDB from MyISAM as no files open issues and MyISAM clean # up, there can be LOTS of file_per_table z_fp_ tables/files without # the MyISAM issues. 
z_fp_ tables are mostly read and will be shuffled # in the table cache as required. fp_metric_table = Table( fp_table_name, fp_meta, Column('id', Integer, primary_key=True), Column('fp_id', Integer, nullable=False, key='fp_id'), Column('feature_id', Integer, nullable=False), Column('value', DOUBLE(), nullable=True), mysql_charset='utf8', # @modified 20180324 - Bug #2340: MySQL key_block_size # MySQL key_block_size #45 # Removed as under MySQL 5.7 breaks # mysql_key_block_size='255', mysql_engine='InnoDB') fp_metric_table.create(engine, checkfirst=True) fp_table_created = True except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to create table - %s' % fp_table_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: %s - automated so the table should exists continuing' % context) if not fp_table_created: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to determine True for create table - %s' % fp_table_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: %s - automated so the table should exists continuing' % context) # Insert features and values insert_statement = [] for fname_id, f_value in features_data: insert_statement.append({'fp_id': new_fp_id, 'feature_id': fname_id, 'value': f_value},) if 
insert_statement == []: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: empty insert_statement for %s inserts' % fp_table_name current_logger.error('%s' % fail_msg) # raise # else: # feature_count = sum(1 for x in a if isinstance(x, insert_statement)) # current_logger.info( # 'fp_id - %s - %s feature values in insert_statement for %s ' % # (str(feature_count), str(new_fp_id), fp_table_name)) # feature_count = sum(1 for x in a if isinstance(x, insert_statement)) # current_logger.info( # 'fp_id - %s - feature values in insert_statement for %s ' % # (str(new_fp_id), fp_table_name)) try: connection = engine.connect() connection.execute(fp_metric_table.insert(), insert_statement) connection.close() current_logger.info('create_features_profile :: fp_id - %s - feature values inserted into %s' % (str(new_fp_id), fp_table_name)) except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to insert a feature values into %s' % fp_table_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: %s - automated so the table should exists continuing' % context) # Create metric ts table if not exists ts_<metric_id> # Create z_ts_<metric_id> table # @modified 20170121 - Feature #1854: Ionosphere learn - generations # TODO Adding the option to not save timeseries to DB, as default? 
# ts_table_created = False ts_table_name = 'z_ts_%s' % str(metrics_id) try: ts_meta = MetaData() # @modified 20161222 - Task #1812: z_fp table type # Changed to InnoDB from MyISAM as no files open issues and MyISAM clean # up, there can be LOTS of file_per_table z_fp_ tables/files without # the MyISAM issues. z_fp_ tables are mostly read and will be shuffled # in the table cache as required. ts_metric_table = Table( ts_table_name, ts_meta, Column('id', Integer, primary_key=True), Column('fp_id', Integer, nullable=False, key='fp_id'), Column('timestamp', Integer, nullable=False), Column('value', DOUBLE(), nullable=True), mysql_charset='utf8', # @modified 20180324 - Bug #2340: MySQL key_block_size # MySQL key_block_size #45 # Removed as under MySQL 5.7 breaks # mysql_key_block_size='255', mysql_engine='InnoDB') ts_metric_table.create(engine, checkfirst=True) # ts_table_created = True current_logger.info('create_features_profile :: metric ts table created OK - %s' % (ts_table_name)) except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to create table - %s' % ts_table_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: current_logger.info('create_features_profile :: %s - automated so the table should exists continuing' % context) # Insert timeseries that the features profile was created from raw_timeseries = [] anomaly_json = '%s/%s.json' % (metric_training_data_dir, base_name) # @added 20190327 - Feature #2484: FULL_DURATION feature profiles if context == 'ionosphere_echo' or context == 'ionosphere_echo_check': full_duration_in_hours = int(settings.FULL_DURATION / 60 / 60) anomaly_json = '%s/%s.mirage.redis.%sh.json' % 
(metric_training_data_dir, base_name, str(full_duration_in_hours)) if path.isfile(anomaly_json): current_logger.info('create_features_profile :: metric anomaly json found OK - %s' % (anomaly_json)) try: # Read the timeseries json file with open(anomaly_json, 'r') as f: raw_timeseries = f.read() except: trace = traceback.format_exc() current_logger.error(trace) fail_msg = 'error :: create_features_profile :: failed to read timeseries data from %s' % anomaly_json current_logger.error('%s' % (fail_msg)) fail_msg = 'error: failed to read timeseries data from %s' % anomaly_json # end = timer() if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) # Raise to webbapp I believe to provide traceback to user in UI raise else: trace = 'none' fail_msg = 'error: file not found - %s' % (anomaly_json) current_logger.error(fail_msg) # raise # Convert the timeseries to csv timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']') del raw_timeseries timeseries = literal_eval(timeseries_array_str) datapoints = timeseries validated_timeseries = [] for datapoint in datapoints: try: new_datapoint = [str(int(datapoint[0])), float(datapoint[1])] validated_timeseries.append(new_datapoint) # @modified 20170913 - Task #2160: Test skyline with bandit # Added nosec to exclude from bandit tests except: # nosec continue del timeseries del timeseries_array_str del datapoints insert_statement = [] for ts, value in validated_timeseries: insert_statement.append({'fp_id': new_fp_id, 'timestamp': ts, 'value': value},) try: connection = engine.connect() connection.execute(ts_metric_table.insert(), insert_statement) connection.close() current_logger.info('create_features_profile :: fp_id - %s - timeseries inserted into %s' % (str(new_fp_id), ts_table_name)) except: trace = traceback.format_exc() current_logger.error('%s' % trace) 
fail_msg = 'error :: create_features_profile :: failed to insert the timeseries into %s' % ts_table_name current_logger.error('%s' % fail_msg) if context == 'training' or context == 'features_profile': # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) raise else: current_logger.info('create_features_profile :: %s - automated so the table should exist continuing' % context) del validated_timeseries del insert_statement # Create a created features profile file try: # data = '[%s, %s, ]' % (new_fp_id, str(int(time.time()))) # write_data_to_file(skyline_app, features_profile_created_file, 'w', data) # @modified 20170115 - Feature #1854: Ionosphere learn - generations # Added parent_id and generation data = '[%s, %s, \'%s\', %s, %s, %s, %s, %s, %s]' % ( new_fp_id, str(int(time.time())), str(tsfresh_version), str(calculated_time), str(fcount), str(fsum), str(ts_full_duration), str(fp_parent_id), str(fp_generation)) write_data_to_file(current_skyline_app, features_profile_created_file, 'w', data) del data except: trace = traceback.format_exc() current_logger.error('%s' % trace) fail_msg = 'error :: create_features_profile :: failed to write fp.created file' current_logger.error('%s' % fail_msg) # Set ionosphere_enabled for the metric try: # update_statement = 'UPDATE metrics SET ionosphere_enabled=1 WHERE id=%s' % str(metrics_id) connection = engine.connect() # result = connection.execute('UPDATE metrics SET ionosphere_enabled=1 WHERE id=%s' % str(metrics_id)) # connection.execute(ts_metric_table.insert(), insert_statement) connection.execute( metrics_table.update( metrics_table.c.id == metrics_id).values(ionosphere_enabled=1)) connection.close() current_logger.info('create_features_profile :: ionosphere_enabled set on metric id: %s' % str(metrics_id)) except: trace = traceback.format_exc() current_logger.error(trace) fail_msg = 'error :: create_features_profile :: could not 
update metrics table and set ionosphere_enabled on id %s' % str(metrics_id) current_logger.error('%s' % fail_msg) # raise # Copy data from training data dir to features_profiles dir if not path.isdir(ts_features_profile_dir): mkdir_p(ts_features_profile_dir) if path.isdir(ts_features_profile_dir): current_logger.info('create_features_profile :: fp_id - %s - features profile dir created - %s' % (str(new_fp_id), ts_features_profile_dir)) # src_files = os.listdir(src) # for file_name in src_files: # full_file_name = path.join(src, file_name) # if (path.isfile(full_file_name)): # shutil.copy(full_file_name, dest) data_files = [] try: glob_path = '%s/*.*' % metric_training_data_dir data_files = glob.glob(glob_path) except: trace = traceback.format_exc() current_logger.error('%s' % trace) current_logger.error('error :: create_features_profile :: glob - fp_id - %s - training data not copied to %s' % (str(new_fp_id), ts_features_profile_dir)) for i_file in data_files: try: shutil.copy(i_file, ts_features_profile_dir) current_logger.info('create_features_profile :: fp_id - %s - training data copied - %s' % (str(new_fp_id), i_file)) except shutil.Error as e: trace = traceback.format_exc() current_logger.error('%s' % trace) current_logger.error('error :: create_features_profile :: shutil error - fp_id - %s - training data not copied to %s' % (str(new_fp_id), ts_features_profile_dir)) current_logger.error('error :: create_features_profile :: %s' % (e)) # Any error saying that the directory doesn't exist except OSError as e: trace = traceback.format_exc() current_logger.error('%s' % trace) current_logger.error('error :: create_features_profile :: OSError error - fp_id - %s - training data not copied to %s' % (str(new_fp_id), ts_features_profile_dir)) current_logger.error('error :: create_features_profile :: %s' % (e)) current_logger.info('create_features_profile :: fp_id - %s - training data copied to %s' % (str(new_fp_id), ts_features_profile_dir)) else: 
current_logger.error('error :: create_features_profile :: fp_id - %s - training data not copied to %s' % (str(new_fp_id), ts_features_profile_dir)) current_logger.info('create_features_profile :: disposing of any engine') try: if engine: fp_create_engine_disposal(current_skyline_app, engine) else: current_logger.info('create_features_profile :: no engine to dispose of' % (str(new_fp_id), ts_features_profile_dir)) except: trace = traceback.format_exc() current_logger.error('%s' % trace) current_logger.error('error :: create_features_profile :: OSError error - fp_id - %s - training data not copied to %s' % (str(new_fp_id), ts_features_profile_dir)) # @added 20170113 - Feature #1854: Ionosphere learn - Redis ionosphere.learn.work namespace # Ionosphere learn needs Redis works sets # When a features profile is created there needs to be work added to a Redis # set. When a human makes a features profile, we want Ionosphere to make a # use_full_duration_days features profile valid_learning_duration (e.g. # 3361) later. if settings.IONOSPHERE_LEARN and new_fp_id: create_redis_work_item = False if context == 'training_data' and ionosphere_job == 'learn_fp_human': create_redis_work_item = True # @modified 20170120 - Feature #1854: Ionosphere learn - generations # Added fp_learn parameter to allow the user to not learn the # use_full_duration_days if not fp_learn: create_redis_work_item = False current_logger.info('fp_learn is False not adding an item to Redis ionosphere.learn.work set') if ionosphere_job == 'learn_fp_automatic': create_redis_work_item = True # @added 20170131 - Feature #1886 Ionosphere learn - child like parent with evolutionary maturity # TODO: here a check may be required to evaluate whether the origin_fp_id # had a use_full_duration features profile created, however # due to the fact that it is in learn, suggests that it did # have, not 100% sure. 
# origin_fp_id_was_allowed_to_learn = False child_use_full_duration_count_of_origin_fp_id = 1 # TODO: Determine the state # child_use_full_duration_count_of_origin_fp_id = SELECT COUNT(id) FROM ionosphere WHERE parent_id=origin_fp_id AND full_duration=use_full_duration if child_use_full_duration_count_of_origin_fp_id == 0: current_logger.info('the origin parent was not allowed to learn not adding to Redis ionosphere.learn.work set') create_redis_work_item = False if create_redis_work_item: try: current_logger.info( 'adding work to Redis ionosphere.learn.work set - [\'Soft\', \'%s\', %s, \'%s\', %s, %s] to make a learn features profile later' % ( str(ionosphere_job), str(requested_timestamp), base_name, str(new_fp_id), str(fp_generation))) # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) else: redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) # @modified 20190414 - Task #2824: Test redis-py upgrade # Task #2926: Update dependencies # redis_conn.sadd('ionosphere.learn.work', ['Soft', str(ionosphere_job), int(requested_timestamp), base_name, int(new_fp_id), int(fp_generation)]) redis_conn.sadd('ionosphere.learn.work', str(['Soft', str(ionosphere_job), int(requested_timestamp), base_name, int(new_fp_id), int(fp_generation)])) except: current_logger.error(traceback.format_exc()) current_logger.error( 'error :: failed adding work to Redis ionosphere.learn.work set - [\'Soft\', \'%s\', %s, \'%s\', %s, %s] to make a learn features profile later' % ( str(ionosphere_job), str(requested_timestamp), base_name, str(new_fp_id), str(fp_generation))) # @added 20170806 - Bug #2130: MySQL - Aborted_clients # Added missing disposal if engine: fp_create_engine_disposal(current_skyline_app, engine) return str(new_fp_id), True, False, fail_msg, trace
# @added 20180414 - Branch #2270: luminosity
def get_correlations(current_skyline_app, anomaly_id):
    """
    Get all the correlations for an anomaly from the database

    :param current_skyline_app: the Skyline app name calling the function
    :param anomaly_id: the id of the anomaly to get the correlations for
    :type current_skyline_app: str
    :type anomaly_id: int
    :return: (correlations, fail_msg, trace) where correlations is a list of
        [metric_name, coefficient, shifted, shifted_coefficient] lists sorted
        by coefficient, highest first, and fail_msg and trace are whatever
        the last table meta call (or its error handler) set them to.
    :rtype: tuple
    :raises: re-raises any exception hit while getting the engine or while
        querying the metrics or luminosity tables, after logging it and
        disposing of the engine.  IndexError is raised if a correlation
        references a metric id that is not present in the metrics table.

    """

    current_skyline_app_logger = current_skyline_app + 'Log'
    current_logger = logging.getLogger(current_skyline_app_logger)

    func_name = 'get_correlations'

    correlations = []

    current_logger.info('get_correlations :: getting MySQL engine')
    try:
        engine, fail_msg, trace = fp_create_get_an_engine(current_skyline_app)
        current_logger.info(fail_msg)
    except:
        trace = traceback.format_exc()
        current_logger.error(trace)
        fail_msg = 'error :: could not get a MySQL engine'
        current_logger.error('%s' % fail_msg)
        # return False, False, fail_msg, trace, False
        raise  # to webapp to return in the UI

    # A failure to get either table meta is only logged here, the queries
    # below then fail and raise if the table object is still None.
    metrics_table = None
    try:
        metrics_table, fail_msg, trace = metrics_table_meta(current_skyline_app, engine)
        current_logger.info(fail_msg)
    except:
        trace = traceback.format_exc()
        current_logger.error('%s' % trace)
        fail_msg = 'error :: %s :: failed to get metrics_table_meta' % func_name
        current_logger.error('%s' % fail_msg)

    luminosity_table = None
    try:
        luminosity_table, fail_msg, trace = luminosity_table_meta(current_skyline_app, engine)
        current_logger.info(fail_msg)
    except:
        trace = traceback.format_exc()
        current_logger.error('%s' % trace)
        fail_msg = 'error :: %s :: failed to get luminosity_table_meta' % func_name
        current_logger.error('%s' % fail_msg)

    # Map metric id to metric name once, so that each correlation row is
    # resolved with a dict lookup rather than a scan of every metric.
    metric_names = {}
    try:
        connection = engine.connect()
        try:
            stmt = select([metrics_table]).where(metrics_table.c.id > 0)
            results = connection.execute(stmt)
            for row in results:
                # setdefault keeps the first name seen for an id
                metric_names.setdefault(int(row['id']), str(row['metric']))
        finally:
            # Always release the connection, even if the query failed
            connection.close()
    except:
        current_logger.error(traceback.format_exc())
        current_logger.error('error :: could not determine metrics from MySQL')
        if engine:
            fp_create_engine_disposal(current_skyline_app, engine)
        raise

    try:
        connection = engine.connect()
        try:
            stmt = select([luminosity_table]).where(luminosity_table.c.id == int(anomaly_id))
            result = connection.execute(stmt)
            for row in result:
                metric_id = row['metric_id']
                metric_name = None
                if metric_id:
                    # @modified 20180723 - Feature #2470: Correlations Graphite graph links
                    #                      Branch #2270: luminosity
                    # Return the metric_name as a string not a list, so that the
                    # metric_name string can be used to build links to Graphite
                    # graphs via webapp.py and correlations.html template.
                    metric_name = metric_names.get(int(metric_id))
                    if metric_name is None:
                        # An unknown metric id has always surfaced as an
                        # IndexError, keep the type but say what is missing.
                        raise IndexError(
                            'no metric found with id %s for anomaly id %s' % (
                                str(metric_id), str(anomaly_id)))
                coefficient = row['coefficient']
                shifted = row['shifted']
                shifted_coefficient = row['shifted_coefficient']
                correlations.append([metric_name, coefficient, shifted, shifted_coefficient])
        finally:
            # Always release the connection, even if the query failed
            connection.close()
    except:
        current_logger.error(traceback.format_exc())
        current_logger.error('error :: could not determine correlations for anomaly id - %s' % str(anomaly_id))
        if engine:
            fp_create_engine_disposal(current_skyline_app, engine)
        raise

    if engine:
        fp_create_engine_disposal(current_skyline_app, engine)

    if correlations:
        # Highest coefficient first
        correlations = sorted(correlations, key=lambda x: x[1], reverse=True)

    return correlations, fail_msg, trace