Source code for webapp.get_snab_results

import logging
import os
import time
import datetime
import traceback

# @added 20230724 - Feature #5010: snab - save training_data
from ast import literal_eval

import settings
import skyline_version

# @modified 20230724 - Feature #5010: snab - save training_data
# Added Table and MetaData
from sqlalchemy import select, Table, MetaData

from database import get_engine, snab_table_meta, anomalies_table_meta
from matched_or_regexed_in_list import matched_or_regexed_in_list
# @modified 20241026 - Task #5521: webapp - update to bootstrap-5.3.3
#                      Feature #5008: webapp - snab report page
# Swap to use the database get_all_db_metric_names method rather than the Redis
# based get_metric_ids_and_base_names method which only deals active metric ids
#from functions.metrics.get_metric_ids_and_base_names import get_metric_ids_and_base_names
from functions.plots.plot_anomalies import plot_anomalies

# @modified 20230724 - Feature #5010: snab - save training_data
# Added get_redis_conn_decoded
from skyline_functions import get_graphite_metric, get_redis_conn_decoded

from create_matplotlib_graph import create_matplotlib_graph

# @added 20230713 - Feature #4994: custom_algorithm - mirages
#                   Feature #4988: Allow snab to return and save results
#                   Task #2732: Prometheus to Skyline
#                   Branch #4300: prometheus
# Handle labelled_metrics
from functions.victoriametrics.get_victoriametrics_metric import get_victoriametrics_metric

# @added 20230724 - Feature #5010: snab - save training_data
from functions.database.queries.get_algorithms import get_algorithms
from functions.database.queries.get_algorithm_groups import get_algorithm_groups

# @added 20241026 - Task #5521: webapp - update to bootstrap-5.3.3
#                   Feature #5008: webapp - snab report page
from functions.database.queries.get_all_db_metric_names import get_all_db_metric_names

# Module-level setup: resolve the running Skyline version, the webapp logger
# and the webapp log file path from settings.
# NOTE: this rebinds the imported skyline_version module name to the version
# value, after this line skyline_version is no longer the module.
skyline_version = skyline_version.__absolute_version__
skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# logfile is kept as a second name for the same path
logfile = skyline_app_logfile
try:
    ENABLE_WEBAPP_DEBUG = settings.ENABLE_WEBAPP_DEBUG
# A setting that is not declared in settings raises AttributeError, which
# EnvironmentError alone never catches, so both are handled and the debug flag
# falls back to False
except (AttributeError, EnvironmentError) as err:
    logger.error('error :: cannot determine ENABLE_WEBAPP_DEBUG from settings - %s' % str(err))
    ENABLE_WEBAPP_DEBUG = False


def get_snab_engine():
    """
    Get a database engine for the webapp app by calling get_engine.

    :return: (engine, fail_msg, trace) exactly as unpacked from get_engine
    :rtype: tuple
    :raises: whatever get_engine raised, re-raised after it has been logged so
        that the webapp can report it in the UI

    """
    try:
        engine, fail_msg, trace = get_engine(skyline_app)
    except:  # deliberately broad - everything is logged and then re-raised
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: failed to get MySQL engine for snab table')
        raise  # to webapp to return in the UI
    else:
        return engine, fail_msg, trace
def snab_engine_disposal(engine):
    """
    Dispose of a database engine, logging rather than raising if the disposal
    fails.

    :param engine: the engine to dispose of, a falsy value is ignored
    :return: None

    """
    if not engine:
        return
    try:
        engine.dispose()
    # Only ordinary errors are logged and swallowed, KeyboardInterrupt and
    # SystemExit are allowed to propagate
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: calling engine.dispose()')
# @added 20211102 - Branch #3068: SNAB
def _snab_fetch_rows(engine, stmt):
    """
    Execute stmt on a new connection from engine and return every row as a
    dict.  The connection is always closed, whether or not the query fails.

    :param engine: the database engine
    :param stmt: the select statement to execute
    :return: rows
    :rtype: list

    """
    connection = engine.connect()
    try:
        return [dict(row) for row in connection.execute(stmt)]
    finally:
        connection.close()


