from __future__ import division
import logging
import os
from sys import version_info
import traceback
import csv
# from ast import literal_eval
from sqlalchemy.sql import select
# @added 20170809 - Task #2132: Optimise Ionosphere DB usage
from pymemcache.client.base import Client as pymemcache_Client
import numpy as np
import pandas as pd
from tsfresh.feature_extraction import (
# @modified 20210101 - Task #3928: Update Skyline to use new tsfresh feature extraction method
# extract_features, ReasonableFeatureExtractionSettings)
extract_features, EfficientFCParameters)
import settings
from skyline_functions import get_memcache_metric_object
from database import (
get_engine, metrics_table_meta)
# @added 20210425 - Task #4030: refactoring
# Feature #4014: Ionosphere - inference
from functions.numpy.percent_different import get_percent_different
skyline_app = 'ionosphere'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
python_version = int(version_info[0])
this_host = str(os.uname()[1])

# Converting one settings variable into a local variable, just because it is a
# long string otherwise.
try:
    ENABLE_IONOSPHERE_DEBUG = settings.ENABLE_IONOSPHERE_DEBUG
except AttributeError:
    logger.error('error :: layers :: cannot determine ENABLE_IONOSPHERE_DEBUG from settings')
    ENABLE_IONOSPHERE_DEBUG = False

# @added 20220405 - Task #4514: Integrate opentelemetry
#                   Feature #4516: flux - opentelemetry traces
# Defaults to False when settings does not define OTEL_ENABLED.
OTEL_ENABLED = getattr(settings, 'OTEL_ENABLED', False)
if OTEL_ENABLED and settings.MEMCACHE_ENABLED:
    # @modified 20220505 - Task #4514: Integrate opentelemetry
    # Fail gracefully if opentelemetry breaks.  The import is inside the try
    # too, so that a missing or broken opentelemetry package does not prevent
    # this module from being imported.
    try:
        from opentelemetry.instrumentation.pymemcache import PymemcacheInstrumentor
        PymemcacheInstrumentor().instrument()
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: failed to instrument pymemcache with opentelemetry')

# Module level memcache client (used by get_metrics_db_object to set the
# metrics_db_object memcache key), None when MEMCACHE_ENABLED is False.
if settings.MEMCACHE_ENABLED:
    memcache_client = pymemcache_Client(
        (settings.MEMCACHED_SERVER_IP, settings.MEMCACHED_SERVER_PORT),
        connect_timeout=0.1, timeout=0.2)
else:
    memcache_client = None

LOCAL_DEBUG = False
def get_metrics_db_object(base_name):
    """
    Returns the data of a metric from metrics table as an object and populates
    memcached with a dict of the object

    When MEMCACHE_ENABLED, the data returned by get_memcache_metric_object is
    used if there is any.  Otherwise the metrics table is queried for the row
    whose ``metric`` column equals base_name.  If a row is found and
    MEMCACHE_ENABLED, a dict of the row (keys and values cast to str) is set
    in memcache under the ``metrics_db_object.<base_name>`` key with a 3600
    second expiry.

    The MySQL connection is always closed and the engine is always disposed
    of once the query is done, even if the query fails.

    :param base_name: the metric base_name
    :type base_name: str
    :return: metrics_db_object - the memcache data, the metrics table row, or
        None if the metric data could not be determined
    :rtype: object
    """
    metrics_db_object = None
    memcache_metrics_db_object = None
    metrics_db_object_key = 'metrics_db_object.%s' % str(base_name)

    memcache_metric_dict = None
    if settings.MEMCACHE_ENABLED:
        memcache_metric_dict = get_memcache_metric_object(skyline_app, base_name)
    if memcache_metric_dict:
        logger.info('using %s key data from memcache' % metrics_db_object_key)
        return memcache_metric_dict

    # @modified 20170825 - Task #2132: Optimise Ionosphere DB usage
    # If no memcache data then query the metrics table
    engine = None
    try:
        engine, log_msg, trace = get_engine(skyline_app)
        logger.info(log_msg)
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: could not get a MySQL engine to determine ionosphere_enabled')
    if not engine:
        logger.error('error :: engine not obtained to determine ionosphere_enabled')
        return metrics_db_object

    try:
        # Get the metrics_table metadata
        metrics_table = None
        try:
            metrics_table, log_msg, trace = metrics_table_meta(skyline_app, engine)
        except Exception:
            logger.error(traceback.format_exc())
            metrics_table = None
        # Compared with "is None" because SQLAlchemy clause objects do not
        # support boolean evaluation.
        if metrics_table is None:
            logger.error('error :: failed to get metrics_table meta for %s' % base_name)
        else:
            logger.info('metrics_table OK for %s' % base_name)
            connection = None
            try:
                connection = engine.connect()
                stmt = select([metrics_table]).where(metrics_table.c.metric == base_name)
                row = connection.execute(stmt).fetchone()
                # @modified 20180524 - Task #2132: Optimise Ionosphere DB usage
                # row is None when there is no such metric, and dict(None)
                # would raise TypeError, hence the check.
                if row:
                    memcache_metrics_db_object = dict(row)
                    # @added 20170115 - Feature #1854: Ionosphere learn - generations
                    # Create the metrics_db_object so it is available throughout
                    metrics_db_object = row
                else:
                    logger.info('could not determine metric id for %s' % base_name)
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: could not determine ionosphere_enabled from metrics table for - %s' % base_name)
            finally:
                if connection is not None:
                    try:
                        connection.close()
                    except Exception:
                        logger.error(traceback.format_exc())
                        logger.error('error :: failed to close MySQL connection')
    finally:
        # The engine was previously never disposed of, which leaked it.
        try:
            engine.dispose()
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: calling engine.dispose()')

    # @added 20170825 - Task #2132: Optimise Ionosphere DB usage
    # Add the metric db object data to memcache
    if settings.MEMCACHE_ENABLED and memcache_metrics_db_object:
        try:
            # @modified 20191031 - Task #3304: py3 - handle pymemcache bytes not str
            # Keys and values are cast to str.  dict.items() works on py2
            # and py3.
            memcache_metric_dict = {
                str(k): str(v) for k, v in memcache_metrics_db_object.items()}
            memcache_client.set(metrics_db_object_key, memcache_metric_dict, expire=3600)
            logger.info('set the memcache key - %s' % metrics_db_object_key)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to set %s in memcache' % metrics_db_object_key)
        try:
            memcache_client.close()
        except Exception:
            # @modified 20170913 - Task #2160: Test skyline with bandit
            logger.error('error :: failed to close memcache_client')
    return metrics_db_object
