from __future__ import division
import logging
import os
from time import time, sleep
import shutil
import glob
from sys import version_info
import traceback
from sqlalchemy.sql import (select, desc)
import settings
from skyline_functions import (mkdir_p, get_memcache_metric_object)
from features_profile import calculate_features_profile
from database import (
get_engine, ionosphere_table_meta, metrics_table_meta)
# @added 2017014 - Feature #1854: Ionosphere learn
from ionosphere_functions import create_features_profile
skyline_app = 'ionosphere'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
python_version = int(version_info[0])
this_host = str(os.uname()[1])
# Converting one settings variable into a local variable, just because it is a
# long string otherwise.
try:
ENABLE_IONOSPHERE_DEBUG = settings.ENABLE_IONOSPHERE_DEBUG
except:
logger.error('error :: echo :: cannot determine ENABLE_IONOSPHERE_DEBUG from settings' % skyline_app)
ENABLE_IONOSPHERE_DEBUG = False
try:
SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
except:
SERVER_METRIC_PATH = ''
context = 'ionosphere_echo'
[docs]def echo_get_metric_from_metrics(base_name, engine):
"""
Called by :func:`~echo` and returns the metric id and metric db object
:param timestamp: timestamp at which learn was called
:type timestamp: int
:return: tuple
:rtype: (int, object)
"""
logger = logging.getLogger(skyline_app_logger)
metrics_id = 0
metric_db_object = None
# Get the metrics_table metadata
metrics_table = None
try:
metrics_table, log_msg, trace = metrics_table_meta(skyline_app, engine)
logger.info('echo :: metrics_table OK for %s' % base_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: echo :: failed to get metrics_table meta for %s' % base_name)
return False
try:
connection = engine.connect()
stmt = select([metrics_table]).where(metrics_table.c.metric == base_name)
result = connection.execute(stmt)
row = result.fetchone()
metric_db_object = row
metrics_id = row['id']
connection.close()
except:
logger.error(traceback.format_exc())
logger.error('error :: echo :: could not determine id from metrics table for - %s' % base_name)
return False
return metrics_id, metric_db_object
# @added 20190326 - Feature #2484: FULL_DURATION feature profiles
[docs]def ionosphere_echo(base_name, mirage_full_duration):
logger = logging.getLogger(skyline_app_logger)
child_process_pid = os.getpid()
context = 'ionosphere_echo'
logger.info('ionosphere_echo :: started with child_process_pid - %s for %s' % (str(child_process_pid), base_name))
full_duration_in_hours = int(settings.FULL_DURATION / 60 / 60)
try:
# Allow for 3 seconds
ionosphere_echo_max_fp_create_time = (settings.IONOSPHERE_ECHO_MAX_FP_CREATE_TIME - 3)
except:
ionosphere_echo_max_fp_create_time = 52
echo_started_at = int(time())
def echo_get_an_engine():
try:
engine, fail_msg, trace = get_engine(skyline_app)
return engine, fail_msg, trace
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: ionosphere_echo :: get_an_engine :: failed to get MySQL engine'
logger.error('%s' % fail_msg)
return None, fail_msg, trace
def echo_engine_disposal(engine):
try:
if engine:
try:
engine.dispose()
logger.info('ionosphere_echo :: MySQL engine disposed of')
return True
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: calling engine.dispose()')
else:
logger.info('ionosphere_echo :: no MySQL engine to dispose of')
return True
except:
return False
return False
# Determine the metric details from the database
metrics_id = None
metric_db_object = None
engine = None
# Get the metric db object data to memcache it is exists
metric_db_object = get_memcache_metric_object(skyline_app, base_name)
if metric_db_object:
metrics_id = metric_db_object['id']
logger.info('ionosphere_echo :: determined metric id %s from memcache for %s' % (str(metrics_id), base_name))
else:
# Only if no memcache data
# Get a MySQL engine
try:
engine, log_msg, trace = echo_get_an_engine()
logger.info('ionosphere_echo :: %s' % log_msg)
logger.info('ionosphere_echo :: determining metric id from DB as not found in memcache for %s' % (base_name))
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: could not get a MySQL engine to get metric_db_object')
if not engine:
logger.error('error :: ionosphere_echo :: engine not obtained to get metric_db_object')
logger.info('ionosphere_echo :: exiting this work but not removing work item, as database may be available again before the work expires')
return
try:
metrics_id, metric_db_object = echo_get_metric_from_metrics(base_name, engine)
echo_engine_disposal(engine)
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: failed get the metric details from the database')
logger.info('ionosphere_echo :: exiting this work but not removing work item, as database may be available again before the work expires')
if not metrics_id:
logger.error('error :: ionosphere_echo :: failed get the metrics_id from the database')
logger.info('ionosphere_echo :: exiting this work but not removing work item, as database may be available again before the work expires')
echo_engine_disposal(engine)
return
if not metric_db_object:
logger.error('error :: ionosphere_echo :: failed get the metric_db_object from the database')
logger.info('ionosphere_echo :: exiting this work but not removing work item, as database may be available again before the work expires')
echo_engine_disposal(engine)
return
# Determine the metric fp ids from the database
if not engine:
logger.info('ionosphere_echo :: getting MySQL engine to determine fp ids for metric id %s - %s' % (str(metrics_id), base_name))
try:
engine, log_msg, trace = echo_get_an_engine()
logger.info(log_msg)
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: could not get a MySQL engine to get fp_ids')
if not engine:
logger.error('error :: ionosphere_echo :: engine not obtained to get fp_ids')
return
try:
ionosphere_table, log_msg, trace = ionosphere_table_meta(skyline_app, engine)
logger.info(log_msg)
logger.info('ionosphere_echo :: ionosphere_table OK')
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: failed to get ionosphere_table meta for %s' % base_name)
# Determine the fp_ids that exist for the metric
echo_fp_ids_result = []
try:
connection = engine.connect()
stmt = select([ionosphere_table]).where(ionosphere_table.c.metric_id == metrics_id).order_by(desc(ionosphere_table.c.id))
echo_fp_ids = connection.execute(stmt)
echo_fp_ids_result = [{column: value for column, value in rowproxy.items()} for rowproxy in echo_fp_ids]
connection.close()
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: could not determine fp ids from DB for %s' % base_name)
return
if engine:
echo_engine_disposal(engine)
if not echo_fp_ids_result:
logger.error('error :: ionosphere_echo :: no echo_fp_ids_result - could not determine fp ids from DB for %s' % base_name)
else:
logger.info('ionosphere_echo :: echo_fp_ids_result - determined fp ids from DB for %s' % base_name)
try:
db_fp_count = len(echo_fp_ids_result)
logger.info('ionosphere_echo :: %s features profile ids found' % str(db_fp_count))
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: could not calculate len of echo_fp_ids_result')
echo_enabled_mirage_fp_ids = []
for row in echo_fp_ids_result:
if row['enabled'] != 1:
continue
if row['deleted'] == 1:
continue
# Only create features profiles at settings.FULL_DURATION for Mirage
# features profiles if the fp has been validated
if row['validated'] == 0:
continue
if row['full_duration'] == int(mirage_full_duration):
fp_id = row['id']
# @added 20190413 - Bug #2934: Ionosphere - no mirage.redis.24h.json file
# Feature #1960: ionosphere_layers
# Any features profiles that were created before the introduction of
# ionosphere_layers will not have a mirage.redis.24h.json file as the
# creation of these resources in the training_data dir was only added at
# that point, so they are excluded. Less than 20170307 1488844800 excl.
# @modified 20190508 - Bug #2934: Ionosphere - no mirage.redis.24h.json file
# Feature #1960: ionosphere_layers
# Increased the date as some on the cusp were still erroring, move
# forward 2 days to 20170309
# if int(row['anomaly_timestamp']) < 1488844800:
if int(row['anomaly_timestamp']) < 1489017600:
logger.info('ionosphere_echo :: skipping fp id %s as predates having a mirage.redis json file' % str(fp_id))
continue
echo_enabled_mirage_fp_ids.append(fp_id)
echo_enabled_mirage_fp_ids_count = len(echo_enabled_mirage_fp_ids)
logger.info('ionosphere_echo :: %s Mirage features profile ids found' % str(echo_enabled_mirage_fp_ids_count))
# Check which Mirage features profile do not have has a
# settings.FULL_DURATION features profile
mirage_fd_fp_count = 0
echo_create_fd_fp_for = []
for validated_mirage_fp_id in echo_enabled_mirage_fp_ids:
mirage_fd_fp_exists = False
for row in echo_fp_ids_result:
if int(row['parent_id']) != int(validated_mirage_fp_id):
continue
if int(row['full_duration']) == int(settings.FULL_DURATION):
mirage_fd_fp_exists = True
mirage_fd_fp_count += 1
if not mirage_fd_fp_exists:
echo_create_fd_fp_for.append(int(validated_mirage_fp_id))
del echo_enabled_mirage_fp_ids
logger.info('ionosphere_echo :: there are %s FULL_DURATION features profiles for %s' % (str(mirage_fd_fp_count), base_name))
echo_create_fd_fp_for_count = len(echo_create_fd_fp_for)
logger.info('ionosphere_echo :: %s FULL_DURATION features profiles to be created for %s' % (str(echo_create_fd_fp_for_count), base_name))
echo_created_fp_count = 0
# @added 20190404 - Bug #2904: Initial Ionosphere echo load and Ionosphere feedback
# Feature #2484: FULL_DURATION feature profiles
# Rate limit the creation of ionosphere_echo FULL_DURATION features profiles
# this only effects managing the creation of lots of features profiles if
# Ionosphere echo is enabled on a Skyline instance with lots of existing
# features profiles for Mirage metrics. Only create 5 FULL_DURATION
# features profiles from the latest Mirage based features profiles, which
# takes around 10 seconds
if echo_create_fd_fp_for_count > 5:
logger.info('ionosphere_echo :: rate limiting and only creating the 5 least FULL_DURATION features profiles for %s' % (base_name))
# Reverse, newest first, using slicing to produce a reversed copy
reverse_echo_create_fd_fp_for = echo_create_fd_fp_for[::-1]
rate_limit_echo_create_fd_fp_for = reverse_echo_create_fd_fp_for[0:5]
echo_create_fd_fp_for = rate_limit_echo_create_fd_fp_for
last_created_fp = int(time())
for mirage_fp_id in echo_create_fd_fp_for:
fp_timestamp = None
for row in echo_fp_ids_result:
if int(row['id']) != int(mirage_fp_id):
continue
else:
fp_timestamp = int(row['anomaly_timestamp'])
fp_generation = int(row['generation'])
if not fp_timestamp:
continue
if not fp_generation:
fp_generation = 0
time_check_now = int(time())
echo_runtime = time_check_now - echo_started_at
if echo_runtime >= ionosphere_echo_max_fp_create_time:
logger.info('ionosphere_echo :: ionosphere_echo running for %s seconds, exiting before IONOSPHERE_ECHO_MAX_FP_CREATE_TIME of %s seconds is breached' % (str(echo_runtime), str(ionosphere_echo_max_fp_create_time)))
break
logger.info('ionosphere_echo :: creating FULL_DURATION features profile based on data from fp id %s - %s' % (str(mirage_fp_id), base_name))
context = 'ionosphere_echo'
ionosphere_job = 'learn_fp_human'
generation = fp_generation + 1
fp_learn = False
# What is the path of the features profile files
metric_timeseries_dir = base_name.replace('.', '/')
metric_fp_data_dir = '%s/%s/%s' % (
settings.IONOSPHERE_PROFILES_FOLDER, metric_timeseries_dir,
str(fp_timestamp))
# What is the path of the new training data dir to copy the files to
# and create it
created_ts = int(time())
# Ensure features profile creation timestamps do not overlap
if last_created_fp == created_ts:
sleep(1)
created_ts = int(time())
last_created_fp = created_ts
metric_training_data_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(created_ts), metric_timeseries_dir)
if not os.path.exists(metric_training_data_dir):
try:
mkdir_p(metric_training_data_dir)
logger.info('ionosphere_echo :: training data dir created - %s' % metric_training_data_dir)
except:
logger.error('error :: ionosphere_echo :: failed to create training data dir - %s' % metric_training_data_dir)
continue
if not os.path.isdir(metric_fp_data_dir):
logger.error('error :: ionosphere_echo :: features profile data dir does not exist - %s' % metric_fp_data_dir)
continue
data_files = []
try:
glob_path = '%s/*.*' % metric_fp_data_dir
data_files = glob.glob(glob_path)
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
logger.error('error :: ionosphere_echo :: glob could not read - %s' % metric_fp_data_dir)
# Make a list of the files to copy
copy_files = []
for i_file in data_files:
# Exclude all the file resources related to the Mirage
# full_duration features profile
if 'matched.fp_id' in i_file:
continue
if 'fp.created.txt' in i_file:
continue
if 'fp.details.txt' in i_file:
continue
if 'csv.features.transposed.csv' in i_file:
continue
# Specifically include the required files
if 'graphite_now' in i_file:
copy_files.append(i_file)
echo_metric_txt = '%s.txt' % base_name
if echo_metric_txt in i_file:
copy_files.append(i_file)
echo_metric_json = '%s.json' % base_name
if echo_metric_json in i_file:
copy_files.append(i_file)
if 'mirage.graphite' in i_file:
copy_files.append(i_file)
if 'mirage.redis' in i_file:
copy_files.append(i_file)
# Copy the required files to the new training_data dir
for i_file in copy_files:
copying_filename = os.path.basename(i_file)
dest_file = '%s/%s' % (metric_training_data_dir, copying_filename)
if not os.path.isfile(dest_file):
try:
shutil.copy(i_file, metric_training_data_dir)
# logger.info('ionosphere_echo :: training data copied - %s' % (i_file))
except shutil.Error as e:
trace = traceback.format_exc()
logger.error('%s' % trace)
logger.error('error :: ionosphere_echo :: shutil error - training data not copied to %s' % metric_training_data_dir)
logger.error('error :: ionosphere_echo :: %s' % (e))
# Any error saying that the directory doesn't exist
except OSError as e:
trace = traceback.format_exc()
logger.error('%s' % trace)
logger.error('error :: ionosphere_echo :: OSError error - training data not copied to %s' % metric_training_data_dir)
logger.error('error :: %s' % (e))
calculated_feature_file = '%s/%s.tsfresh.input.csv.features.transposed.csv' % (metric_training_data_dir, base_name)
calculated_feature_file_found = False
fp_csv = None
if os.path.isfile(calculated_feature_file):
calculated_feature_file_found = True
fp_csv = calculated_feature_file
logger.info('ionosphere_echo :: calculated features file is available - %s' % (calculated_feature_file))
echo_json_file = '%s.mirage.redis.%sh.json' % (base_name, str(full_duration_in_hours))
if not calculated_feature_file_found:
logger.info('ionosphere_echo :: calculating features from mirage.redis data ts json - %s' % (echo_json_file))
str_created_ts = str(created_ts)
try:
fp_csv, successful, fp_exists, fp_id, log_msg, traceback_format_exc, f_calc = calculate_features_profile(skyline_app, str_created_ts, base_name, context)
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: failed to calculate features')
continue
else:
logger.info('ionosphere_echo :: using available calculated features file')
if os.path.isfile(calculated_feature_file):
logger.info('ionosphere_echo :: calculated features - %s' % (calculated_feature_file))
else:
logger.error('error :: ionosphere_echo :: failed to calculate features no file found - %s' % calculated_feature_file)
continue
# Create the new settings.FULL_DURATION features profile
ionosphere_job = 'learn_fp_human'
# @added 20190503 - Branch #2646: slack
# Added slack_ionosphere_job
slack_ionosphere_job = ionosphere_job
fp_learn = False
try:
# @modified 20190503 - Branch #2646: slack
# Added slack_ionosphere_job
# fp_id, fp_in_successful, fp_exists, fail_msg, traceback_format_exc = create_features_profile(skyline_app, created_ts, base_name, context, ionosphere_job, mirage_fp_id, generation, fp_learn)
# @modified 20190922 - Feature #3230: users DB table
# Ideas #2476: Label and relate anomalies
# Feature #2516: Add label to features profile
# Added user_id
# fp_id, fp_in_successful, fp_exists, fail_msg, traceback_format_exc = create_features_profile(skyline_app, created_ts, base_name, context, ionosphere_job, mirage_fp_id, generation, fp_learn, slack_ionosphere_job, user_id)
user_id = 1
# @modified 20191030 - Feature #2516: Add label to features profile
# Added missing label
label = 'echo'
fp_id, fp_in_successful, fp_exists, fail_msg, traceback_format_exc = create_features_profile(skyline_app, created_ts, base_name, context, ionosphere_job, mirage_fp_id, generation, fp_learn, slack_ionosphere_job, user_id, label)
except:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: failed to create a settings.FULL_DURATION features profile from fp_id %s for %s' % (str(mirage_fp_id), base_name))
continue
if not fp_in_successful:
logger.error(traceback.format_exc())
logger.error('error :: ionosphere_echo :: create_features_profile failed')
continue
else:
echo_created_fp_count += 1
logger.info(
'ionosphere_echo :: new generation %s features profile with id %s settings.FULL_DURATION created from parent feature profile with id %s' % (
str(generation), str(fp_id), str(mirage_fp_id)))
del echo_create_fd_fp_for
if engine:
echo_engine_disposal(engine)
logger.info('ionosphere_echo :: created %s of %s FULL_DURATION features profile that were to be created for %s' % (str(echo_created_fp_count), str(echo_create_fd_fp_for_count), base_name))
return