def _determine_snab_results(filter_on, engine):
    """
    Query, filter and annotate the snab results for get_snab_results.  The
    caller owns the engine and is responsible for disposing of it.

    :param filter_on: the filter_on dictionary
    :param engine: the database engine
    :type filter_on: dict
    :return: results_data
    :rtype: dict

    """
    namespaces = filter_on['namespaces'] or []
    algorithm = None
    algorithm_id = 0
    if filter_on['algorithm_id']:
        algorithm_id = filter_on['algorithm_id']
        algorithm = filter_on['algorithm']
    algorithm_group = None
    algorithm_group_id = 0
    if filter_on['algorithm_group_id']:
        algorithm_group_id = filter_on['algorithm_group_id']
        algorithm_group = filter_on['algorithm_group']
    from_timestamp = filter_on['from_timestamp']
    until_timestamp = filter_on['until_timestamp']
    result = filter_on['result'] or None
    plot = filter_on['plot']

    try:
        anomalies_table, fail_msg, trace = anomalies_table_meta(skyline_app, engine)
        if fail_msg != 'anomalies_table meta reflected OK':
            logger.error('error :: get_snab_results :: could not get a MySQL engine fail_msg - %s' % str(fail_msg))
        if trace != 'none':
            logger.error('error :: get_snab_results :: could not get a MySQL engine trace - %s' % str(trace))
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: anomalies_table_meta - %s' % str(err))
        raise  # to webapp to return in the UI

    # Determine start and end anomaly_ids
    start_anomaly_id = 0
    try:
        stmt = select([anomalies_table]).where(anomalies_table.c.anomaly_timestamp >= int(from_timestamp)).order_by(anomalies_table.c.id.asc()).limit(1)
        for row in _snab_fetch_rows(engine, stmt):
            start_anomaly_id = row['id']
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: failed to determine start_anomaly_id - %s' % str(err))
        raise  # to webapp to return in the UI
    logger.info('get_snab_results :: starting from anomaly_id: %s' % str(start_anomaly_id))

    end_anomaly_id = 0
    try:
        stmt = select([anomalies_table]).where(anomalies_table.c.anomaly_timestamp <= int(until_timestamp)).order_by(anomalies_table.c.id.desc()).limit(1)
        for row in _snab_fetch_rows(engine, stmt):
            end_anomaly_id = row['id']
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: failed to determine end_anomaly_id - %s' % str(err))
        raise  # to webapp to return in the UI
    logger.info('get_snab_results :: ending with anomaly_id: %s' % str(end_anomaly_id))

    try:
        snab_table, log_msg, trace = snab_table_meta(skyline_app, engine)
        logger.info(log_msg)
        logger.info('get_snab_results :: snab_table OK')
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: failed to get snab_table meta')
        raise  # to webapp to return in the UI

    # All the snab rows in the anomaly id range, additionally bounded by the
    # snab_timestamp for whichever of the timestamps were passed
    all_results = {}
    try:
        stmt = select([snab_table]).\
            where(snab_table.c.anomaly_id >= start_anomaly_id).\
            where(snab_table.c.anomaly_id <= end_anomaly_id)
        if from_timestamp:
            stmt = stmt.where(snab_table.c.snab_timestamp >= int(from_timestamp))
        if until_timestamp:
            stmt = stmt.where(snab_table.c.snab_timestamp <= int(until_timestamp))
        for row in _snab_fetch_rows(engine, stmt):
            all_results[row['id']] = row
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: could not determine all_results - %s' % str(err))
        raise
    logger.info('get_snab_results :: determined %s SNAB results before filtering' % (
        str(len(all_results))))

    # @added 20241026 - Task #5521: webapp - update to bootstrap-5.3.3
    #                   Feature #5008: webapp - snab report page
    # Map ALL metric ids.  The get_all_db_metric_names method is used rather
    # than the Redis based get_metric_ids_and_base_names method, which only
    # knows the currently active metrics and results in KeyErrors for the ids
    # of metrics that have been removed.
    metric_names_with_ids = {}
    try:
        with_ids = True
        _metric_names, metric_names_with_ids = get_all_db_metric_names(skyline_app, with_ids)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: get_all_db_metric_names failed - %s' % str(err))
        metric_names_with_ids = {}
    metric_ids_and_base_names = {}
    for i_base_name, mid in metric_names_with_ids.items():
        metric_ids_and_base_names[mid] = i_base_name

    # NOTE(review): each of the filters below is only applied if it matches at
    # least one result, a filter that matches nothing leaves the results
    # unfiltered.  Preserved as is - confirm this is what the UI expects.

    # Filter by result
    if result and result != 'all':
        results_by_result = {}
        for snab_id, item in all_results.items():
            if item[result] == 1:
                results_by_result[snab_id] = item
        if results_by_result:
            all_results = results_by_result

    # Filter by algorithm
    if algorithm_id:
        results_by_algorithm = {}
        for snab_id, item in all_results.items():
            if item['algorithm_id'] == algorithm_id:
                item['algorithm'] = algorithm
                results_by_algorithm[snab_id] = item
        if results_by_algorithm:
            all_results = results_by_algorithm

    # Filter by algorithm_group
    if algorithm_group_id:
        results_by_algorithm_group = {}
        for snab_id, item in all_results.items():
            if item['algorithm_group_id'] == algorithm_group_id:
                item['algorithm_group'] = algorithm_group
                results_by_algorithm_group[snab_id] = item
        if results_by_algorithm_group:
            all_results = results_by_algorithm_group

    # Filter by namespaces
    filtered_metrics = {}
    if namespaces:
        for metric_id, base_name in metric_ids_and_base_names.items():
            try:
                pattern_match, _metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, namespaces)
                if pattern_match:
                    filtered_metrics[metric_id] = base_name
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: get_snab_results :: matched_or_regexed_in_list failed - %s' % str(err))

    # anomalies maps anomaly id to the anomalies table row, its keys are also
    # used for the anomaly id membership tests
    anomalies = {}
    if not filtered_metrics:
        # No namespace filter matched, use the anomalies of the results
        check_anomaly_ids = [item['anomaly_id'] for item in all_results.values()]
        check_metric_ids = []
        try:
            stmt = select([anomalies_table], anomalies_table.c.id.in_(check_anomaly_ids))
            for row in _snab_fetch_rows(engine, stmt):
                check_metric_ids.append(row['metric_id'])
                anomalies[row['id']] = row
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: get_snab_results :: failed to build anomaly_ids - %s' % str(err))
            raise  # to webapp to return in the UI
        for metric_id in check_metric_ids:
            # @modified 20241026 - Task #5521: webapp - update to bootstrap-5.3.3
            # Metric ids with no known base_name are skipped
            base_name = metric_ids_and_base_names.get(metric_id)
            if base_name:
                filtered_metrics[metric_id] = base_name

    # Now determine what anomaly ids exist for the filtered metrics in the
    # all results
    metric_ids = list(filtered_metrics.keys())
    if metric_ids and not anomalies:
        try:
            stmt = select([anomalies_table], anomalies_table.c.metric_id.in_(metric_ids))
            for row in _snab_fetch_rows(engine, stmt):
                anomalies[row['id']] = row
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: get_snab_results :: failed to build anomaly_ids - %s' % str(err))
            raise  # to webapp to return in the UI

    if anomalies:
        results_by_namespaces = {}
        for snab_id, item in all_results.items():
            if item['anomaly_id'] in anomalies:
                results_by_namespaces[snab_id] = item
        if results_by_namespaces:
            all_results = results_by_namespaces

    # Add the metric and labelled_metric names to each result
    all_results_errors = []
    for snab_id, item in all_results.items():
        try:
            anomaly_id = item['anomaly_id']
            metric_id = anomalies[anomaly_id]['metric_id']
            # @added 20241026 - Task #5521: webapp - update to bootstrap-5.3.3
            if metric_id not in metric_ids_and_base_names:
                all_results_errors.append({'base_name not found for metric_id': metric_id, 'snab_id': snab_id, 'anomaly_id': anomaly_id})
                continue
            base_name = metric_ids_and_base_names[metric_id]
            item['metric'] = base_name
            # @added 20230724 - add training_data page link
            labelled_metric_name = None
            if '_tenant_id="' in base_name:
                labelled_metric_name = 'labelled_metrics.%s' % str(metric_id)
            item['labelled_metric'] = labelled_metric_name
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: get_snab_results :: failed to determine base_name - %s' % str(err))
            all_results_errors.append({'err': err, 'snab_id': snab_id})
    results_data = dict(all_results)
    del all_results
    if len(all_results_errors) > 0:
        logger.error('error :: get_snab_results :: %s errors reported with all_results data, all_results_errors[\'0\']: %s' % (
            str(len(all_results_errors)), str(all_results_errors[0])))

    logger.info('get_snab_results :: determined %s SNAB results' % str(len(results_data)))

    # @added 20230724 - Feature #5010: snab - save training_data
    # Added training_data page link to SNAB results table
    redis_conn_decoded = None
    try:
        redis_conn_decoded = get_redis_conn_decoded(skyline_app)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: get_redis_conn_decoded failed - %s' % (
            err))
    training_data_raw = []
    try:
        training_data_raw = list(redis_conn_decoded.smembers('ionosphere.training_data'))
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: smembers failed on ionosphere.training_data - %s' % (
            err))
    # Maps metric name to the list of second elements of its set members,
    # which are compared against the anomaly_timestamp below
    ionosphere_training_data_dict = {}
    for training_data_str in training_data_raw:
        try:
            training_data_item = literal_eval(training_data_str)
            training_metric = training_data_item[0]
            training_timestamp = training_data_item[1]
            ionosphere_training_data_dict.setdefault(training_metric, []).append(training_timestamp)
        except Exception as err:
            logger.error('error :: get_snab_results :: failed to interpolate - %s - %s' % (
                str(training_data_str), err))

    all_algorithms_by_id = {}
    try:
        _algorithms, all_algorithms_by_id = get_algorithms(skyline_app, return_all_algorithms_by_id=True)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: failed to get algorithms - %s' % str(err))
    # NOTE(review): the algorithm groups are fetched but not used in the
    # results as yet, the call is preserved from the original implementation
    try:
        _algorithm_groups, _all_algorithm_groups_by_id = get_algorithm_groups(skyline_app, return_all_algorithm_groups_by_id=True)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: failed to get algorithm_groups - %s' % str(err))

    apps_table = None
    try:
        use_table_meta = MetaData()
        apps_table = Table('apps', use_table_meta, autoload=True, autoload_with=engine)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: use_table Table failed on apps table - %s' % (
            err))
    apps = {}
    if apps_table is not None:
        try:
            # The rows are not assigned to result, which holds the result
            # filter value that is used in the plot file names below
            for row in _snab_fetch_rows(engine, select(apps_table)):
                apps[row['id']] = row['app']
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: get_snab_results :: failed to build apps - %s' % str(err))

    # Add the training_data_uri and replace the algorithm and app ids with
    # their names
    results_errors = []
    for snab_id in list(results_data.keys()):
        item = results_data[snab_id]
        # The anomaly_id must be taken from this result before the anomaly is
        # looked up, otherwise the anomaly of a previous result is used
        try:
            metric = item['metric']
            anomaly_id = item['anomaly_id']
            labelled_metric_name = item['labelled_metric']
        except KeyError as err:
            results_errors.append({'results_data error': err, 'snab_id': snab_id})
            continue
        try:
            anomaly_timestamp = anomalies[anomaly_id]['anomaly_timestamp']
        except KeyError as err:
            results_errors.append({'anomalies error': err, 'anomaly_id': anomaly_id})
            continue

        training_data_uri = None
        use_metric = None
        if metric in ionosphere_training_data_dict:
            use_metric = metric
        if labelled_metric_name in ionosphere_training_data_dict:
            use_metric = labelled_metric_name
        if use_metric and anomaly_timestamp in ionosphere_training_data_dict[use_metric]:
            metric_timeseries_dir = use_metric.replace('.', '/')
            metric_training_data_dir = '%s/%s/%s' % (
                settings.IONOSPHERE_DATA_FOLDER, str(anomaly_timestamp),
                metric_timeseries_dir)
            saved_training_data_dir = '%s_saved/%s/%s' % (
                settings.IONOSPHERE_DATA_FOLDER, str(anomaly_timestamp),
                metric_timeseries_dir)
            # Prefer the training data dir, fall back to the saved one
            if os.path.exists(metric_training_data_dir):
                training_data_uri = '/ionosphere?timestamp=%s&metric=%s&requested_timestamp=%s' % (
                    str(anomaly_timestamp), use_metric, str(anomaly_timestamp))
            elif os.path.exists(saved_training_data_dir):
                training_data_uri = '/ionosphere?saved_training_data=true&timestamp=%s&metric=%s&requested_timestamp=%s' % (
                    str(anomaly_timestamp), use_metric, str(anomaly_timestamp))
        item['training_data_uri'] = training_data_uri

        try:
            algorithm_id = int(item['algorithm_id'])
        except (KeyError, TypeError, ValueError):
            algorithm_id = None
        algorithm_name = None
        if algorithm_id:
            try:
                algorithm_name = str(all_algorithms_by_id[algorithm_id])
            except Exception as err:
                logger.warning('warning :: get_snab_results :: failed find algorithm in all_algorithms_by_id for algorithm_id: %s - %s' % (
                    str(algorithm_id), str(err)))
        # Change algorithm_id key to algorithm and maintain order
        item = {'algorithm' if k == 'algorithm_id' else k: v for k, v in item.items()}
        item['algorithm'] = algorithm_name

        app = None
        try:
            app = str(apps[item['app_id']])
        except Exception as err:
            logger.warning('warning :: get_snab_results :: failed find app in apps for app_id: %s - %s' % (
                str(item.get('app_id')), str(err)))
        # Change app_id key to app and maintain order
        item = {'app' if k == 'app_id' else k: v for k, v in item.items()}
        item['app'] = app
        item['algorithm_id'] = algorithm_id
        # Move algorithm_group_id to the end as a str
        if 'algorithm_group_id' in item:
            item['algorithm_group_id'] = str(item.pop('algorithm_group_id'))
        results_data[snab_id] = item
    if len(results_errors) > 0:
        logger.error('error :: get_snab_results :: %s errors reported with results_data, results_errors[\'0\']: %s' % (
            str(len(results_errors)), str(results_errors[0])))

    # Add a plot to each result if plots were requested
    for snab_id in list(results_data.keys()):
        item = results_data[snab_id]
        item['plot'] = None
        if not plot:
            continue
        try:
            anomaly_id = item['anomaly_id']
            metric = item['metric']
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: get_snab_results :: failed to determine anomaly_id and/or metric for snab_id: %s, err: %s' % (
                str(snab_id), str(err)))
            continue
        graph_file = '%s/snab.%s.%s.%s.png' % (
            settings.SKYLINE_TMP_DIR, str(snab_id), metric, str(result))
        # Reuse a plot that has already been created
        if os.path.isfile(graph_file):
            item['plot'] = str(graph_file)
            continue
        timeseries = []
        anomaly_timestamp = None
        plot_algorithm_group = item.get('algorithm_group')
        try:
            anomaly_timestamp = anomalies[anomaly_id]['anomaly_timestamp']
            full_duration = anomalies[anomaly_id]['full_duration']
            graphite_from_timestamp = anomaly_timestamp - full_duration
            graphite_until_timestamp = anomaly_timestamp + 14400
            # @modified 20230713 - Feature #4994: custom_algorithm - mirages
            #                      Feature #4988: Allow snab to return and save results
            #                      Task #2732: Prometheus to Skyline
            #                      Branch #4300: prometheus
            # Handle labelled_metrics
            if metric.startswith('labelled_metrics.') or '_tenant_id="' in metric:
                # NOTE(review): this fetches the filter period and not the
                # period around the anomaly that the graphite branch uses -
                # confirm whether that is intended
                timeseries = get_victoriametrics_metric(skyline_app, metric, from_timestamp, until_timestamp, 'list', 'object')
            else:
                timeseries = get_graphite_metric(skyline_app, metric, graphite_from_timestamp, graphite_until_timestamp, 'list', 'object')
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: get_snab_results :: get_graphite_metric failed to get timeseries - %s' % str(err))
        output_file = None
        if timeseries:
            try:
                graph_title = '%s\nSNAB id %s - %s - %s' % (metric, str(snab_id), plot_algorithm_group, str(result))
                _created_graph, output_file = create_matplotlib_graph(skyline_app, graph_file, graph_title, timeseries, [anomaly_timestamp])
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: get_snab_results :: plot_anomalies failed for %s snab id %s' % (metric, str(snab_id)))
        if output_file:
            item['plot'] = output_file

    return results_data


