# Source code for webapp.panorama_plot_anomalies

"""
Plot anomalies for a metric
"""
import logging
import os
import traceback
from time import time

import pandas as pd
from adtk.visualization import plot

import settings

from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
from functions.database.queries.query_anomalies import get_anomalies
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.graphite.get_metrics_timeseries import get_metrics_timeseries
# @added 20220519 - Feature #4326: webapp - panorama_plot_anomalies
# Added matches
from functions.database.queries.get_matches import get_matches

# @added 20220801 - Task #2732: Prometheus to Skyline
#                   Branch #4300: prometheus
from functions.metrics.get_base_name_from_labelled_metrics_name import get_base_name_from_labelled_metrics_name
from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
from functions.victoriametrics.get_victoriametrics_metric import get_victoriametrics_metric

# Application identity and logging set-up for the webapp Skyline app.
skyline_app = 'webapp'
skyline_app_logger = skyline_app + 'Log'
logger = logging.getLogger(skyline_app_logger)
# NOTE: skyline_app_logfile and logfile hold the same path; both names are
# kept as either may be referenced elsewhere.
skyline_app_logfile = '{0}/{1}.log'.format(settings.LOG_PATH, skyline_app)
logfile = '{0}/{1}.log'.format(settings.LOG_PATH, skyline_app)