def get_calculated_features(calculated_feature_file):
    """
    Called by :class:`~skyline.skyline.Ionosphere.spin_process` to
    create a list of feature_names and their calc_values.

    Each CSV row is read as ``feature_name,calc_value``.  The following rows
    are skipped:

    - blank rows
    - rows with an empty first column
    - rows whose calc_value cannot be converted to a float, e.g. an empty
      value; these are logged at info level

    Feature names that contain a comma are wrapped in double quotes.

    :param calculated_feature_file: the full path and file name of the
        features.transposed.csv file
    :type calculated_feature_file: str
    :return: calculated_features, a list of ``[feature_name, calc_value]``
        lists, or an empty list if the file does not exist
    :rtype: list
    """
    # @added 20191029 - Task #3302: Handle csv.reader in py3
    import codecs

    if not os.path.isfile(calculated_feature_file):
        # @added 20191029 - Task #3302: Handle csv.reader in py3
        logger.error('error :: get_calculated_features :: no calculated_feature_file found - %s' % (
            str(calculated_feature_file)))
        return []

    calculated_features = []
    with open(calculated_feature_file, 'rb') as fr:
        # @modified 20191029 - Task #3302: Handle csv.reader in py3
        # The file is opened in binary mode.  The py2 csv.reader takes the
        # bytes lines directly; on py3 they are decoded to str first.
        if version_info[0] == 2:
            rows = csv.reader(fr, delimiter=',')
        else:
            rows = csv.reader(codecs.iterdecode(fr, 'utf-8'), delimiter=',')
        for row in rows:
            # csv.reader yields [] for a blank line, so guard before indexing.
            if not row or str(row[0]) == '':
                continue
            feature_name = str(row[0])
            if ',' in feature_name:
                feature_name = '"%s"' % feature_name
            # @modified 20190412 - Feature #2484: FULL_DURATION feature profiles
            # At times the value is empty, e.g. with some autocorrelation_lag
            # values.  This is logged as info, not as an error.
            try:
                calc_value = float(row[1])
            except (IndexError, ValueError):
                logger.info('get_calculated_features :: empty calc_value for feature_name - %s, not added to calculated_features' % (
                    str(row[0])))
                continue
            calculated_features.append([feature_name, calc_value])
    return calculated_features
def _minmax_series_min_max(timeseries):
    """
    Return the (min, max) of the values (index 1 of each datapoint) of a time
    series, or (None, None) if they cannot be determined, e.g. the time
    series is empty or None.
    """
    try:
        values = [datapoint[1] for datapoint in timeseries]
        return min(values), max(values)
    except Exception:
        return None, None