def get_snab_results(filter_on):
    """
    Get the results from the snab table that match the filter_on criteria,
    adding the metric name, the algorithm and app names, a training_data page
    uri and optionally a plot to each result.

    :param filter_on: the filter_on dictionary, the namespaces, algorithm_id,
        algorithm, algorithm_group_id, algorithm_group, from_timestamp,
        until_timestamp, result and plot keys are read
    :type filter_on: dict
    :return: results_data, a dict of the result dicts keyed by snab id
    :rtype: dict
    :raises: any exception raised while getting the engine, the table metas or
        the anomalies and snab rows, re-raised after being logged so that the
        webapp can report it in the UI

    """
    logger.info('get_snab_results :: filter_on: %s' % (str(filter_on)))

    logger.info('get_snab_results :: getting MySQL engine')
    try:
        engine, fail_msg, trace = get_snab_engine()
        logger.info(fail_msg)
    except Exception as err:
        # fail_msg is not bound when get_snab_engine raises, so only the
        # traceback and the error are logged
        logger.error(traceback.format_exc())
        logger.error('error :: get_snab_results :: could not get a MySQL engine to get snab table - %s' % str(err))
        raise  # to webapp to return in the UI

    # The engine is disposed of however the results were determined
    try:
        return _determine_snab_results(filter_on, engine)
    finally:
        snab_engine_disposal(engine)