# @added 20211125 - Feature #4326: webapp - panorama_plot_anomalies
# @modified 20220519 - Feature #4326: webapp - panorama_plot_anomalies
# Added matches
# def panorama_plot_anomalies(base_name, from_timestamp=None, until_timestamp=None):
def panorama_plot_anomalies(
        base_name, from_timestamp=None, until_timestamp=None, matches=False):
    """
    Create a plot of the metric with its anomalies (and optionally its
    matches) and return the anomalies data along with the plot file paths.

    :param base_name: the name of the metric
    :param from_timestamp: the from timestamp (defaults to 7 days before the
        until_timestamp or first anomaly when not passed)
    :param until_timestamp: the until timestamp (defaults to now)
    :param matches: whether to plot matches as well
    :type base_name: str
    :type from_timestamp: int
    :type until_timestamp: int
    :type matches: bool
    :return: (anomalies_dict, save_to_file, matches_dict,
        matches_save_to_file, labelled_metric_name, timeseries_dict)
    :rtype: tuple
    """
    function_str = 'panorama_plot_anomalies'
    logger.info('%s - base_name: %s, from_timestamp: %s, until_timestamp: %s' % (
        function_str, str(base_name), str(from_timestamp), str(until_timestamp)))

    if not until_timestamp:
        until_timestamp = int(time())

    # @added 20221115 - Feature #4326: webapp - panorama_plot_anomalies
    # Added timeseries_dict - keyed on aligned timestamp, holds the value and
    # (later) per-timestamp anomaly/match flags
    timeseries_dict = {}

    # @added 20220801 - Task #2732: Prometheus to Skyline
    #                   Branch #4300: prometheus
    # Determine whether this is a Graphite or a labelled (victoriametrics)
    # metric and resolve the labelled_metrics.<id> name where appropriate.
    data_source = 'graphite'
    metric_id = 0
    labelled_metric_name = None
    use_base_name = str(base_name)
    if '{' in base_name and '}' in base_name and '_tenant_id="' in base_name:
        # A labelled metric base_name, e.g. metric{_tenant_id="...",...}
        metric_id = 0
        try:
            metric_id = get_metric_id_from_base_name(skyline_app, base_name)
        except Exception as err:
            logger.error('error :: panorama_plot_anomalies :: get_metric_id_from_base_name failed with base_name: %s - %s' % (str(base_name), err))
        if metric_id:
            labelled_metric_name = 'labelled_metrics.%s' % str(metric_id)
    if base_name.startswith('labelled_metrics.'):
        labelled_metric_name = str(base_name)
        try:
            metric_name = get_base_name_from_labelled_metrics_name(skyline_app, base_name)
            if metric_name:
                base_name = str(metric_name)
                data_source = 'victoriametrics'
        except Exception as err:
            logger.error('error :: panorama_plot_anomalies :: get_base_name_from_labelled_metrics_name failed for %s - %s' % (
                base_name, err))
    if labelled_metric_name:
        # Use the labelled_metrics.<id> name in file paths (the full labelled
        # base_name is not filesystem safe)
        use_base_name = str(labelled_metric_name)
        data_source = 'victoriametrics'

    save_to_file = '%s/panorama_anomalies_plot.%s.%s.%s.png' % (
        settings.SKYLINE_TMP_DIR, use_base_name, str(from_timestamp),
        str(until_timestamp))
    matches_save_to_file = None
    if matches:
        matches_save_to_file = '%s/panorama_matches_plot.%s.%s.%s.png' % (
            settings.SKYLINE_TMP_DIR, use_base_name, str(from_timestamp),
            str(until_timestamp))

    if not metric_id:
        try:
            metric_id = get_metric_id_from_base_name(skyline_app, base_name)
            logger.info('%s - %s with metric id:%s' % (
                function_str, str(base_name), str(metric_id)))
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: failed to determine metric id for %s - %s' % (
                function_str, base_name, err))
            raise

    try:
        anomalies_dict = get_anomalies(skyline_app, metric_id, params={'latest': False})
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to determine anomalies for %s - %s' % (
            function_str, base_name, err))
        raise

    # Discard anomalies outside the requested time range
    if from_timestamp and anomalies_dict:
        for anomaly_id in list(anomalies_dict.keys()):
            if anomalies_dict[anomaly_id]['anomaly_timestamp'] < from_timestamp:
                del anomalies_dict[anomaly_id]
    if until_timestamp and anomalies_dict:
        for anomaly_id in list(anomalies_dict.keys()):
            if anomalies_dict[anomaly_id]['anomaly_timestamp'] > until_timestamp:
                del anomalies_dict[anomaly_id]

    # @added 20220519 - Feature #4326: webapp - panorama_plot_anomalies
    # Added matches
    matches_dict = {}
    if matches:
        try:
            matches_dict = get_matches(skyline_app, metric_id, from_timestamp, until_timestamp)
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: get_matches failed to determine matches for %s - %s' % (
                function_str, base_name, err))
            raise
        if not matches_dict:
            matches_save_to_file = None

    # Serve cached plot images if they already exist for this range.
    # NOTE: timeseries_dict is returned empty on these cache hits.
    if not matches:
        if os.path.isfile(save_to_file):
            # @modified 20220519 - Feature #4326: webapp - panorama_plot_anomalies
            # Added matches
            return anomalies_dict, save_to_file, matches_dict, matches_save_to_file, labelled_metric_name, timeseries_dict
    else:
        if matches_save_to_file:
            if os.path.isfile(save_to_file) and os.path.isfile(matches_save_to_file):
                return anomalies_dict, save_to_file, matches_dict, matches_save_to_file, labelled_metric_name, timeseries_dict
        else:
            if os.path.isfile(save_to_file):
                return anomalies_dict, save_to_file, matches_dict, matches_save_to_file, labelled_metric_name, timeseries_dict

    # Derive a from_timestamp when one was not passed: 7 days before the
    # earliest anomaly (last key in anomalies_dict) or before until_timestamp
    if not from_timestamp and anomalies_dict:
        first_anomaly_id = list(anomalies_dict.keys())[-1]
        first_anomaly_timestamp = anomalies_dict[first_anomaly_id]['anomaly_timestamp']
        from_timestamp = first_anomaly_timestamp - (86400 * 7)
        logger.info('%s :: the from_timestamp was not passed, calculated from the anomalies_dict as %s' % (
            function_str, str(from_timestamp)))
    if not from_timestamp and not anomalies_dict:
        logger.info('%s :: the from_timestamp was not passed and no anomalies found for %s' % (
            function_str, base_name))
        from_timestamp = until_timestamp - (86400 * 7)

    # Fetch the timeseries from the appropriate data source
    metrics_functions = {}
    metrics_functions[base_name] = {}
    if data_source == 'graphite':
        metrics_functions[base_name]['functions'] = None
        try:
            metrics_timeseries = get_metrics_timeseries(skyline_app, metrics_functions, from_timestamp, until_timestamp, log=False)
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: get_metrics_timeseries failed - %s' % (
                function_str, err))
            raise
    if data_source == 'victoriametrics':
        logger.info('getting victoriametrics data for %s - from_timestamp - %s, until_timestamp - %s' % (base_name, str(from_timestamp), str(until_timestamp)))
        metrics_timeseries = {}
        metrics_timeseries[base_name] = {}
        timeseries = []
        try:
            # get_victoriametrics_metric automatically applies the rate and
            # step required no downsampling or nonNegativeDerivative is
            # required.
            timeseries = get_victoriametrics_metric(
                skyline_app, base_name, from_timestamp, until_timestamp,
                'list', 'object')
        except Exception as err:
            logger.error('error :: %s ::get_victoriametrics_metric failed - %s' % (
                function_str, err))
            raise
        if timeseries:
            metrics_timeseries[base_name]['timeseries'] = timeseries

    try:
        timeseries = metrics_timeseries[base_name]['timeseries']
        # Truncate the first and last timestamp, just in case they are not
        # filled buckets
        timeseries = timeseries[1:-1]
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to get timeseries for %s - %s' % (
            function_str, base_name, err))
        raise

    unaligned_anomaly_timestamps = []
    for anomaly_id in list(anomalies_dict.keys()):
        unaligned_anomaly_timestamps.append(anomalies_dict[anomaly_id]['anomaly_timestamp'])

    # Align anomalies to timeseries resolution
    resolution = determine_data_frequency(skyline_app, timeseries, False)
    anomaly_timestamps = []
    for ts in unaligned_anomaly_timestamps:
        anomaly_timestamps.append(int(int(ts) // resolution * resolution))

    # Align victoriametrics data timestamps to the resolution as well so the
    # anomaly timestamps match timeseries timestamps
    if data_source == 'victoriametrics':
        aligned_timeseries = []
        for item in timeseries:
            aligned_timeseries.append([int(int(item[0]) // resolution * resolution), item[1]])
        timeseries = aligned_timeseries

    # @added 20221115 - Feature #4326: webapp - panorama_plot_anomalies
    # Added timeseries_dict
    for ts, v in timeseries:
        timeseries_dict[int(ts)] = {'value': v}

    try:
        df = pd.DataFrame(timeseries, columns=['date', 'value'])
        df['date'] = pd.to_datetime(df['date'], unit='s')
        datetime_index = pd.DatetimeIndex(df['date'].values)
        df = df.set_index(datetime_index)
        df.drop('date', axis=1, inplace=True)
        # Build a 0/1 anomaly series aligned with the timeseries and record
        # the flag per timestamp in timeseries_dict
        anomalies_data = []
        for item in timeseries:
            if int(item[0]) in anomaly_timestamps:
                anomaly_value = 1
            else:
                anomaly_value = 0
            anomalies_data.append(anomaly_value)
            timeseries_dict[int(item[0])]['anomaly'] = anomaly_value
        df['anomalies'] = anomalies_data
        title = '%s\n%s anomalies' % (base_name, str(len(anomaly_timestamps)))
        if labelled_metric_name:
            title = '%s\n%s anomalies' % (labelled_metric_name, str(len(anomaly_timestamps)))
        plot(df['value'], anomaly=df['anomalies'], anomaly_markersize=5, anomaly_tag='marker', anomaly_color='red', title=title, save_to_file=save_to_file)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to plot anomalies for %s - %s' % (
            function_str, base_name, err))
        raise

    # @added 20220519 - Feature #4326: webapp - panorama_plot_anomalies
    # Added matches - produce a second plot with the matches marked
    if matches and matches_dict:
        unaligned_matches_timestamps = []
        try:
            for match_id in list(matches_dict.keys()):
                try:
                    unaligned_matches_timestamps.append(matches_dict[match_id]['metric_timestamp'])
                except KeyError:
                    # ionosphere_layers_matched use anomaly_timestamp not metric_timestamp
                    unaligned_matches_timestamps.append(matches_dict[match_id]['anomaly_timestamp'])
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: failed on matches_dict: %s' % (
                function_str, str(matches_dict)))
            raise
        matches_timestamps = []
        for ts in unaligned_matches_timestamps:
            matches_timestamps.append(int(int(ts) // resolution * resolution))
        try:
            df = pd.DataFrame(timeseries, columns=['date', 'value'])
            df['date'] = pd.to_datetime(df['date'], unit='s')
            datetime_index = pd.DatetimeIndex(df['date'].values)
            df = df.set_index(datetime_index)
            df.drop('date', axis=1, inplace=True)
            matches_data = []
            for item in timeseries:
                if int(item[0]) in matches_timestamps:
                    matches_value = 1
                else:
                    matches_value = 0
                matches_data.append(matches_value)
                timeseries_dict[int(item[0])]['match'] = matches_value
            df['matches'] = matches_data
            title = '%s\n%s matches' % (base_name, str(len(matches_timestamps)))
            if labelled_metric_name:
                title = '%s\n%s matches' % (labelled_metric_name, str(len(matches_timestamps)))
            plot(df['value'], anomaly=df['matches'], anomaly_markersize=5, anomaly_tag='marker', anomaly_color='green', title=title, save_to_file=matches_save_to_file)
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: failed to plot matches for %s - %s' % (
                function_str, base_name, err))
            raise

    if not os.path.isfile(save_to_file):
        # @modified 20221115 - Feature #4326: webapp - panorama_plot_anomalies
        # Added timeseries_dict
        return anomalies_dict, None, matches_dict, matches_save_to_file, labelled_metric_name, timeseries_dict

    return anomalies_dict, save_to_file, matches_dict, matches_save_to_file, labelled_metric_name, timeseries_dict