def _minmax_range_match(fp_value, anomalous_value, range_tolerance):
    """
    Compare a features profile time series boundary (min or max) value with
    the corresponding anomalous time series boundary value.

    :return: 'same' if the int() of both values are equal, 'similar' if
        anomalous_value is within +/- ``fp_value * range_tolerance`` of
        fp_value, otherwise None (also None if either value is None or the
        values cannot be compared)
    :rtype: str or None
    """
    if fp_value is None or anomalous_value is None:
        return None
    try:
        if int(fp_value) == int(anomalous_value):
            return 'same'
        # abs() keeps the tolerance band valid for negative values.  The old
        # int range() was empty for negative values.
        if abs(anomalous_value - fp_value) <= abs(fp_value * range_tolerance):
            return 'similar'
    except (TypeError, ValueError, OverflowError):
        # e.g. int(nan) - treat as not similar
        return None
    return None


def _minmax_scale(values):
    """
    Return the values Min-Max scaled to the 0 to 1 range as a numpy array,
    or None if all the values are equal, because the scaling would divide by
    zero.  Raises ValueError if values is empty.
    """
    x_np = np.asarray(values, dtype=float)
    x_min = x_np.min()
    value_range = x_np.max() - x_min
    if value_range == 0:
        return None
    return (x_np - x_min) / value_range


def _minmax_features_count_and_sum(minmax_ts, ts_csv, fname_out, base_name):
    """
    Calculate the tsfresh features count and sum of a minmax scaled time
    series.

    The steps are:

    1. Write the time series to ts_csv as ``base_name,timestamp,value``
       lines.  An existing ts_csv is reused as is.
    2. Extract the features with EfficientFCParameters and write them,
       transposed, to fname_out.  This step is skipped if fname_out already
       exists.
    3. Count and sum the feature values in fname_out.

    Exceptions from pandas/tsfresh propagate to the caller.

    :return: (features_count, features_sum), or (0, None) if ts_csv could not
        be created (no convertible datapoints)
    :rtype: tuple
    """
    if os.path.isfile(ts_csv):
        logger.info('file found %s, using for data' % ts_csv)
    else:
        lines = []
        for datapoint in minmax_ts:
            try:
                lines.append('%s,%s,%s\n' % (
                    base_name, str(int(float(datapoint[0]))),
                    str(float(datapoint[1]))))
            except (TypeError, ValueError, IndexError, OverflowError):  # nosec
                continue
        if lines:
            # Write all lines with one open rather than reopening per line
            with open(ts_csv, 'a') as fh:
                fh.writelines(lines)
    if not os.path.isfile(ts_csv):
        logger.error('error :: file not found %s' % ts_csv)
        return 0, None

    if not os.path.isfile(fname_out):
        df = pd.read_csv(ts_csv, delimiter=',', header=None, names=['metric', 'timestamp', 'value'])
        # @modified 20210101 - Task #3928: Update Skyline to use new tsfresh feature extraction method
        df_features = extract_features(
            df, default_fc_parameters=EfficientFCParameters(),
            column_id='metric', column_sort='timestamp', column_kind=None,
            column_value=None, disable_progressbar=True)
        # Create transposed features csv
        df_features.transpose().to_csv(fname_out)

    # Calculate the count and sum of the features values
    df_sum = pd.read_csv(
        fname_out, delimiter=',', header=0,
        names=['feature_name', 'value'])
    df_sum['feature_name'] = df_sum['feature_name'].astype(str)
    df_sum['value'] = df_sum['value'].astype(float)
    return len(df_sum['value']), df_sum['value'].sum()


