import logging
from os import path
import operator
import time
import traceback
from ast import literal_eval
import datetime
from flask import request
# import mysql.connector
# from mysql.connector import errorcode
# @added 20180720 - Feature #2464: luminosity_remote_data
# Added redis and msgpack
from redis import StrictRedis
from msgpack import Unpacker
# @added 20201103 - Feature #3824: get_cluster_data
import requests
# @added 20201125 - Feature #3850: webapp - yhat_values API endpoint
import numpy as np
# @added 20210328 - Feature #3994: Panorama - mirage not anomalous
import pandas as pd
import settings
from skyline_functions import (
mysql_select,
# @added 20180720 - Feature #2464: luminosity_remote_data
# nonNegativeDerivative, in_list, is_derivative_metric,
# @added 20200507 - Feature #3532: Sort all time series
# Added sort_timeseries and removed unused in_list
nonNegativeDerivative, sort_timeseries,
# @added 20201123 - Feature #3824: get_cluster_data
# Feature #2464: luminosity_remote_data
# Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
get_redis_conn_decoded,
    # @added 20201125 - Feature #3850: webapp - yhat_values API endpoint
get_graphite_metric,
# @added 20210328 - Feature #3994: Panorama - mirage not anomalous
filesafe_metricname, mkdir_p)
# @added 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
# from database_queries import (
# db_query_metric_id_from_base_name, db_query_latest_anomalies,
# db_query_metric_ids_from_metric_like)
# @added 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
# Task #4030: refactoring
from functions.database.queries.metric_id_from_base_name import metric_id_from_base_name
from functions.database.queries.metric_ids_from_metric_like import metric_ids_from_metric_like
from functions.database.queries.latest_anomalies import latest_anomalies as db_latest_anomalies
# @added 20210617 - Feature #4144: webapp - stale_metrics API endpoint
# Feature #4076: CUSTOM_STALE_PERIOD
# Branch #1444: thunder
from functions.thunder.stale_metrics import thunder_stale_metrics
import skyline_version
skyline_version = skyline_version.__absolute_version__
skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
REQUEST_ARGS = ['from_date',
'from_time',
'from_timestamp',
'until_date',
'until_time',
'until_timestamp',
'target',
'like_target',
'source',
'host',
'algorithm',
# @added 20161127 - Branch #922: ionosphere
'panorama_anomaly_id',
]
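# An illustrative (hypothetical) request using some of these arguments:
#   /panorama?metric=server-1.cpu.user&from_timestamp=2021-04-01%2000:00&limit=50
# The handlers in panorama_request below read each argument from
# request.args and use it to extend the SQL query string.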
# Converting one settings variable into a local variable, just because it is a
# long string otherwise.
try:
ENABLE_WEBAPP_DEBUG = settings.ENABLE_WEBAPP_DEBUG
except Exception as e:
logger.error('error :: cannot determine ENABLE_WEBAPP_DEBUG from settings - %s' % e)
ENABLE_WEBAPP_DEBUG = False
# @added 20180720 - Feature #2464: luminosity_remote_data
# Added REDIS_CONN
if settings.REDIS_PASSWORD:
REDIS_CONN = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
REDIS_CONN = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
def panorama_request():
    """
    Gets the details of anomalies from the database, using the URL arguments
    that are passed in by the :obj:`request.args` to build the MySQL select
    query string, queries the database, parses the results, creates an
    array of the anomalies that matched the query and creates the
    ``panorama.json`` file, then returns the array. The Webapp needs both the
    array and the JSONP file to serve to the browser for the client side
    ``panorama.js``.
    :param None: determined from :obj:`request.args`
    :return: array
    :rtype: array
    .. note:: And creates ``panorama.json`` for client side javascript
    """
logger.info('determining request args')
def get_ids_from_rows(thing, rows):
found_ids = []
for row in rows:
found_id = str(row[0])
found_ids.append(int(found_id))
# @modified 20191014 - Task #3270: Deprecate string.replace for py3
# Branch #3262: py3
# ids_first = string.replace(str(found_ids), '[', '')
# in_ids = string.replace(str(ids_first), ']', '')
found_ids_str = str(found_ids)
ids_first = found_ids_str.replace('[', '')
in_ids = ids_first.replace(']', '')
return in_ids
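    # For example (illustrative), rows of [(1,), (2,), (3,)] returned from an
    # ids query would be distilled to the string '1, 2, 3', suitable for use
    # in a SQL IN (...) clause.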
try:
request_args_len = len(request.args)
except:
request_args_len = False
latest_anomalies = False
if request_args_len == 0:
request_args_len = 'No request arguments passed'
# return str(request_args_len)
latest_anomalies = True
metric = False
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# if metric
# logger.info('Getting db id for %s' % metric)
# # @modified 20170913 - Task #2160: Test skyline with bandit
# # Added nosec to exclude from bandit tests
# query = 'select id from metrics WHERE metric=\'%s\'' % metric # nosec
# try:
# result = mysql_select(skyline_app, query)
# except:
# logger.error('error :: failed to get id from db: %s' % traceback.format_exc())
# result = 'metric id not found in database'
# return str(result[0][0])
search_request = True
count_request = False
if latest_anomalies:
logger.info('Getting latest anomalies')
# @modified 20191108 - Feature #3306: Record the anomaly_end_timestamp
# Branch #3262: py3
# query = 'select id, metric_id, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp from anomalies ORDER BY id DESC LIMIT 10'
# @modified 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
# query = 'select id, metric_id, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp, anomaly_end_timestamp from anomalies ORDER BY id DESC LIMIT 10'
# try:
# rows = mysql_select(skyline_app, query)
# except:
# logger.error('error :: failed to get anomalies from db: %s' % traceback.format_exc())
# rows = []
# @added 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
rows = []
try:
rows = db_latest_anomalies(skyline_app)
except:
logger.error('error :: failed to get anomalies from db: %s' % traceback.format_exc())
rows = []
if not latest_anomalies:
logger.info('Determining search parameters')
# @modified 20191108 - Feature #3306: Record the end_timestamp of anomalies
# Branch #3262: py3
# query_string = 'select id, metric_id, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp from anomalies'
query_string = 'select id, metric_id, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp, anomaly_end_timestamp from anomalies'
needs_and = False
# If we have to '' a string we cannot escape the query it seems...
# do_not_escape = False
if 'metric' in request.args:
metric = request.args.get('metric', None)
# if metric and metric != 'all':
if isinstance(metric, str) and metric != 'all':
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
# @modified 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
# query = "select id from metrics WHERE metric='%s'" % (metric) # nosec
# try:
# found_id = mysql_select(skyline_app, query)
# except:
# logger.error('error :: failed to get app ids from db: %s' % traceback.format_exc())
# found_id = None
# @added 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
found_id = None
if metric.startswith(settings.FULL_NAMESPACE):
base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = str(metric)
try:
# found_id = db_query_metric_id_from_base_name(skyline_app, base_name)
found_id = metric_id_from_base_name(skyline_app, base_name)
except:
logger.error('error :: failed to get metric id from db: %s' % traceback.format_exc())
found_id = None
if found_id:
# target_id = str(found_id[0][0])
target_id = str(found_id)
if needs_and:
new_query_string = '%s AND metric_id=%s' % (query_string, target_id)
else:
new_query_string = '%s WHERE metric_id=%s' % (query_string, target_id)
query_string = new_query_string
needs_and = True
# in_ids_str = None
if 'metric_like' in request.args:
metric_like = request.args.get('metric_like', None)
metrics_like_str = None
# if metric_like and metric_like != 'all':
if isinstance(metric_like, str) and metric_like != 'all':
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
rows_returned = None
# @modified 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
# query = 'select id from metrics WHERE metric LIKE \'%s\'' % (str(metric_like)) # nosec
# try:
# rows = mysql_select(skyline_app, query)
# except:
# logger.error('error :: failed to get metric ids from db: %s' % traceback.format_exc())
# return False
# rows_returned = None
# try:
# rows_returned = rows[0]
# if ENABLE_WEBAPP_DEBUG:
# logger.info('debug :: rows - rows[0] - %s' % str(rows[0]))
# except:
# rows_returned = False
# if ENABLE_WEBAPP_DEBUG:
# logger.info('debug :: no rows returned')
# @added 20210420 - Task #4022: Move mysql_select calls to SQLAlchemy
metrics_like_str = str(metric_like)
db_metric_ids = None
try:
db_metric_ids = metric_ids_from_metric_like(skyline_app, metrics_like_str)
except Exception as e:
logger.error('error :: failed to get metric ids from db: %s' % e)
return False
use_db_metric_ids = True
if db_metric_ids and use_db_metric_ids:
rows_returned = False
ids = ''
for db_metric_id in db_metric_ids:
if ids == '':
ids = '%s' % str(db_metric_id)
else:
ids = '%s, %s' % (ids, str(db_metric_id))
new_query_string = '%s WHERE metric_id IN (%s)' % (query_string, str(ids))
else:
# Get nothing
new_query_string = '%s WHERE metric_id IN (0)' % (query_string)
if ENABLE_WEBAPP_DEBUG:
logger.info('debug :: no rows returned using new_query_string - %s' % new_query_string)
if not use_db_metric_ids:
if rows_returned:
ids = get_ids_from_rows('metric', rows)
new_query_string = '%s WHERE metric_id IN (%s)' % (query_string, str(ids))
logger.info('debug :: id is %s chars long after adding get_ids_from_rows, new_query_string: %s' % (
str(len(ids)), new_query_string))
else:
# Get nothing
new_query_string = '%s WHERE metric_id IN (0)' % (query_string)
if ENABLE_WEBAPP_DEBUG:
logger.info('debug :: no rows returned using new_query_string - %s' % new_query_string)
query_string = new_query_string
needs_and = True
if 'count_by_metric' in request.args:
count_by_metric = request.args.get('count_by_metric', None)
if count_by_metric and count_by_metric != 'false':
search_request = False
count_request = True
# query_string = 'SELECT metric_id, COUNT(*) FROM anomalies GROUP BY metric_id ORDER BY COUNT(*) DESC'
query_string = 'SELECT metric_id, COUNT(*) FROM anomalies'
needs_and = False
if 'from_timestamp' in request.args:
from_timestamp = request.args.get('from_timestamp', None)
if from_timestamp and from_timestamp != 'all':
if ":" in from_timestamp:
# @modified 20211021 - handle multiple date formats
try:
new_from_timestamp = time.mktime(datetime.datetime.strptime(from_timestamp, '%Y%m%d %H:%M').timetuple())
except ValueError:
new_from_timestamp = time.mktime(datetime.datetime.strptime(from_timestamp, '%Y-%m-%d %H:%M').timetuple())
except Exception as err:
trace = traceback.format_exc()
logger.error('%s' % trace)
                fail_msg = 'error :: panorama_request :: failed to determine unix timestamp from from_timestamp - %s' % str(err)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
from_timestamp = str(int(new_from_timestamp))
if needs_and:
new_query_string = '%s AND anomaly_timestamp >= %s' % (query_string, from_timestamp)
query_string = new_query_string
needs_and = True
else:
new_query_string = '%s WHERE anomaly_timestamp >= %s' % (query_string, from_timestamp)
query_string = new_query_string
needs_and = True
if 'until_timestamp' in request.args:
until_timestamp = request.args.get('until_timestamp', None)
if until_timestamp and until_timestamp != 'all':
if ":" in until_timestamp:
# @modified 20211021 - handle multiple date formats
try:
new_until_timestamp = time.mktime(datetime.datetime.strptime(until_timestamp, '%Y%m%d %H:%M').timetuple())
except ValueError:
new_until_timestamp = time.mktime(datetime.datetime.strptime(until_timestamp, '%Y-%m-%d %H:%M').timetuple())
except Exception as err:
trace = traceback.format_exc()
logger.error('%s' % trace)
                fail_msg = 'error :: panorama_request :: failed to determine unix timestamp from until_timestamp - %s' % str(err)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
until_timestamp = str(int(new_until_timestamp))
if needs_and:
new_query_string = '%s AND anomaly_timestamp <= %s' % (query_string, until_timestamp)
query_string = new_query_string
needs_and = True
else:
new_query_string = '%s WHERE anomaly_timestamp <= %s' % (query_string, until_timestamp)
query_string = new_query_string
needs_and = True
if 'app' in request.args:
app = request.args.get('app', None)
if app and app != 'all':
# @added 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# Sanitise variable
if isinstance(app, str):
for_app = str(app)
else:
for_app = 'none'
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# query = 'select id from apps WHERE app=\'%s\'' % (str(app)) # nosec
query = 'select id from apps WHERE app=\'%s\'' % (str(for_app))
try:
found_id = mysql_select(skyline_app, query)
except:
logger.error('error :: failed to get app ids from db: %s' % traceback.format_exc())
found_id = None
if found_id:
target_id = str(found_id[0][0])
if needs_and:
new_query_string = '%s AND app_id=%s' % (query_string, target_id)
else:
new_query_string = '%s WHERE app_id=%s' % (query_string, target_id)
query_string = new_query_string
needs_and = True
if 'source' in request.args:
source = request.args.get('source', None)
if source and source != 'all':
# @added 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# Sanitise variable
if isinstance(source, str):
for_source = str(source)
else:
for_source = 'none'
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# query = 'select id from sources WHERE source=\'%s\'' % (str(source)) # nosec
query = 'select id from sources WHERE source=\'%s\'' % (str(for_source))
try:
found_id = mysql_select(skyline_app, query)
except:
logger.error('error :: failed to get source id from db: %s' % traceback.format_exc())
found_id = None
if found_id:
target_id = str(found_id[0][0])
if needs_and:
new_query_string = '%s AND source_id=\'%s\'' % (query_string, target_id)
else:
new_query_string = '%s WHERE source_id=\'%s\'' % (query_string, target_id)
query_string = new_query_string
needs_and = True
if 'algorithm' in request.args:
algorithm = request.args.get('algorithm', None)
        # DISABLED as it is difficult to match algorithm_id in the
        # triggered_algorithms csv list
algorithm = 'all'
# @modified 20210421 - Task #4030: refactoring
# semgrep - python.lang.correctness.useless-comparison.no-strings-as-booleans
# if algorithm and algorithm != 'all':
use_all_for_algorithm = True
if use_all_for_algorithm and algorithm != 'all':
# @added 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# Sanitise variable
if isinstance(algorithm, str):
for_algorithm = str(algorithm)
else:
for_algorithm = 'none'
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# query = 'select id from algorithms WHERE algorithm LIKE \'%s\'' % (str(algorithm)) # nosec
query = 'select id from algorithms WHERE algorithm LIKE \'%s\'' % (str(for_algorithm))
try:
rows = mysql_select(skyline_app, query)
except:
logger.error('error :: failed to get algorithm ids from db: %s' % traceback.format_exc())
rows = []
ids = get_ids_from_rows('algorithm', rows)
if needs_and:
new_query_string = '%s AND algorithm_id IN (%s)' % (query_string, str(ids))
else:
new_query_string = '%s WHERE algorithm_id IN (%s)' % (query_string, str(ids))
query_string = new_query_string
needs_and = True
if 'host' in request.args:
host = request.args.get('host', None)
if host and host != 'all':
# @added 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# Sanitise variable
if isinstance(host, str):
for_host = str(host)
else:
for_host = 'none'
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# query = 'select id from hosts WHERE host=\'%s\'' % (str(host)) # nosec
query = 'select id from hosts WHERE host=\'%s\'' % (str(for_host))
try:
found_id = mysql_select(skyline_app, query)
except:
logger.error('error :: failed to get host id from db: %s' % traceback.format_exc())
found_id = None
if found_id:
target_id = str(found_id[0][0])
if needs_and:
new_query_string = '%s AND host_id=\'%s\'' % (query_string, target_id)
else:
new_query_string = '%s WHERE host_id=\'%s\'' % (query_string, target_id)
query_string = new_query_string
needs_and = True
if 'limit' in request.args:
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# limit = request.args.get('limit', '10')
limit_str = request.args.get('limit', '10')
try:
            limit = int(limit_str)
except Exception as e:
logger.error('error :: limit parameter not an int: %s' % e)
limit = 10
else:
        limit = 10
if 'order' in request.args:
# @modified 20210504 - Task #4030: refactoring
# Task #4022: Move mysql_select calls to SQLAlchemy
# order = request.args.get('order', 'DESC')
order_str = request.args.get('order', 'DESC')
if order_str == 'ASC':
order = 'ASC'
else:
order = 'DESC'
else:
order = 'DESC'
search_query = '%s ORDER BY id %s LIMIT %s' % (
query_string, order, str(limit))
if 'count_by_metric' in request.args:
count_by_metric = request.args.get('count_by_metric', None)
if count_by_metric and count_by_metric != 'false':
# query_string = 'SELECT metric_id, COUNT(*) FROM anomalies GROUP BY metric_id ORDER BY COUNT(*) DESC'
search_query = '%s GROUP BY metric_id ORDER BY COUNT(*) %s LIMIT %s' % (
query_string, order, limit)
try:
rows = mysql_select(skyline_app, search_query)
except:
logger.error('error :: failed to get anomalies from db: %s' % traceback.format_exc())
rows = []
anomalies = []
anomalous_metrics = []
if search_request:
# @modified 20191014 - Task #3270: Deprecate string.replace for py3
# Branch #3262: py3
anomalies_json = path.abspath(path.join(path.dirname(__file__), '..', settings.ANOMALY_DUMP))
# panorama_json = string.replace(str(anomalies_json), 'anomalies.json', 'panorama.json')
panorama_json = anomalies_json.replace('anomalies.json', 'panorama.json')
if ENABLE_WEBAPP_DEBUG:
logger.info('debug :: panorama_json - %s' % str(panorama_json))
for row in rows:
if search_request:
anomaly_id = str(row[0])
metric_id = str(row[1])
if count_request:
metric_id = str(row[0])
anomaly_count = str(row[1])
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
query = 'select metric from metrics WHERE id=%s' % metric_id # nosec
try:
result = mysql_select(skyline_app, query)
except:
logger.error('error :: failed to get id from db: %s' % traceback.format_exc())
continue
metric = str(result[0][0])
if search_request:
anomalous_datapoint = str(row[2])
anomaly_timestamp = str(row[3])
anomaly_timestamp = str(row[3])
full_duration = str(row[4])
created_timestamp = str(row[5])
# @modified 20191108 - Feature #3306: Record the anomaly_end_timestamp
# Branch #3262: py3
# anomaly_data = (anomaly_id, metric, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp)
# anomalies.append([int(anomaly_id), str(metric), anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp])
anomaly_end_timestamp = str(row[6])
# anomaly_data = (anomaly_id, metric, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp, anomaly_end_timestamp)
anomalies.append([int(anomaly_id), str(metric), anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp, anomaly_end_timestamp])
anomalous_metrics.append(str(metric))
if count_request:
limit_argument = anomaly_count
if int(anomaly_count) > 100:
limit_argument = 100
# anomaly_data = (int(anomaly_count), metric, str(limit_argument))
anomalies.append([int(anomaly_count), str(metric), str(limit_argument)])
    anomalies.sort(key=operator.itemgetter(0))
    if search_request:
        # Write anomalous_metrics to the static webapp directory as JSONP
        # with a handle_data() function
        with open(panorama_json, 'w') as fh:
            fh.write('handle_data(%s)' % anomalies)
if latest_anomalies:
return anomalies
else:
return search_query, anomalies
def get_list(thing):
"""
Get a list of names for things in a database table.
:param thing: the thing, e.g. 'algorithm'
:type thing: str
:return: list
:rtype: list
"""
table = '%ss' % thing
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
query = 'select %s from %s' % (thing, table) # nosec
    logger.info('get_list :: select %s from %s' % (thing, table))
# got_results = False
try:
results = mysql_select(skyline_app, query)
# got_results = True
except:
logger.error('error :: failed to get list of %ss from %s' % (thing, table))
results = None
things = []
results_array_valid = False
try:
test_results = results[0]
if test_results:
results_array_valid = True
except:
logger.error('error :: invalid results array for get list of %ss from %s' % (thing, table))
    # @modified 20210415 - Feature #4014: Ionosphere - inference
    # Stop logging results in webapp
    if results_array_valid:
        # logger.info('results: %s' % str(results))
        for result in results:
            things.append(str(result[0]))
        # logger.info('things: %s' % str(things))
        logger.info('get_list :: returned valid result: %s' % str(results_array_valid))
    return things
# @added 20180720 - Feature #2464: luminosity_remote_data
# @modified 20201203 - Feature #3860: luminosity - handle low frequency data
# Add the metric resolution
# def luminosity_remote_data(anomaly_timestamp):
def luminosity_remote_data(anomaly_timestamp, resolution):
"""
Gets all the unique_metrics from Redis and then mgets Redis data for all
metrics. The data is then preprocessed for the remote Skyline luminosity
instance and only the relevant fragments of the time series are
returned. This return is then gzipped by the Flask Webapp response to
ensure the minimum about of bandwidth is used.
:param anomaly_timestamp: the anomaly timestamp
:type anomaly_timestamp: int
:return: list
:rtype: list
"""
message = 'luminosity_remote_data returned'
success = False
luminosity_data = []
logger.info('luminosity_remote_data :: determining unique_metrics')
unique_metrics = []
# If you modify the values of 61 or 600 here, it must be modified in the
# luminosity_remote_data function in
# skyline/luminosity/process_correlations.py as well
# @modified 20201203 - Feature #3860: luminosity - handle low frequency data
# Use the metric resolution
# from_timestamp = int(anomaly_timestamp) - 600
# until_timestamp = int(anomaly_timestamp) + 61
from_timestamp = int(anomaly_timestamp) - (resolution * 10)
until_timestamp = int(anomaly_timestamp) + (resolution + 1)
try:
# @modified 20201123 - Feature #3824: get_cluster_data
# Feature #2464: luminosity_remote_data
# Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# unique_metrics = list(REDIS_CONN.smembers(settings.FULL_NAMESPACE + 'unique_metrics'))
REDIS_CONN_DECODED = get_redis_conn_decoded(skyline_app)
unique_metrics = list(REDIS_CONN_DECODED.smembers(settings.FULL_NAMESPACE + 'unique_metrics'))
except Exception as e:
logger.error('error :: %s' % str(e))
logger.error('error :: luminosity_remote_data :: could not determine unique_metrics from Redis set')
if not unique_metrics:
message = 'error :: luminosity_remote_data :: could not determine unique_metrics from Redis set'
return luminosity_data, success, message
logger.info('luminosity_remote_data :: %s unique_metrics' % str(len(unique_metrics)))
# @added 20210125 - Feature #3956: luminosity - motifs
# Improve luminosity_remote_data performance
    # Although the is_derivative_metric function would be appropriate in the
    # loop below, it is not the most performant manner in which to determine
    # if the metrics are derivatives, as it needs to fire on every metric, so
    # here we just trust the Redis derivative_metrics list. This increases
    # performance on 1267 metrics from 6.442009 seconds to 1.473067 seconds
try:
# @modified 20211012 - Feature #4280: aet.metrics_manager.derivative_metrics Redis hash
# derivative_metrics = list(REDIS_CONN_DECODED.smembers('derivative_metrics'))
derivative_metrics = list(REDIS_CONN_DECODED.smembers('aet.metrics_manager.derivative_metrics'))
except:
derivative_metrics = []
# assigned metrics
assigned_min = 0
assigned_max = len(unique_metrics)
assigned_keys = range(assigned_min, assigned_max)
# Compile assigned metrics
assigned_metrics = [unique_metrics[index] for index in assigned_keys]
# Check if this process is unnecessary
if len(assigned_metrics) == 0:
message = 'error :: luminosity_remote_data :: assigned_metrics length is 0'
logger.error(message)
return luminosity_data, success, message
# Multi get series
raw_assigned_failed = True
try:
raw_assigned = REDIS_CONN.mget(assigned_metrics)
raw_assigned_failed = False
except:
logger.info(traceback.format_exc())
message = 'error :: luminosity_remote_data :: failed to mget raw_assigned'
logger.error(message)
return luminosity_data, success, message
if raw_assigned_failed:
message = 'error :: luminosity_remote_data :: failed to mget raw_assigned'
logger.error(message)
return luminosity_data, success, message
# Distill timeseries strings into lists
for i, metric_name in enumerate(assigned_metrics):
timeseries = []
try:
raw_series = raw_assigned[i]
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
except:
timeseries = []
if not timeseries:
continue
# @added 20200507 - Feature #3532: Sort all time series
# To ensure that there are no unordered timestamps in the time
# series which are artefacts of the collector or carbon-relay, sort
# all time series by timestamp before analysis.
original_timeseries = timeseries
if original_timeseries:
timeseries = sort_timeseries(original_timeseries)
del original_timeseries
# Convert the time series if this is a known_derivative_metric
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
# @added 20201117 - Feature #3824: get_cluster_data
# Feature #2464: luminosity_remote_data
# Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Convert metric_name bytes to str
metric_name = str(metric_name)
# @modified 20210125 - Feature #3956: luminosity - motifs
# Improve luminosity_remote_data performance
# Although the is_derivative_metric function is appropriate here it is
# not the most performant manner in which to determine if the metric
# is a derivative in this case as it needs to fire on every metric, so
# here we just trust the Redis derivative_metrics list. This increases
# performance on 1267 metrics from 6.442009 seconds to 1.473067 seconds
# if metric_name.startswith(settings.FULL_NAMESPACE):
# base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
# else:
# base_name = metric_name
# known_derivative_metric = is_derivative_metric('webapp', base_name)
known_derivative_metric = False
if metric_name in derivative_metrics:
known_derivative_metric = True
if known_derivative_metric:
try:
derivative_timeseries = nonNegativeDerivative(timeseries)
timeseries = derivative_timeseries
except:
logger.error('error :: nonNegativeDerivative failed')
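        # nonNegativeDerivative converts a monotonically increasing counter
        # series into per-interval rates, e.g. (illustrative):
        #   [(t1, 100), (t2, 160), (t3, 220)] -> [(t2, 60), (t3, 60)]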
# @modified 20210125 - Feature #3956: luminosity - motifs
# Improve luminosity_remote_data performance
# The list comprehension method halves the time to create the
# correlate_ts from 0.0008357290644198656 to 0.0004676780663430691 seconds
# correlate_ts = []
# for ts, value in timeseries:
# if int(ts) < from_timestamp:
# continue
# if int(ts) <= anomaly_timestamp:
# correlate_ts.append((int(ts), value))
# if int(ts) > (anomaly_timestamp + until_timestamp):
# break
        correlate_ts = [x for x in timeseries if from_timestamp <= x[0] <= until_timestamp]
if not correlate_ts:
continue
metric_data = [str(metric_name), correlate_ts]
luminosity_data.append(metric_data)
logger.info('luminosity_remote_data :: %s valid metric time series data preprocessed for the remote request' % str(len(luminosity_data)))
return luminosity_data, success, message
# @added 20200908 - Feature #3740: webapp - anomaly API endpoint
def panorama_anomaly_details(anomaly_id):
"""
Gets the details for an anomaly from the database.
"""
logger.info('panorama_anomaly_details - getting details for anomaly id %s' % str(anomaly_id))
metric_id = 0
# Added nosec to exclude from bandit tests
query = 'select metric_id from anomalies WHERE id=\'%s\'' % str(anomaly_id) # nosec
try:
result = mysql_select(skyline_app, query)
metric_id = int(result[0][0])
except:
logger.error(traceback.format_exc())
logger.error('error :: panorama_anomaly_details - failed to get metric_id from db')
return False
if metric_id > 0:
logger.info('panorama_anomaly_details - getting metric for metric_id - %s' % str(metric_id))
# Added nosec to exclude from bandit tests
query = 'select metric from metrics WHERE id=\'%s\'' % str(metric_id) # nosec
try:
result = mysql_select(skyline_app, query)
metric = str(result[0][0])
except:
logger.error(traceback.format_exc())
logger.error('error :: panorama_anomaly_details - failed to get metric from db')
return False
query = 'select id, metric_id, anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp, anomaly_end_timestamp from anomalies WHERE id=\'%s\'' % str(anomaly_id) # nosec
logger.info('panorama_anomaly_details - running query - %s' % str(query))
try:
rows = mysql_select(skyline_app, query)
except:
logger.error(traceback.format_exc())
logger.error('error :: panorama_anomaly_details - failed to get anomaly details from db')
return False
anomaly_data = None
for row in rows:
anomalous_datapoint = float(row[2])
anomaly_timestamp = int(row[3])
full_duration = int(row[4])
created_timestamp = str(row[5])
try:
anomaly_end_timestamp = int(row[6])
except:
anomaly_end_timestamp = None
anomaly_data = [int(anomaly_id), str(metric), anomalous_datapoint, anomaly_timestamp, full_duration, created_timestamp, anomaly_end_timestamp]
break
return anomaly_data
# @added 20201103 - Feature #3824: get_cluster_data
# @modified 20201127 - Feature #3824: get_cluster_data
# Feature #3820: HORIZON_SHARDS
# Allow querying only a single host in the cluster so that just the response
# from a single host in the cluster can be evaluated
def get_cluster_data(api_endpoint, data_required, only_host='all', endpoint_params={}):
"""
    Gets data from the /api of REMOTE_SKYLINE_INSTANCES. This allows the user
    to query a single Skyline webapp node in a cluster and the Skyline instance
    will respond with the concatenated responses of all the
    REMOTE_SKYLINE_INSTANCES in a single response.
:param api_endpoint: the api endpoint to request data from the remote
Skyline instances
:param data_required: the element from the api json response that is
required
:param only_host: The remote Skyline host to query, if not passed all are
queried.
:param endpoint_params: A dictionary of any additional parameters that may
be required
:type api_endpoint: str
:type data_required: str
:type only_host: str
:type endpoint_params: dict
:return: list
:rtype: list
"""
try:
connect_timeout = int(settings.GRAPHITE_CONNECT_TIMEOUT)
read_timeout = int(settings.GRAPHITE_READ_TIMEOUT)
except:
connect_timeout = 5
read_timeout = 10
use_timeout = (int(connect_timeout), int(read_timeout))
data = []
    if only_host != 'all':
        logger.info('get_cluster_data :: not querying all remote hosts as only_host set to %s' % (
            str(only_host)))
# @added 20220115 - Feature #4376: webapp - update_external_settings
# Now determined based on endpoint_params if passed and handle GET
# and POST
normal_api = True
method = 'GET'
post_data = None
if endpoint_params:
if 'api_endpoint' in list(endpoint_params.keys()):
api_endpoint = endpoint_params['api_endpoint']
normal_api = False
logger.info('get_cluster_data :: overriding api_endpoint with %s' % (
str(api_endpoint)))
if 'method' in list(endpoint_params.keys()):
method = endpoint_params['method']
logger.info('get_cluster_data :: overriding method with %s' % (
str(method)))
if 'post_data' in list(endpoint_params.keys()):
post_data = endpoint_params['post_data']
logger.info('get_cluster_data :: post_data was passed')
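    # Each item in settings.REMOTE_SKYLINE_INSTANCES is expected to be of the
    # form (illustrative values):
    #   ['http://skyline-2:8080', 'username', 'password', 'skyline-2']
    # with item[0] used as the base URL and item[1] and item[2] as the
    # optional auth credentials.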
for item in settings.REMOTE_SKYLINE_INSTANCES:
r = None
user = None
password = None
use_auth = False
# @added 20201127 - Feature #3824: get_cluster_data
# Feature #3820: HORIZON_SHARDS
        # Allow querying only a single host in the cluster so that just the
        # response from a single host in the cluster can be evaluated
if only_host != 'all':
if only_host != str(item[0]):
logger.info('get_cluster_data :: not querying %s as only_host set to %s' % (
str(item[0]), str(only_host)))
continue
logger.info('get_cluster_data :: querying %s as only_host set to %s' % (
str(item[0]), str(only_host)))
try:
user = str(item[1])
password = str(item[2])
use_auth = True
except:
user = None
password = None
logger.info('get_cluster_data :: querying %s for %s on %s' % (
str(item[0]), str(data_required), str(api_endpoint)))
# @added 20220115 - Feature #4376: webapp - update_external_settings
# Now determined based on endpoint_params if passed and handle GET
# and POST
url = '%s/api?%s' % (str(item[0]), api_endpoint)
if not normal_api:
url = '%s/%s' % (str(item[0]), str(api_endpoint))
r = None
try:
# @modified 20220115 - Feature #4376: webapp - update_external_settings
# Now determined based on endpoint_params if passed and handle GET
# and POST
# url = '%s/api?%s' % (str(item[0]), api_endpoint)
if method == 'GET':
if use_auth:
r = requests.get(url, timeout=use_timeout, auth=(user, password))
else:
r = requests.get(url, timeout=use_timeout)
if method == 'POST':
connect_timeout = 5
read_timeout = 10
use_timeout = (int(connect_timeout), int(read_timeout))
if use_auth:
r = requests.post(url, auth=(user, password), json=post_data, timeout=use_timeout, verify=settings.VERIFY_SSL)
else:
r = requests.post(url, json=post_data, timeout=use_timeout, verify=settings.VERIFY_SSL)
except:
logger.error(traceback.format_exc())
logger.error('error :: get_cluster_data :: failed to get %s from %s' % (
api_endpoint, str(item)))
if r:
# @added 20220503 - Feature #4530: namespace.analysed_events
# Added 404 to handle resource not on cluster node
if r.status_code == 404:
logger.warning('get_cluster_data :: %s from %s responded with status code %s and reason %s' % (
api_endpoint, str(item), str(r.status_code), str(r.reason)))
# @modified 20220504 - Feature #4530: namespace.analysed_events
# return data
continue
if r.status_code != 200:
logger.error('error :: get_cluster_data :: %s from %s responded with status code %s and reason %s' % (
api_endpoint, str(item), str(r.status_code), str(r.reason)))
js = None
try:
js = r.json()
except:
logger.error(traceback.format_exc())
logger.error('error :: get_cluster_data :: failed to get json from the response from %s on %s' % (
api_endpoint, str(item)))
remote_data = []
if js:
logger.info('get_cluster_data :: got response for %s from %s' % (
str(data_required), str(item[0])))
try:
remote_data = js['data'][data_required]
except:
logger.error(traceback.format_exc())
logger.error('error :: get_cluster_data :: failed to build remote_data from %s on %s' % (
str(data_required), str(item)))
if remote_data:
# @modified 20210617 - Feature #4144: webapp - stale_metrics API endpoint
# Handle list and dic items
if isinstance(remote_data, list):
logger.info('get_cluster_data :: got %s items %s from %s' % (
str(len(remote_data)), str(data_required), str(item[0])))
data = data + remote_data
if isinstance(remote_data, dict):
logger.info('get_cluster_data :: got %s items %s from %s' % (
str(len(remote_data)), str(data_required), str(item[0])))
data.append(remote_data)
if isinstance(remote_data, bool):
logger.info('get_cluster_data :: got %s %s from %s' % (
str(remote_data), str(data_required), str(item[0])))
data.append(remote_data)
else:
# @added 20220503 - Feature #4530: namespace.analysed_events
# Added 404 to handle resource not on cluster node
if r.status_code == 404:
logger.warning('get_cluster_data :: %s from %s responded with status code %s and reason %s' % (
api_endpoint, str(item), str(r.status_code), str(r.reason)))
# @modified 20220504 - Feature #4530: namespace.analysed_events
# return data
continue
logger.error('error :: get_cluster_data :: failed to get response from %s on %s' % (
api_endpoint, str(item)))
return data
# @added 20201125 - Feature #3850: webapp - yhat_values API endpoint
def get_yhat_values(
        metric, from_timestamp, until_timestamp, include_value, include_mean,
        include_yhat_real_lower, include_anomalous_periods):
    """
    Calculate yhat (expected bounds) values for a metric time series, cache
    the results in Redis and return a tuple of (yhat_dict, anomalous_periods_dict).
    """
timeseries = []
try:
logger.info('get_yhat_values :: for %s from %s until %s' % (
metric, str(from_timestamp), str(until_timestamp)))
timeseries = get_graphite_metric('webapp', metric, from_timestamp, until_timestamp, 'list', 'object')
except:
logger.error(traceback.format_exc())
logger.error('error :: get_yhat_values :: failed to get timeseries data for %s' % (
metric))
return None
yhat_dict = {}
logger.info('get_yhat_values :: %s values in timeseries for %s to calculate yhat values from' % (
str(len(timeseries)), metric))
# @added 20210126 - Task #3958: Handle secondary algorithms in yhat_values
anomalous_periods_dict = {}
if timeseries:
metric_id = 0
if metric:
logger.info('get_yhat_values :: getting db id for metric - %s' % metric)
query = 'select id from metrics WHERE metric=\'%s\'' % metric # nosec
try:
result = mysql_select(skyline_app, query)
metric_id = int(result[0][0])
except:
logger.error('error :: get_yhat_values :: failed to get id from db: %s' % traceback.format_exc())
anomalies_at = []
if metric_id:
logger.info('get_yhat_values :: getting latest anomalies')
query = 'select anomaly_timestamp, anomalous_datapoint, anomaly_end_timestamp from anomalies WHERE metric_id=%s AND anomaly_timestamp >= %s AND anomaly_timestamp <= %s' % (
str(metric_id), str(from_timestamp), str(until_timestamp))
try:
rows = mysql_select(skyline_app, query)
for row in rows:
a_timestamp = int(row[0])
a_value = float(row[1])
try:
a_end_timestamp = int(row[2])
except:
a_end_timestamp = 0
anomalies_at.append([a_timestamp, a_value, a_end_timestamp])
except:
logger.error('error :: get_yhat_values :: failed to get anomalies from db: %s' % traceback.format_exc())
rows = []
timeseries_ranges = []
last_timestamp = None
for index, item in enumerate(timeseries):
if last_timestamp:
t_range = list(range(last_timestamp, int(item[0])))
timeseries_ranges.append([index, t_range, item])
last_timestamp = int(item[0])
t_range = list(range(last_timestamp, (int(item[0]) + 1)))
timeseries_ranges.append([index, t_range, item])
anomalies_index = []
for index, time_range, item in timeseries_ranges:
for a_timestamp, a_value, a_end_timestamp in anomalies_at:
if a_timestamp in time_range:
anomalies_index.append([index, item])
anomalous_period_indices = []
anomalies_indices = [item[0] for item in anomalies_index]
for index, item in enumerate(timeseries):
for idx in anomalies_indices:
anomaly_index_range = list(range((idx - 3), (idx + 5)))
if index in anomaly_index_range:
for i in anomaly_index_range:
anomalous_period_indices.append(i)
anomaly_timestamps_indices = []
anomalies = []
for item in anomalies_index:
anomaly_timestamps_indices.append(item[0])
anomalies.append(item[1])
top = []
bottom = []
left = []
right = []
if timeseries:
try:
array_amin = np.amin([item[1] for item in timeseries])
values = []
# @added 20210126 - Task #3958: Handle secondary algorithms in yhat_values
# last_value = None
# start_anomalous_period = None
# end_anomalous_period = None
# sigma3_array = []
# sigma3_values = []
# extended_values = []
last_breach = 0
breach_for = 10
last_breach_vector = 'positive'
# last_used_extended = False
# last_used_extended_value = None
top = []
bottom = []
left = []
right = []
# @modified 20210126 - Task #3958: Handle secondary algorithms in yhat_values
# for ts, value in timeseries:
# values.append(value)
# va = np.array(values)
# va_mean = va.mean()
# va_std_3 = 3 * va.std()
for index, item in enumerate(timeseries):
ts = item[0]
value = item[1]
values.append(value)
va = np.array(values)
va_mean = va.mean()
va_std_3 = 3 * va.std()
# @added 20210126 - Task #3958: Handle secondary algorithms in yhat_values
anomalous_period = 0
three_sigma_lower = va_mean - va_std_3
three_sigma_upper = va_mean + va_std_3
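                # For example (illustrative), with a running mean of 10.0 and
                # a standard deviation of 2.0, the 3sigma expected range here
                # would be 10.0 - 6.0 = 4.0 to 10.0 + 6.0 = 16.0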
# sigma3_array.append([ts, value, va_mean, [three_sigma_lower, three_sigma_upper]])
# sigma3_values.append([three_sigma_lower, three_sigma_upper])
use_extended = False
drop_expected_range = False
if index not in anomaly_timestamps_indices:
use_extended = True
# if last_used_extended:
# last_used_extended_value = None
else:
drop_expected_range = True
for anomaly_index in anomaly_timestamps_indices:
if index > anomaly_index:
# if index < (anomaly_index + 30):
if index < (anomaly_index + breach_for):
use_extended = False
anomalous_period = 1
break
extended_lower = three_sigma_lower
extended_upper = three_sigma_upper
if use_extended:
if item[1] > three_sigma_upper:
extended_lower = three_sigma_lower
extended_upper = (item[1] + ((item[1] / 100) * 5))
last_breach = index
last_breach_vector = 'positive'
elif item[1] < three_sigma_lower:
extended_lower = (item[1] - ((item[1] / 100) * 5))
extended_upper = three_sigma_upper
last_breach = index
last_breach_vector = 'negative'
elif index < (last_breach + breach_for) and index > last_breach:
if last_breach_vector == 'positive':
extended_value = (item[1] + ((item[1] / 100) * 5))
three_sigma_value = three_sigma_upper
if three_sigma_value > extended_value:
extended_value = (three_sigma_value + ((three_sigma_value / 100) * 5))
extended_lower = three_sigma_lower
extended_upper = extended_value
else:
extended_lower = (item[1] - ((item[1] / 100) * 5))
extended_upper = three_sigma_upper
if drop_expected_range:
use_extended = False
if last_breach_vector == 'positive':
extended_lower = three_sigma_lower - (three_sigma_upper * 0.1)
extended_upper = item[1] - (item[1] * 0.1)
if last_breach_vector == 'negative':
extended_lower = three_sigma_lower - (three_sigma_lower * 0.1)
extended_upper = item[1] + (item[1] * 0.1)
else:
extended_lower = three_sigma_lower
extended_upper = three_sigma_upper
if drop_expected_range:
use_extended = False
if last_breach_vector == 'positive':
extended_lower = three_sigma_lower - (three_sigma_upper * 0.1)
extended_upper = item[1] - (item[1] * 0.1)
if last_breach_vector == 'negative':
extended_lower = three_sigma_lower - (three_sigma_lower * 0.1)
extended_upper = item[1] + (item[1] * 0.1)
else:
extended_lower = three_sigma_lower
extended_upper = three_sigma_upper
if drop_expected_range:
use_extended = False
if last_breach_vector == 'positive':
extended_lower = three_sigma_lower - (three_sigma_upper * 0.1)
extended_upper = item[1] - (item[1] * 0.1)
if last_breach_vector == 'negative':
extended_lower = three_sigma_lower - (three_sigma_lower * 0.1)
extended_upper = item[1] + (item[1] * 0.1)
# extended_values.append([extended_lower, extended_upper])
lower = extended_lower
upper = extended_upper
if index in sorted(list(set(anomalous_period_indices))):
if index in anomalies_indices:
continue
for idx in anomaly_timestamps_indices:
if (index + 3) == idx:
a_top = extended_upper + (extended_upper * 0.1)
top.append(a_top)
a_bottom = extended_lower - (extended_lower * 0.1)
bottom.append(a_bottom)
a_left = item[0]
left.append(a_left)
if (index - 4) == idx:
a_right = item[0]
right.append(a_right)
                # @modified 20201126 - Feature #3850: webapp - yhat_values API endpoint
# Change dict key to int not float
int_ts = int(ts)
yhat_dict[int_ts] = {}
if include_value:
yhat_dict[int_ts]['value'] = value
                if include_mean:
                    yhat_dict[int_ts]['mean'] = va_mean
# @modified 20210201 - Task #3958: Handle secondary algorithms in yhat_values
# yhat_lower = va_mean - va_std_3
yhat_lower = lower
yhat_upper = upper
if include_yhat_real_lower:
                    # @modified 20201202 - Feature #3850: webapp - yhat_values API endpoint
# Set the yhat_real_lower correctly
# if yhat_lower < array_amin and array_amin == 0:
# yhat_dict[int_ts]['yhat_real_lower'] = array_amin
if yhat_lower < 0 and array_amin > -0.0000000001:
yhat_dict[int_ts]['yhat_real_lower'] = 0
else:
yhat_dict[int_ts]['yhat_real_lower'] = yhat_lower
yhat_dict[int_ts]['yhat_lower'] = yhat_lower
                # @modified 20210201 - Task #3958: Handle secondary algorithms in yhat_values
                # yhat_dict[int_ts]['yhat_upper'] = va_mean + va_std_3
                yhat_dict[int_ts]['yhat_upper'] = upper
# @added 20210201 - Task #3958: Handle secondary algorithms in yhat_values
if use_extended:
if yhat_lower != three_sigma_lower:
yhat_dict[int_ts]['3sigma_lower'] = three_sigma_lower
if yhat_upper != three_sigma_upper:
yhat_dict[int_ts]['3sigma_upper'] = three_sigma_upper
if include_anomalous_periods:
yhat_dict[int_ts]['anomalous_period'] = anomalous_period
except:
logger.error(traceback.format_exc())
            logger.error('error :: get_yhat_values :: failed to create yhat_dict for %s' % (
metric))
return None
logger.info('get_yhat_values :: calculated yhat values for %s data points' % str(len(yhat_dict)))
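    # yhat_dict is keyed by the int timestamp, for example (illustrative):
    # {1606194000: {'value': 3.0, 'mean': 2.8, 'yhat_lower': 1.2,
    #               'yhat_upper': 4.4, 'anomalous_period': 0}}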
if yhat_dict:
yhat_dict_cache_key = 'webapp.%s.%s.%s.%s.%s.%s' % (
metric, str(from_timestamp), str(until_timestamp),
str(include_value), str(include_mean),
str(include_yhat_real_lower))
logger.info('get_yhat_values :: saving yhat_dict to Redis key - %s' % yhat_dict_cache_key)
try:
REDIS_CONN.setex(yhat_dict_cache_key, 14400, str(yhat_dict))
logger.info('get_yhat_values :: created Redis key - %s with 14400 TTL' % yhat_dict_cache_key)
except:
logger.error(traceback.format_exc())
logger.error('error :: get_yhat_values :: failed to setex Redis key - %s' % yhat_dict_cache_key)
# @added 20210126 - Task #3958: Handle secondary algorithms in yhat_values
# Add rectangle coordinates that describe anomalous periods
anomalous_periods_dict['rectangles'] = {}
anomalous_periods_dict['rectangles']['top'] = top
anomalous_periods_dict['rectangles']['bottom'] = bottom
anomalous_periods_dict['rectangles']['left'] = left
anomalous_periods_dict['rectangles']['right'] = right
if anomalous_periods_dict:
yhat_anomalous_periods_dict_cache_key = 'webapp.%s.%s.%s.%s.%s.%s.anomalous_periods' % (
metric, str(from_timestamp), str(until_timestamp),
str(include_value), str(include_mean),
str(include_yhat_real_lower))
        logger.info('get_yhat_values :: saving anomalous_periods_dict to Redis key - %s' % yhat_anomalous_periods_dict_cache_key)
try:
            REDIS_CONN.setex(yhat_anomalous_periods_dict_cache_key, 14400, str(anomalous_periods_dict))
logger.info('get_yhat_values :: created Redis key - %s with 14400 TTL' % yhat_anomalous_periods_dict_cache_key)
except:
logger.error(traceback.format_exc())
            logger.error('error :: get_yhat_values :: failed to setex Redis key - %s' % yhat_anomalous_periods_dict_cache_key)
# @modified 20210201 - Task #3958: Handle secondary algorithms in yhat_values
# return yhat_dict
return yhat_dict, anomalous_periods_dict
# @added 20210326 - Feature #3994: Panorama - mirage not anomalous
def get_mirage_not_anomalous_metrics(
metric=None, from_timestamp=None, until_timestamp=None,
anomalies=False):
"""
Determine mirage not anomalous metrics from mirage.panorama.not_anomalous_metrics
and ionosphere.panorama.not_anomalous_metrics
:param metric: base_name
:param from_timestamp: the from_timestamp
:param until_timestamp: the until_timestamp
:param anomalies: whether to report anomalies as well
:type metric: str
:type from_timestamp: int
:type until_timestamp: int
:type anomalies: boolean
:return: (dict, dict)
:rtype: tuple
"""
logger.info(
'get_mirage_not_anomalous_metrics called with - metric: %s, from_timestamp: %s, until_timestamp: %s, anomalies: %s' % (
str(metric), str(from_timestamp), str(until_timestamp),
str(anomalies)))
current_date = datetime.datetime.now().date()
current_date_str = '%s 00:00' % str(current_date)
# from_timestamp_date_str = current_date_str
# until_timestamp_date_str = current_date_str
# until_timestamp = str(int(time.time()))
base_name = None
if 'metric' in request.args:
base_name = request.args.get('metric', None)
if base_name == 'all':
base_name = None
if metric:
base_name = metric
# if not from_timestamp and 'from_timestamp' in request.args:
if not from_timestamp:
from_timestamp = request.args.get('from_timestamp', 'today')
if from_timestamp == 'today':
# from_timestamp_date_str = current_date_str
# @modified 20211021 - handle multiple date formats
try:
new_from_timestamp = time.mktime(datetime.datetime.strptime(current_date_str, '%Y-%m-%d %H:%M').timetuple())
except Exception as err:
trace = traceback.format_exc()
logger.error('%s' % trace)
            fail_msg = 'error :: get_mirage_not_anomalous_metrics :: failed to determine unix timestamp from current_date_str - %s' % str(err)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
from_timestamp = int(new_from_timestamp)
if from_timestamp and from_timestamp != 'today':
if ":" in str(from_timestamp):
# try:
# datetime_object = datetime.datetime.strptime(from_timestamp, '%Y-%m-%d %H:%M')
# except:
# # Handle old format
# datetime_object = datetime.datetime.strptime(from_timestamp, '%Y%m%d %H:%M')
# from_timestamp_date_str = str(datetime_object.date())
# @modified 20211021 - handle multiple date formats
try:
new_from_timestamp = time.mktime(datetime.datetime.strptime(from_timestamp, '%Y-%m-%d %H:%M').timetuple())
except ValueError:
new_from_timestamp = time.mktime(datetime.datetime.strptime(from_timestamp, '%Y%m%d %H:%M').timetuple())
except Exception as err:
trace = traceback.format_exc()
logger.error('%s' % trace)
                fail_msg = 'error :: get_mirage_not_anomalous_metrics :: failed to determine unix timestamp from from_timestamp - %s' % str(err)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
from_timestamp = int(new_from_timestamp)
if not from_timestamp:
# from_timestamp_date_str = current_date_str
# @modified 20211021 - handle multiple date formats
try:
new_from_timestamp = time.mktime(datetime.datetime.strptime(current_date_str, '%Y-%m-%d %H:%M').timetuple())
except Exception as err:
trace = traceback.format_exc()
logger.error('%s' % trace)
            fail_msg = 'error :: get_mirage_not_anomalous_metrics :: failed to get unix timestamp from current_date_str - %s' % str(err)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
from_timestamp = int(new_from_timestamp)
# if not until_timestamp and 'until_timestamp' in request.args:
if not until_timestamp:
until_timestamp = request.args.get('until_timestamp', 'all')
if until_timestamp == 'all':
# until_timestamp_date_str = current_date_str
until_timestamp = int(time.time())
if until_timestamp and until_timestamp != 'all':
if ":" in str(until_timestamp):
# datetime_object = datetime.datetime.strptime(until_timestamp, '%Y-%m-%d %H:%M')
# until_timestamp_date_str = str(datetime_object.date())
# @modified 20211021 - handle multiple date formats
try:
new_until_timestamp = time.mktime(datetime.datetime.strptime(until_timestamp, '%Y-%m-%d %H:%M').timetuple())
except ValueError:
new_until_timestamp = time.mktime(datetime.datetime.strptime(until_timestamp, '%Y%m%d %H:%M').timetuple())
except Exception as err:
trace = traceback.format_exc()
logger.error('%s' % trace)
                fail_msg = 'error :: get_mirage_not_anomalous_metrics :: failed to determine unix timestamp from until_timestamp - %s' % str(err)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
until_timestamp = int(new_until_timestamp)
if not until_timestamp:
# until_timestamp_date_str = current_date_str
until_timestamp = int(time.time())
    get_anomalies = False
    if anomalies:
        get_anomalies = True
if 'anomalies' in request.args:
anomalies_str = request.args.get('anomalies', 'false')
if anomalies_str == 'true':
get_anomalies = True
logger.info(
'get_mirage_not_anomalous_metrics - also determining anomalies for %s' % (
str(base_name)))
logger.info(
'get_mirage_not_anomalous_metrics - base_name: %s, from_timestamp: %s, until_timestamp: %s' % (
str(base_name), str(from_timestamp), str(until_timestamp)))
redis_hash = 'mirage.panorama.not_anomalous_metrics'
mirage_panorama_not_anomalous = {}
try:
REDIS_CONN_DECODED = get_redis_conn_decoded(skyline_app)
mirage_panorama_not_anomalous = REDIS_CONN_DECODED.hgetall(redis_hash)
logger.info('get_mirage_not_anomalous_metrics :: %s entries to check in the %s Redis hash key' % (
str(len(mirage_panorama_not_anomalous)), redis_hash))
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to get Redis hash key %s' % redis_hash)
mirage_panorama_not_anomalous = {}
all_timestamp_float_strings = []
if mirage_panorama_not_anomalous:
all_timestamp_float_strings = list(mirage_panorama_not_anomalous.keys())
logger.info('get_mirage_not_anomalous_metrics :: %s all_timestamp_float_strings to check from the %s Redis hash key' % (
str(len(all_timestamp_float_strings)), redis_hash))
timestamp_floats = []
if all_timestamp_float_strings:
for timestamp_float_string in all_timestamp_float_strings:
if int(float(timestamp_float_string)) >= int(from_timestamp):
if int(float(timestamp_float_string)) <= int(until_timestamp):
timestamp_floats.append(timestamp_float_string)
logger.info('get_mirage_not_anomalous_metrics :: found %s timestamp entries in range from the %s Redis hash key' % (
str(len(timestamp_floats)), redis_hash))
not_anomalous_dict = {}
not_anomalous_count = 0
for timestamp_float_string in timestamp_floats:
try:
timestamp_float_dict = literal_eval(mirage_panorama_not_anomalous[timestamp_float_string])
for i_metric in list(timestamp_float_dict.keys()):
if base_name:
if base_name != i_metric:
continue
logger.info('get_mirage_not_anomalous_metrics :: found entry for %s' % base_name)
try:
metric_dict = not_anomalous_dict[i_metric]
except:
metric_dict = {}
not_anomalous_dict[i_metric] = {}
not_anomalous_dict[i_metric]['from'] = int(from_timestamp)
not_anomalous_dict[i_metric]['until'] = int(until_timestamp)
not_anomalous_dict[i_metric]['timestamps'] = {}
metric_timestamp = timestamp_float_dict[i_metric]['timestamp']
try:
metric_timestamp_dict = not_anomalous_dict[i_metric]['timestamps'][metric_timestamp]
except:
not_anomalous_dict[i_metric]['timestamps'][metric_timestamp] = {}
metric_timestamp_dict = {}
if not metric_timestamp_dict:
not_anomalous_dict[i_metric]['timestamps'][metric_timestamp]['value'] = timestamp_float_dict[i_metric]['value']
not_anomalous_dict[i_metric]['timestamps'][metric_timestamp]['hours_to_resolve'] = timestamp_float_dict[i_metric]['hours_to_resolve']
not_anomalous_count += 1
except:
logger.error(traceback.format_exc())
            logger.error('error :: get_mirage_not_anomalous_metrics :: failed to iterate mirage_panorama_not_anomalous entry')
logger.info(
'get_mirage_not_anomalous_metrics - not_anomalous_count: %s, for base_name: %s' % (
str(not_anomalous_count), str(base_name)))
# @added 20210429 - Feature #3994: Panorama - mirage not anomalous
# A hash is added to the ionosphere.panorama.not_anomalous_metrics for
# every metric that is found to be not anomalous.
redis_hash = 'ionosphere.panorama.not_anomalous_metrics'
ionosphere_panorama_not_anomalous = {}
try:
REDIS_CONN_DECODED = get_redis_conn_decoded(skyline_app)
ionosphere_panorama_not_anomalous = REDIS_CONN_DECODED.hgetall(redis_hash)
logger.info('get_mirage_not_anomalous_metrics :: %s entries to check in the %s Redis hash key' % (
str(len(ionosphere_panorama_not_anomalous)), redis_hash))
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to get Redis hash key %s' % redis_hash)
ionosphere_panorama_not_anomalous = {}
ionosphere_all_timestamp_float_strings = []
if ionosphere_panorama_not_anomalous:
ionosphere_all_timestamp_float_strings = list(ionosphere_panorama_not_anomalous.keys())
ionosphere_timestamp_floats = []
    if ionosphere_all_timestamp_float_strings:
for timestamp_float_string in ionosphere_all_timestamp_float_strings:
if int(float(timestamp_float_string)) >= int(from_timestamp):
if int(float(timestamp_float_string)) <= int(until_timestamp):
ionosphere_timestamp_floats.append(timestamp_float_string)
for timestamp_float_string in ionosphere_timestamp_floats:
try:
timestamp_float_dict = literal_eval(ionosphere_panorama_not_anomalous[timestamp_float_string])
for i_metric in list(timestamp_float_dict.keys()):
if base_name:
if base_name != i_metric:
continue
try:
metric_dict = not_anomalous_dict[i_metric]
except:
metric_dict = {}
not_anomalous_dict[i_metric] = {}
not_anomalous_dict[i_metric]['from'] = int(from_timestamp)
not_anomalous_dict[i_metric]['until'] = int(until_timestamp)
not_anomalous_dict[i_metric]['timestamps'] = {}
del metric_dict
metric_timestamp = timestamp_float_dict[i_metric]['timestamp']
try:
metric_timestamp_dict = not_anomalous_dict[i_metric]['timestamps'][metric_timestamp]
except:
not_anomalous_dict[i_metric]['timestamps'][metric_timestamp] = {}
metric_timestamp_dict = {}
if not metric_timestamp_dict:
not_anomalous_dict[i_metric]['timestamps'][metric_timestamp]['value'] = timestamp_float_dict[i_metric]['value']
not_anomalous_dict[i_metric]['timestamps'][metric_timestamp]['hours_to_resolve'] = timestamp_float_dict[i_metric]['hours_to_resolve']
not_anomalous_count += 1
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to iterate over ionosphere_panorama_not_anomalous entry')
logger.info(
'get_mirage_not_anomalous_metrics - not_anomalous_count: %s (with ionosphere), for base_name: %s' % (
str(not_anomalous_count), str(base_name)))
anomalies_dict = {}
if get_anomalies:
for i_metric in list(not_anomalous_dict.keys()):
metric_id = None
query = 'SELECT id FROM metrics WHERE metric=\'%s\'' % i_metric
try:
results = mysql_select(skyline_app, query)
for item in results:
metric_id = int(item[0])
break
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: querying MySQL - %s' % query)
query_start_str = 'SELECT anomaly_timestamp,anomalous_datapoint,anomaly_end_timestamp,full_duration FROM anomalies'
if metric_id:
query = '%s WHERE metric_id=%s AND anomaly_timestamp > %s AND anomaly_timestamp < %s' % (
query_start_str, metric_id, str(from_timestamp), str(until_timestamp))
else:
query = '%s WHERE anomaly_timestamp > %s AND anomaly_timestamp < %s' % (
query_start_str, str(from_timestamp), str(until_timestamp))
anomalies = []
try:
results = mysql_select(skyline_app, query)
for item in results:
anomalies.append([i_metric, int(item[0]), float(item[1]), float(item[2]), round(int(item[3]) / 3600)])
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: querying MySQL - %s' % query)
anomalies_dict[i_metric] = {}
anomalies_dict[i_metric]['from'] = int(from_timestamp)
anomalies_dict[i_metric]['until'] = int(until_timestamp)
anomalies_dict[i_metric]['timestamps'] = {}
if anomalies:
for a_metric, timestamp, value, anomaly_end_timestamp, hours_to_resolve in anomalies:
anomalies_dict[i_metric]['timestamps'][timestamp] = {}
anomalies_dict[i_metric]['timestamps'][timestamp]['value'] = value
anomalies_dict[i_metric]['timestamps'][timestamp]['hours_to_resolve'] = hours_to_resolve
anomalies_dict[i_metric]['timestamps'][timestamp]['end_timestamp'] = anomaly_end_timestamp
# @added 20210328 - Feature #3994: Panorama - mirage not anomalous
# Save key to use in not_anomalous_metric
not_anomalous_dict_key = 'panorama.not_anomalous_dict.%s.%s' % (
str(from_timestamp), str(until_timestamp))
not_anomalous_dict_key_ttl = 600
if base_name:
not_anomalous_dict_key = 'panorama.not_anomalous_dict.%s.%s.%s' % (
str(from_timestamp), str(until_timestamp), base_name)
not_anomalous_dict_key_ttl = 600
try:
REDIS_CONN.setex(not_anomalous_dict_key, not_anomalous_dict_key_ttl, str(not_anomalous_dict))
logger.info('get_mirage_not_anomalous_metrics :: created Redis key - %s with %s TTL' % (
not_anomalous_dict_key, str(not_anomalous_dict_key_ttl)))
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to create Redis key - %s with %s TTL' % (
not_anomalous_dict_key, str(not_anomalous_dict_key_ttl)))
if not base_name:
recent_not_anomalous_dict_key = 'panorama.not_anomalous_dict.recent'
recent_not_anomalous_dict_key_ttl = 180
try:
REDIS_CONN.setex(recent_not_anomalous_dict_key, recent_not_anomalous_dict_key_ttl, str(not_anomalous_dict))
logger.info('get_mirage_not_anomalous_metrics :: created Redis key - %s with %s TTL' % (
recent_not_anomalous_dict_key, str(recent_not_anomalous_dict_key_ttl)))
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to create Redis key - %s with %s TTL' % (
recent_not_anomalous_dict_key, str(recent_not_anomalous_dict_key_ttl)))
anomalies_dict_key = 'panorama.anomalies_dict.%s.%s' % (
str(from_timestamp), str(until_timestamp))
anomalies_dict_key_ttl = 600
if base_name:
anomalies_dict_key = 'panorama.anomalies_dict.%s.%s.%s' % (
str(from_timestamp), str(until_timestamp), base_name)
anomalies_dict_key_ttl = 600
try:
REDIS_CONN.setex(anomalies_dict_key, anomalies_dict_key_ttl, str(anomalies_dict))
logger.info('get_mirage_not_anomalous_metrics :: created Redis key - %s with %s TTL' % (
anomalies_dict_key, str(anomalies_dict_key_ttl)))
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to create Redis key - %s with %s TTL' % (
anomalies_dict_key, str(anomalies_dict_key_ttl)))
if not base_name:
recent_anomalies_dict_key = 'panorama.anomalies_dict.recent'
recent_anomalies_dict_key_ttl = 180
try:
REDIS_CONN.setex(recent_anomalies_dict_key, recent_anomalies_dict_key_ttl, str(anomalies_dict))
logger.info('get_mirage_not_anomalous_metrics :: created Redis key - %s with %s TTL' % (
recent_anomalies_dict_key, str(recent_anomalies_dict_key_ttl)))
except:
logger.error(traceback.format_exc())
logger.error('error :: get_mirage_not_anomalous_metrics :: failed to create Redis key - %s with %s TTL' % (
recent_anomalies_dict_key, str(recent_anomalies_dict_key_ttl)))
return not_anomalous_dict, anomalies_dict
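# A minimal sketch of the two dictionaries returned above, reconstructed
# from the assignments in this function - the metric name and values are
# illustrative only:
#
# not_anomalous_dict = {
#     'server1.cpu.user': {
#         'from': 1617235200,
#         'until': 1617321600,
#         'timestamps': {
#             1617238800: {'value': 3.2, 'hours_to_resolve': 24},
#         },
#     },
# }
#
# anomalies_dict entries carry the same 'from', 'until' and 'timestamps'
# keys, with each timestamp dict also holding an 'end_timestamp' value.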
# @added 20210328 - Feature #3994: Panorama - mirage not anomalous
[docs]def plot_not_anomalous_metric(not_anomalous_dict, anomalies_dict, plot_type):
"""
Plot the metric not anomalous or anomalies graph and return the file path
:param not_anomalous_dict: the dictionary of not anomalous events for the
metric
:param anomalies_dict: the dictionary of anomalous events for the
metric
:type not_anomalous_dict: dict
:type anomalies_dict: dict
:type plot_type: str ('not_anomalous' or 'anomalies')
:return: path and filename
:rtype: str
"""
fail_msg = None
trace = None
metric = None
from_timestamp = None
until_timestamp = None
try:
metric = list(not_anomalous_dict.keys())[0]
from_timestamp = not_anomalous_dict[metric]['from']
until_timestamp = not_anomalous_dict[metric]['until']
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: plot_not_anomalous_metric :: failed to get details for plot from not_anomalous_dict'
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
if not metric or not from_timestamp or not until_timestamp or not plot_type:
fail_msg = 'error :: plot_not_anomalous_metric :: failed to get details for plot'
logger.error('%s' % fail_msg)
raise ValueError(fail_msg)  # to webapp to return in the UI
try:
timeseries = get_graphite_metric(
skyline_app, metric, from_timestamp, until_timestamp, 'list',
'object')
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: plot_not_anomalous_metric :: failed to get timeseries from Graphite for %s' % metric
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
if plot_type == 'not_anomalous':
data_dict = not_anomalous_dict
elif plot_type == 'anomalies':
data_dict = anomalies_dict
else:
fail_msg = 'error :: plot_not_anomalous_metric :: unknown plot_type - %s' % str(plot_type)
logger.error('%s' % fail_msg)
raise ValueError(fail_msg)  # to webapp to return in the UI
plot_timestamps = list(data_dict[metric]['timestamps'].keys())
logger.info('plot_not_anomalous_metric :: building %s timeseries' % plot_type)
plot_timeseries = []
last_timestamp = None
a_timestamps_done = []
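# Align each recorded event timestamp with the Graphite timeseries: a data
# point is flagged with anomaly = 1 when one or more event timestamps fall
# between the previous point and the current one, and a_timestamps_done
# ensures each event is only counted once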
for timestamp, value in timeseries:
anomaly = 0
if not last_timestamp:
last_timestamp = int(timestamp)
plot_timeseries.append([int(timestamp), anomaly])
continue
for a_timestamp in plot_timestamps:
if a_timestamp < last_timestamp:
continue
if a_timestamp > int(timestamp):
continue
if a_timestamp in a_timestamps_done:
continue
# Direct comparison rather than membership in
# list(range(last_timestamp, int(timestamp))), which built a new list
# on every check
if last_timestamp <= a_timestamp < int(timestamp):
anomaly = 1
a_timestamps_done.append(a_timestamp)
plot_timeseries.append([int(timestamp), anomaly])
# Advance the window so each event is tested against the span since
# the previous data point
last_timestamp = int(timestamp)
logger.info('plot_not_anomalous_metric :: created %s timeseries' % plot_type)
logger.info('plot_not_anomalous_metric :: creating timeseries dataframe')
try:
df = pd.DataFrame(timeseries, columns=['date', 'value'])
df['date'] = pd.to_datetime(df['date'], unit='s')
datetime_index = pd.DatetimeIndex(df['date'].values)
df = df.set_index(datetime_index)
df.drop('date', axis=1, inplace=True)
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: plot_not_anomalous_metric :: failed to create timeseries dataframe to plot %s' % metric
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
logger.info('plot_not_anomalous_metric :: creating %s dataframe' % plot_type)
try:
plot_df = pd.DataFrame(plot_timeseries, columns=['date', 'value'])
plot_df['date'] = pd.to_datetime(plot_df['date'], unit='s')
datetime_index = pd.DatetimeIndex(plot_df['date'].values)
plot_df = plot_df.set_index(datetime_index)
plot_df.drop('date', axis=1, inplace=True)
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: plot_not_anomalous_metric :: failed to create %s dataframe to plot %s' % (plot_type, metric)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
try:
logger.info('plot_not_anomalous_metric :: loading plot from adtk.visualization')
from adtk.visualization import plot
sane_metricname = filesafe_metricname(str(metric))
save_to_file = '%s/panorama/not_anomalous/%s/%s.%s.%s.%s.png' % (
settings.SKYLINE_TMP_DIR, sane_metricname, plot_type,
str(from_timestamp),
str(until_timestamp), sane_metricname)
save_to_path = path.dirname(save_to_file)
if plot_type == 'not_anomalous':
title = 'Not anomalous analysis\n%s' % metric
if plot_type == 'anomalies':
title = 'Anomalies\n%s' % metric
if not path.exists(save_to_path):
try:
mkdir_p(save_to_path)
except Exception as e:
logger.error('error :: plot_not_anomalous_metric :: failed to create dir - %s - %s' % (
save_to_path, e))
if path.exists(save_to_path):
try:
logger.info('plot_not_anomalous_metric :: plotting')
if plot_type == 'not_anomalous':
plot(
df, anomaly=plot_df, anomaly_color='green', title=title,
ts_markersize=1, anomaly_alpha=0.4, legend=False,
save_to_file=save_to_file)
if plot_type == 'anomalies':
plot(
df, anomaly=plot_df, anomaly_color='red', title=title,
ts_markersize=1, anomaly_alpha=1, legend=False,
save_to_file=save_to_file)
logger.debug('debug :: plot_not_anomalous_metric :: plot saved to - %s' % (
save_to_file))
except Exception as e:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: plot_not_anomalous_metric :: failed to plot - %s: %s' % (str(metric), e)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: plot_not_anomalous_metric :: failed plotting %s for %s' % (str(metric), plot_type)
logger.error('%s' % fail_msg)
raise # to webapp to return in the UI
if not path.isfile(save_to_file):
fail_msg = 'error :: plot_not_anomalous_metric :: plotting %s for %s to %s failed' % (
str(metric), plot_type, save_to_file)
logger.error('%s' % fail_msg)
raise ValueError(fail_msg)  # to webapp to return in the UI
else:
try:
REDIS_CONN.hset('panorama.not_anomalous_plots', time.time(), save_to_file)
logger.info('plot_not_anomalous_metric :: set Redis hash in panorama.not_anomalous_plots for clean up')
except:
logger.error(traceback.format_exc())
logger.error('error :: plot_not_anomalous_metric :: failed to set save_to_file in Redis hash - panorama.not_anomalous_plots')
return save_to_file
# @added 20210617 - Feature #4144: webapp - stale_metrics API endpoint
# Feature #4076: CUSTOM_STALE_PERIOD
# Branch #1444: thunder
[docs]def namespace_stale_metrics(namespace, cluster_data, exclude_sparsely_populated):
"""
Plot the metric not anomalous or anomalies graph and return the file path
:param not_anomalous_dict: the dictionary of not anomalous events for the
metric
:param anomalies_dict: the dictionary of anomalous events for the
metric
:type not_anomalous_dict: dict
:type anomalies_dict: dict
:type plot_type: str ('not_anomalous' or 'anomalies')
:return: path and filename
:rtype: str
"""
fail_msg = None
trace = None
namespaces_namespace_stale_metrics_dict = {}
namespaces_namespace_stale_metrics_dict['stale_metrics'] = {}
unique_base_names = []
try:
REDIS_CONN_DECODED = get_redis_conn_decoded(skyline_app)
unique_base_names = list(REDIS_CONN_DECODED.smembers('aet.analyzer.unique_base_names'))
logger.info('got %s unique_base_names from the aet.analyzer.unique_base_names Redis set' % (
str(len(unique_base_names))))
except Exception as e:
fail_msg = 'error :: Webapp error with api?stale_metrics - %s' % e
logger.error(fail_msg)
raise
now = int(time.time())
namespace_stale_metrics_dict = {}
namespace_recovered_metrics_dict = {}
try:
namespace_stale_metrics_dict, namespace_recovered_metrics_dict = thunder_stale_metrics(skyline_app, log=True)
except Exception as e:
fail_msg = 'error :: Webapp error with api?stale_metrics - %s' % e
logger.error(fail_msg)
raise
logger.info('%s namespaces checked for stale metrics discovered with thunder_stale_metrics' % (
str(len(namespace_stale_metrics_dict))))
remote_stale_metrics_dicts = []
if settings.REMOTE_SKYLINE_INSTANCES and cluster_data:
exclude_sparsely_populated_str = 'false'
if exclude_sparsely_populated:
exclude_sparsely_populated_str = 'true'
remote_namespaces_namespace_stale_metrics_dicts = []
stale_metrics_uri = 'stale_metrics=true&namespace=%s&exclude_sparsely_populated=%s' % (
str(namespace), str(exclude_sparsely_populated_str))
try:
remote_namespaces_namespace_stale_metrics_dicts = get_cluster_data(stale_metrics_uri, 'stale_metrics')
except:
logger.error(traceback.format_exc())
logger.error('error :: Webapp could not get remote_namespaces_namespace_stale_metrics_dict from the remote Skyline instances')
if remote_namespaces_namespace_stale_metrics_dicts:
logger.info('got %s remote namespace_stale_metrics_dicts instances from the remote Skyline instances' % str(len(remote_namespaces_namespace_stale_metrics_dicts)))
remote_stale_metrics_dicts = remote_namespaces_namespace_stale_metrics_dicts
stale_metrics_count = 0
total_metrics_count = len(unique_base_names)
if namespace == 'all':
namespaces_namespace_stale_metrics_dict['stale_metrics']['namespace'] = 'all'
if remote_stale_metrics_dicts:
for remote_stale_metrics_dict in remote_stale_metrics_dicts:
total_metrics_count = total_metrics_count + remote_stale_metrics_dict['total_metrics_count']
namespaces_namespace_stale_metrics_dict['stale_metrics']['total_metrics_count'] = total_metrics_count
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'] = {}
if namespace_stale_metrics_dict:
for parent_namespace in list(namespace_stale_metrics_dict.keys()):
for base_name in list(namespace_stale_metrics_dict[parent_namespace]['metrics'].keys()):
stale_metrics_count += 1
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name] = {}
last_timestamp = namespace_stale_metrics_dict[parent_namespace]['metrics'][base_name]
stale_for = now - int(float(last_timestamp))
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['last_timestamp'] = last_timestamp
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['stale_for'] = stale_for
if remote_stale_metrics_dicts:
for remote_stale_metrics_dict in remote_stale_metrics_dicts:
for base_name in list(remote_stale_metrics_dict['stale_metrics'].keys()):
stale_metrics_count += 1
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name] = {}
last_timestamp = remote_stale_metrics_dict['stale_metrics'][base_name]['last_timestamp']
stale_for = remote_stale_metrics_dict['stale_metrics'][base_name]['stale_for']
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['last_timestamp'] = last_timestamp
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['stale_for'] = stale_for
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics_count'] = stale_metrics_count
if namespace_stale_metrics_dict and namespace != 'all':
namespaces_namespace_stale_metrics_dict['stale_metrics']['namespace'] = namespace
total_metrics_count = len([base_name for base_name in unique_base_names if base_name.startswith(namespace)])
if remote_stale_metrics_dicts:
for remote_stale_metrics_dict in remote_stale_metrics_dicts:
total_metrics_count = total_metrics_count + remote_stale_metrics_dict['total_metrics_count']
namespaces_namespace_stale_metrics_dict['stale_metrics']['total_metrics_count'] = total_metrics_count
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'] = {}
top_level_namespace = namespace.split('.')[0]
if namespace_stale_metrics_dict:
for parent_namespace in list(namespace_stale_metrics_dict.keys()):
if parent_namespace != top_level_namespace:
continue
for base_name in list(namespace_stale_metrics_dict[parent_namespace]['metrics'].keys()):
if not base_name.startswith(namespace):
continue
stale_metrics_count += 1
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name] = {}
last_timestamp = namespace_stale_metrics_dict[parent_namespace]['metrics'][base_name]
stale_for = now - int(float(last_timestamp))
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['last_timestamp'] = last_timestamp
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['stale_for'] = stale_for
if remote_stale_metrics_dicts:
for remote_stale_metrics_dict in remote_stale_metrics_dicts:
for base_name in list(remote_stale_metrics_dict['stale_metrics'].keys()):
stale_metrics_count += 1
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name] = {}
last_timestamp = remote_stale_metrics_dict['stale_metrics'][base_name]['last_timestamp']
stale_for = remote_stale_metrics_dict['stale_metrics'][base_name]['stale_for']
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['last_timestamp'] = last_timestamp
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics'][base_name]['stale_for'] = stale_for
namespaces_namespace_stale_metrics_dict['stale_metrics']['stale_metrics_count'] = stale_metrics_count
return namespaces_namespace_stale_metrics_dict
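# A hypothetical invocation - the namespace is an assumption - returning
# the stale metrics for a single namespace without querying the remote
# Skyline instances:
#
# stale_metrics_dict = namespace_stale_metrics('server1', False, True)
# for base_name, data in stale_metrics_dict['stale_metrics']['stale_metrics'].items():
#     print('%s stale for %s seconds' % (base_name, str(data['stale_for'])))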