def minmax_scale_check(
        fp_id_metric_ts, anomalous_timeseries, range_tolerance,
        range_tolerance_percentage, fp_id, base_name, metric_timestamp,
        features_percentage_diff):
    """
    Called by nothing yet. Used to run a minmax scaling check and determine if
    the features_sum of the 2 minmax scaled timeseries match.

    The steps are:

    1. The min and max values of both time series must be the same or
       similar within range_tolerance.  If not, scaling is skipped.
    2. Both series are Min-Max scaled.  The scaled anomalous series keeps
       its own timestamps.  The series lengths must be within 14 datapoints
       of each other.
    3. The tsfresh features count and sum are calculated for each scaled
       series.
    4. The result is not anomalous if the features sums are almost equal,
       or differ by less than features_percentage_diff percent.

    :param fp_id_metric_ts: the features profile time series, a list of
        [timestamp, value] datapoints
    :param anomalous_timeseries: the anomalous time series, a list of
        [timestamp, value] datapoints
    :param range_tolerance: the fraction of the fp min/max value that the
        anomalous min/max value may differ by, e.g. 0.15
    :param range_tolerance_percentage: range_tolerance as a percentage,
        only used in log messages
    :param fp_id: the features profile id
    :param base_name: the metric base_name
    :param metric_timestamp: the anomaly timestamp, used in the tmp file name
    :param features_percentage_diff: the percentage the features sums may
        differ by and still be considered not anomalous
    :type fp_id_metric_ts: list
    :type anomalous_timeseries: list
    :return: (minmax_not_anomalous, minmax_fp_features_sum,
        minmax_fp_features_count, minmax_anomalous_features_sum,
        minmax_anomalous_features_count).  A features sum that could not be
        calculated is None, and its count is 0.
    :rtype: tuple
    """
    # @modified 20191115 - Branch #3262: py3
    # All result values are initialised up front.  Previously several were
    # only assigned on some paths, which raised UnboundLocalError.
    minmax_not_anomalous = False
    minmax_fp_features_sum = None
    minmax_fp_features_count = 0
    minmax_anomalous_features_sum = None
    minmax_anomalous_features_count = 0

    min_fp_value, max_fp_value = _minmax_series_min_max(fp_id_metric_ts)
    min_anomalous_value, max_anomalous_value = _minmax_series_min_max(anomalous_timeseries)

    lower_match = _minmax_range_match(min_fp_value, min_anomalous_value, range_tolerance)
    if lower_match == 'same':
        logger.info('min value of fp_id_metric_ts (%s) and anomalous_timeseries (%s) are the same' % (
            str(min_fp_value), str(min_anomalous_value)))
    elif lower_match == 'similar':
        logger.info('min value of fp_id_metric_ts (%s) and anomalous_timeseries (%s) are similar within %s percent of each other' % (
            str(min_fp_value), str(min_anomalous_value),
            str(range_tolerance_percentage)))
    else:
        logger.info('lower range of fp_id_metric_ts (%s) and anomalous_timeseries (%s) are not similar' % (
            str(min_fp_value), str(min_anomalous_value)))

    upper_match = None
    if lower_match:
        # @added 20180717 - Task #2446: Optimize Ionosphere
        # On low values such as 1 and 2, the range_tolerance should be
        # adjusted to account for the very small range. TODO
        upper_match = _minmax_range_match(max_fp_value, max_anomalous_value, range_tolerance)
        if upper_match == 'same':
            logger.info('max value of fp_id_metric_ts (%s) and anomalous_timeseries (%s) are the same' % (
                str(max_fp_value), str(max_anomalous_value)))
        elif upper_match == 'similar':
            logger.info('max value of fp_id_metric_ts (%s) and anomalous_timeseries (%s) are similar within %s percent of each other' % (
                str(max_fp_value), str(max_anomalous_value),
                str(range_tolerance_percentage)))
        else:
            logger.info('max value of fp_id_metric_ts (%s) and anomalous_timeseries (%s) are not similar' % (
                str(max_fp_value), str(max_anomalous_value)))

    if not (lower_match and upper_match):
        logger.info('the ranges of fp_id_metric_ts and anomalous_timeseries differ significantly Min-Max scaling will be skipped')
        return (minmax_not_anomalous, minmax_fp_features_sum, minmax_fp_features_count,
                minmax_anomalous_features_sum, minmax_anomalous_features_count)

    # Min-Max scale the features profile time series
    minmax_fp_ts = []
    try:
        np_minmax = _minmax_scale([x[1] for x in fp_id_metric_ts])
        if np_minmax is None:
            logger.error('error :: fp id %s time series for %s has no value range to minmax scale' % (
                str(fp_id), str(base_name)))
        else:
            minmax_fp_ts = [[ts[0], v] for ts, v in zip(fp_id_metric_ts, np_minmax)]
            logger.info('minmax_fp_ts list populated with the minmax scaled time series with %s data points' % str(len(minmax_fp_ts)))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: could not minmax scale fp id %s time series for %s' % (str(fp_id), str(base_name)))
    if not minmax_fp_ts:
        logger.error('error :: minmax_fp_ts list not populated')
        return (minmax_not_anomalous, minmax_fp_features_sum, minmax_fp_features_count,
                minmax_anomalous_features_sum, minmax_anomalous_features_count)

    # Only process if they are approximately the same length
    anomalous_ts_values_count = len(anomalous_timeseries)
    minmax_fp_ts_values_count = len(minmax_fp_ts)
    if minmax_fp_ts_values_count - anomalous_ts_values_count not in range(-14, 14):
        logger.info('minmax scaled check will be skipped - anomalous_ts_values_count is %s and minmax_fp_ts is %s' % (
            str(anomalous_ts_values_count), str(minmax_fp_ts_values_count)))
        return (minmax_not_anomalous, minmax_fp_features_sum, minmax_fp_features_count,
                minmax_anomalous_features_sum, minmax_anomalous_features_count)

    # Min-Max scale the anomalous time series.  It keeps its own timestamps;
    # previously the fp timestamps were zipped here, which truncated it.
    minmax_anomalous_ts = []
    try:
        np_minmax = _minmax_scale([x2[1] for x2 in anomalous_timeseries])
        if np_minmax is not None:
            minmax_anomalous_ts = [[ts[0], v] for ts, v in zip(anomalous_timeseries, np_minmax)]
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: could not minmax scale current time series anomalous_timeseries for %s' % str(base_name))
    if minmax_anomalous_ts:
        logger.info('minmax_anomalous_ts is populated with %s data points' % str(len(minmax_anomalous_ts)))
    else:
        logger.error('error :: minmax_anomalous_ts is not populated')
        return (minmax_not_anomalous, minmax_fp_features_sum, minmax_fp_features_count,
                minmax_anomalous_features_sum, minmax_anomalous_features_count)

    minmax_fp_ts_csv = '%s/fpid.%s.%s.minmax_fp_ts.tsfresh.input.std.csv' % (
        settings.SKYLINE_TMP_DIR, str(fp_id), base_name)
    minmax_fp_fname_out = minmax_fp_ts_csv + '.transposed.csv'
    anomalous_ts_csv = '%s/%s.%s.minmax_anomalous_ts.tsfresh.std.csv' % (
        settings.SKYLINE_TMP_DIR, metric_timestamp, base_name)
    anomalous_fp_fname_out = anomalous_ts_csv + '.transposed.csv'

    try:
        minmax_fp_features_count, minmax_fp_features_sum = _minmax_features_count_and_sum(
            minmax_fp_ts, minmax_fp_ts_csv, minmax_fp_fname_out, base_name)
        logger.info('minmax_fp_ts - features_count: %s, features_sum: %s' % (
            str(minmax_fp_features_count), str(minmax_fp_features_sum)))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: failed to calculate the minmax_fp_ts features from %s' % str(minmax_fp_ts_csv))
    if not minmax_fp_features_count:
        logger.error('error :: minmax_fp_features_count is %s' % str(minmax_fp_features_count))

    try:
        minmax_anomalous_features_count, minmax_anomalous_features_sum = _minmax_features_count_and_sum(
            minmax_anomalous_ts, anomalous_ts_csv, anomalous_fp_fname_out, base_name)
        logger.info('minmax_anomalous_ts - minmax_anomalous_features_count: %s, minmax_anomalous_features_sum: %s' % (
            str(minmax_anomalous_features_count),
            str(minmax_anomalous_features_sum)))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: failed to calculate the minmax_anomalous_ts features from %s' % str(anomalous_ts_csv))

    # A zero sum is deliberately excluded (truthiness check), as the percent
    # difference from a zero fp sum is undefined.
    if minmax_fp_features_sum and minmax_anomalous_features_sum:
        percent_different = None
        # @modified 20210425 - Task #4030: refactoring
        # Use the common get_percent_different function
        try:
            percent_different = get_percent_different(minmax_fp_features_sum, minmax_anomalous_features_sum, True)
            logger.info('percent_different between minmax scaled features sums - %s' % str(percent_different))
        except Exception as e:
            logger.error('error :: failed to calculate percent_different between minmax scaled features sums - %s' % e)
        # "is not None" so that a 0 percent difference (identical sums) is
        # evaluated rather than skipped.
        if percent_different is not None:
            try:
                np.testing.assert_array_almost_equal(minmax_fp_features_sum, minmax_anomalous_features_sum)
                almost_equal = True
            except Exception:
                almost_equal = False
            if almost_equal:
                minmax_not_anomalous = True
                logger.info('minmax scaled common features sums are almost equal, not anomalous')
            percent_different = abs(percent_different)
            if percent_different < float(features_percentage_diff):
                minmax_not_anomalous = True
                logger.info('not anomalous - minmax scaled features profile match - %s - %s' % (base_name, str(minmax_not_anomalous)))
                logger.info(
                    'minmax scaled calculated features sum are within %s percent of fp_id %s with %s, not anomalous' %
                    (str(features_percentage_diff),
                     str(fp_id), str(percent_different)))

    return (minmax_not_anomalous, minmax_fp_features_sum, minmax_fp_features_count,
            minmax_anomalous_features_sum, minmax_anomalous_features_count)