# Source code for webapp.luminosity_pearson_closest

"""
luminosity_pearson_closest.py
"""
import copy
import datetime
import logging
import time
import traceback

from flask import request
import numpy as np
import requests

import settings
from matched_or_regexed_in_list import matched_or_regexed_in_list
from skyline_functions import get_graphite_metric
from functions.luminosity.pearson_closest import pearson_closest
from functions.metrics.get_base_name_from_labelled_metrics_name import get_base_name_from_labelled_metrics_name
from functions.metrics.get_base_names_and_metric_ids import get_base_names_and_metric_ids
from functions.plots.get_pearson_correlation_graphs import get_pearson_correlation_graphs
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.timeseries.downsample import downsample_timeseries
from functions.victoriametrics.get_victoriametrics_metric import get_victoriametrics_metric

# Name of the Skyline app this module belongs to; it is used to derive the
# logger name and the log file path below.
skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# logfile holds the same path as skyline_app_logfile, both names are kept
# because either may be referenced elsewhere.
logfile = skyline_app_logfile

# A missing setting is the only expected failure here, so only AttributeError
# is handled rather than using a bare except which would also swallow
# SystemExit and KeyboardInterrupt.
try:
    ENABLE_WEBAPP_DEBUG = settings.ENABLE_WEBAPP_DEBUG
except AttributeError:
    logger.error('error :: cannot determine ENABLE_WEBAPP_DEBUG from settings')
    ENABLE_WEBAPP_DEBUG = False

# Fall back to 86400 seconds when FULL_DURATION is not set or is not a value
# that int() can convert.
try:
    full_duration_seconds = int(settings.FULL_DURATION)
except (AttributeError, TypeError, ValueError):
    full_duration_seconds = 86400

full_duration_in_hours = full_duration_seconds / 60 / 60

# @added 20240502 - Feature #5026: luminosity - pearson_closest 
#                   Feature #5342: utilities - pearson_closest
def determine_pearson_closest(current_skyline_app):
    """
    Get the pearson correlations between the requested metric and every
    metric whose base_name is matched by the requested namespaces.

    All the parameters are read from the query string of the current Flask
    request:

    - ``metric`` and/or ``metric_id`` - the metric to correlate against.  If
      only ``metric_id`` is passed the metric name is looked up from the
      base_names and metric ids mapping.
    - ``namespaces`` - a comma separated list of namespaces or ``all``
      (required).
    - ``from_timestamp`` and ``until_timestamp`` - a unix timestamp or a
      ``%Y-%m-%d %H:%M`` string which is converted with time.mktime
      (required).
    - ``abs_threshold`` - correlations with an absolute coefficient below this
      are dropped, defaults to 0.5.
    - ``max_results`` - the maximum number of correlations to return, 0 (the
      default) means no limit.
    - ``plot_graphs`` - ``true`` to call get_pearson_correlation_graphs.
    - ``progress_url`` - a URL that is requested with a
      ``proportion_completed`` parameter as the correlations progress.

    :param current_skyline_app: The skyline app
    :type current_skyline_app: str
    :return: pearson_correlations keyed by base_name and ordered by
        descending absolute coefficient, an empty dict if no metric or
        metric_id was passed, or 400 if the request cannot be processed
    :rtype: dict or int

    """
    function_str = 'determine_pearson_closest'
    pearson_correlations = {}

    current_skyline_app_logger = current_skyline_app + 'Log'
    current_logger = logging.getLogger(current_skyline_app_logger)

    # Read the request arguments once.  If they cannot be read there is no
    # metric or metric_id, for which the response is the empty dict.
    try:
        args = request.args
    except Exception as err:
        current_logger.error('error :: %s :: failed to read request arguments, err: %s' % (
            function_str, err))
        return pearson_correlations

    metric = args.get('metric')
    current_logger.info('%s :: with metric: %s' % (function_str, str(metric)))
    metric_id = None
    try:
        metric_id = int(args.get('metric_id'))
        current_logger.info('%s :: with metric_id: %s' % (function_str, str(metric_id)))
    except (TypeError, ValueError):
        # Not passed or not an integer
        metric_id = None
    if not metric and not metric_id:
        current_logger.info('%s :: no metric or metric_id argument passed' % function_str)
        return pearson_correlations

    namespaces = None
    namespaces_str = args.get('namespaces')
    if namespaces_str:
        current_logger.info('%s :: with namespaces parameter: %s' % (function_str, str(namespaces_str)))
        if namespaces_str == 'all':
            namespaces = ['.*']
        else:
            namespaces = namespaces_str.split(',')
        current_logger.info('%s :: with namespaces: %s' % (function_str, str(namespaces)))
    if not namespaces:
        current_logger.error('error :: %s request no namespaces argument passed, err: %s' % (
            function_str, 'None'))
        return 400

    # A missing or unparsable timestamp results in a 400 rather than an
    # unhandled TypeError or ValueError.
    from_timestamp, err = _parse_timestamp_arg(args.get('from_timestamp'))
    if not from_timestamp:
        current_logger.error('error :: %s request no from_timestamp argument passed, err: %s' % (
            function_str, err))
        return 400
    until_timestamp, err = _parse_timestamp_arg(args.get('until_timestamp'))
    if not until_timestamp:
        current_logger.error('error :: %s request no until_timestamp argument passed, err: %s' % (
            function_str, err))
        return 400

    abs_threshold = 0.5
    abs_threshold_str = args.get('abs_threshold')
    if abs_threshold_str:
        try:
            abs_threshold = float(abs_threshold_str)
        except (TypeError, ValueError) as err:
            current_logger.error('error :: %s request no valid abs_threshold argument passed, err: %s' % (
                function_str, err))
            return 400

    max_results = 0
    if 'max_results' in args:
        try:
            max_results = int(float(args.get('max_results')))
        except (TypeError, ValueError, OverflowError) as err:
            current_logger.error('error :: %s request invalid max_results argument passed, err: %s' % (
                function_str, err))
            return 400

    plot_graphs = args.get('plot_graphs') == 'true'

    # A labelled_metrics. name is converted to its base_name.  The original
    # name is kept because it determines which backend the time series is
    # fetched from.
    labelled_metric_name = None
    if metric and metric.startswith('labelled_metrics.'):
        labelled_metric_name = str(metric)
        try:
            base_name = get_base_name_from_labelled_metrics_name(current_skyline_app, labelled_metric_name)
            if base_name:
                metric = str(base_name)
        except Exception as err:
            current_logger.error('error :: %s :: get_base_name_from_labelled_metrics_name failed for %s, err: %s' % (
                function_str, metric, err))

    # NOTE(review): progress_url is taken from the request and fetched by the
    # server below - confirm it is only ever supplied by a trusted caller.
    progress_url = args.get('progress_url')
    current_logger.info('%s :: with progress_url: %s' % (function_str, str(progress_url)))

    base_names_with_ids = {}
    try:
        base_names_with_ids = get_base_names_and_metric_ids(current_skyline_app)
    except Exception as err:
        logger.error('error :: %s :: %s :: get_base_names_and_metric_ids failed - %s' % (
            current_skyline_app, function_str, err))
    if not base_names_with_ids:
        base_names_with_ids = {}
    # A dedicated loop variable is used so that the metric_id passed in the
    # request is not overwritten.
    ids_with_base_names = {}
    for base_name, base_name_id in base_names_with_ids.items():
        ids_with_base_names[int(str(base_name_id))] = base_name
    base_names = list(base_names_with_ids)

    if not metric:
        # Only a metric_id was passed, resolve the metric name from it
        metric = ids_with_base_names.get(metric_id)
        if not metric:
            current_logger.error('error :: %s :: no metric found for metric_id: %s' % (
                function_str, str(metric_id)))
            return 400
        current_logger.info('%s :: determined metric %s from metric_id: %s' % (
            function_str, str(metric), str(metric_id)))
    if not metric_id:
        # Only a metric was passed, resolve its id (used in the graphs
        # output directory) if it is known.
        try:
            metric_id = int(str(base_names_with_ids[metric]))
        except (KeyError, TypeError, ValueError):
            metric_id = None

    correlate_metrics = []
    errs = []
    for base_name in base_names:
        pattern_match = False
        try:
            pattern_match, _matched_by = matched_or_regexed_in_list(current_skyline_app, base_name, namespaces)
        except Exception as err:
            errs.append(['matched_or_regexed_in_list', base_name, err])
        if pattern_match:
            correlate_metrics.append(base_name)
    if errs:
        current_logger.error('error :: %s :: matched_or_regexed_in_list failed for %s metrics, last err: %s' % (
            function_str, str(len(errs)), errs[-1][2]))
    current_logger.info('%s :: determined %s metrics to correlate from namespaces: %s' % (
        function_str, str(len(correlate_metrics)), str(namespaces)))
    if len(correlate_metrics) > 20:
        current_logger.info('%s :: would offload to wind due to the number of metrics to correlate' % function_str)

    # The metric is fetched from VictoriaMetrics if it was passed as a
    # labelled_metrics. name or if its base_name has labels, which is the same
    # test that is applied to the metrics to correlate below.
    metric_is_labelled = bool(labelled_metric_name) or _is_labelled_base_name(metric)
    metric_timeseries = []
    try:
        metric_timeseries = _fetch_timeseries(
            current_skyline_app, metric, from_timestamp, until_timestamp,
            metric_is_labelled)
    except Exception as err:
        current_logger.error('error :: %s failed to get metric time series, err: %s' % (
            function_str, err))
        return 400
    metric_timeseries_length = len(metric_timeseries)
    if metric_timeseries_length > 2000:
        current_logger.info('%s :: would offload to wind because of long time series, metric_timeseries_length: %s' % (
            function_str, str(metric_timeseries_length)))

    start_correlations = time.time()
    metrics_timeseries = {metric: {'timeseries': metric_timeseries}}
    for base_name in correlate_metrics:
        metric2_timeseries = []
        try:
            metric2_timeseries = _fetch_timeseries(
                current_skyline_app, base_name, from_timestamp,
                until_timestamp, _is_labelled_base_name(base_name))
        except Exception as err:
            current_logger.error('error :: %s failed to get metric time series, err: %s' % (
                function_str, err))
            continue
        if not metric2_timeseries:
            continue
        metrics_timeseries[base_name] = {'timeseries': metric2_timeseries}

    resolution = None
    try:
        resolution = determine_data_frequency(current_skyline_app, metric_timeseries, True)
    except Exception as err:
        current_logger.error(traceback.format_exc())
        current_logger.error('error :: %s :: determine_data_frequency failed, err: %s' % (
            function_str, err))
    if not resolution:
        # Nothing can be aligned without a resolution
        current_logger.error('error :: %s :: no resolution determined for %s' % (
            function_str, str(metric)))
        return 400

    # High resolution data with more than 200 data points is downsampled to
    # 600 seconds.
    downsample = None
    use_resolution = int(resolution)
    if resolution < 600 and metric_timeseries_length > 200:
        downsample = 600
        use_resolution = int(downsample)
        current_logger.info('%s :: downsampling to %s from %s second resolution and %s data points' % (
            function_str, str(downsample), str(resolution),
            str(metric_timeseries_length)))

    # Align the timestamps of every time series to the used resolution
    all_aligned_timestamps = []
    for i_metric in list(metrics_timeseries):
        i_metric_timeseries = metrics_timeseries[i_metric]['timeseries']
        if downsample:
            downsampled_timeseries = []
            try:
                downsampled_timeseries = downsample_timeseries(
                    current_skyline_app, i_metric_timeseries, resolution,
                    downsample, 'mean', 'end')
            except Exception as err:
                current_logger.error('error :: %s :: downsample_timeseries failed, err: %s' % (
                    function_str, err))
            metrics_timeseries[i_metric]['downsampled_timeseries'] = downsampled_timeseries
            i_metric_timeseries = list(downsampled_timeseries)
        aligned_metric_timeseries = []
        for t, v in i_metric_timeseries:
            aligned_timestamp = int(t // use_resolution * use_resolution)
            all_aligned_timestamps.append(aligned_timestamp)
            aligned_metric_timeseries.append([aligned_timestamp, v])
        metrics_timeseries[i_metric]['aligned_timeseries'] = aligned_metric_timeseries

    unique_aligned_timestamps = sorted(set(all_aligned_timestamps))
    if not unique_aligned_timestamps:
        current_logger.error('error :: %s :: no data points found to correlate for %s' % (
            function_str, str(metric)))
        return 400

    # Every time series is given a value for every aligned timestamp so that
    # they are all the same length.
    for i_metric in list(metrics_timeseries):
        aligned_filled_timeseries = _fill_aligned_timeseries(
            metrics_timeseries[i_metric]['aligned_timeseries'],
            unique_aligned_timestamps)
        metrics_timeseries[i_metric]['aligned_filled_timeseries'] = aligned_filled_timeseries
        if i_metric == metric:
            metric_timeseries = list(aligned_filled_timeseries)
        current_logger.debug('%s :: metric: %s, len(aligned_filled_timeseries): %s' % (
            function_str, str(i_metric), len(aligned_filled_timeseries)))
    metrics_timeseries_use_key = 'aligned_filled_timeseries'

    to_process_count = len(metrics_timeseries)
    processed_count = 0
    callbacks_made = []
    for base_name in list(metrics_timeseries):
        processed_count += 1
        pearson_cc = None
        reason = None
        try:
            pearson_cc, reason, _results = pearson_closest(
                current_skyline_app, metric, base_name,
                from_timestamp=from_timestamp,
                until_timestamp=until_timestamp, downsample=None,
                datapoints=None, print_debug=False, return_results=True,
                return_timeseries=False, metric_timeseries=metric_timeseries,
                metrics_timeseries=metrics_timeseries,
                metrics_timeseries_use_key=metrics_timeseries_use_key)
        except Exception as err:
            current_logger.error('error :: %s :: pearson_closest failed to correlate %s with %s, err: %s' % (
                function_str, metric, base_name, err))
            reason = err
        if isinstance(pearson_cc, float):
            if np.isnan(pearson_cc):
                pearson_correlations[base_name] = {'pearson_cc': None}
            else:
                pearson_correlations[base_name] = {'pearson_cc': pearson_cc}
        else:
            pearson_correlations[base_name] = {'pearson_cc': None, 'reason': reason}
        pearson_correlations[base_name]['resolution'] = resolution
        pearson_correlations[base_name]['downsample'] = downsample
        pearson_correlations[base_name]['used_resolution'] = use_resolution
        if not progress_url:
            continue
        # Report the progress once for each of the listed proportions
        proportion_completed = round(float(processed_count) / float(to_process_count), 1)
        if proportion_completed not in (0.1, 0.2, 0.5, 0.7):
            continue
        if proportion_completed in callbacks_made:
            continue
        use_progress_url = '%s?proportion_completed=%s' % (
            progress_url, str(proportion_completed))
        try:
            r = requests.get(use_progress_url, timeout=1)
            callbacks_made.append(proportion_completed)
            current_logger.info('%s :: updated progress_url: %s, got status code: %s' % (
                function_str, str(use_progress_url), str(r.status_code)))
        except Exception as err:
            current_logger.error('error :: %s :: request failed, progress_url: %s, err: %s' % (
                function_str, use_progress_url, err))
    current_logger.info('%s :: %s correlations done, took %.2f seconds' % (
        function_str, str(len(pearson_correlations)),
        (time.time() - start_correlations)))

    pearson_correlations = _rank_correlations(pearson_correlations, abs_threshold)

    # Limit the results.  The correlations are ordered so the strongest
    # correlations are the ones that are kept.
    if pearson_correlations and max_results > 0:
        limited_items = list(pearson_correlations.items())[:max_results]
        pearson_correlations = copy.deepcopy(dict(limited_items))

    if pearson_correlations and plot_graphs:
        output_dir = '%s/%s/%s/pearson_correlation_graphs' % (
            settings.SKYLINE_TMP_DIR, str(until_timestamp), str(metric_id))
        graph_pearson_correlations = None
        try:
            _pearson_correlation_graphs, graph_pearson_correlations = get_pearson_correlation_graphs(
                skyline_app, metric, until_timestamp,
                list(pearson_correlations.keys()), output_dir, tz='UTC',
                metrics_timeseries=metrics_timeseries,
                metrics_timeseries_use_key=metrics_timeseries_use_key,
                pearson_correlations=pearson_correlations,
                abs_threshold=abs_threshold)
        except Exception as err:
            logger.info(traceback.format_exc())
            logger.error('error :: get_pearson_correlation_graphs failed, err: %s' % err)
        if graph_pearson_correlations:
            pearson_correlations = copy.deepcopy(graph_pearson_correlations)

    if progress_url:
        use_progress_url = '%s?proportion_completed=1' % progress_url
        try:
            requests.get(use_progress_url, timeout=1)
        except Exception as err:
            current_logger.error('error :: %s :: request failed, progress_url: %s, err: %s' % (
                function_str, use_progress_url, err))

    return pearson_correlations


def _parse_timestamp_arg(value):
    """
    Convert a timestamp request argument to an int unix timestamp.

    :param value: a unix timestamp string or a ``%Y-%m-%d %H:%M`` string
    :type value: str or None
    :return: (timestamp, err), timestamp is None if the value is missing or
        cannot be converted and err is the reason as a str ('None' if there
        was no error)
    :rtype: tuple

    """
    if not value:
        return None, 'None'
    try:
        if ':' in value:
            parsed = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M')
            return int(time.mktime(parsed.timetuple())), 'None'
        return int(value), 'None'
    except (TypeError, ValueError, OverflowError, OSError) as err:
        return None, str(err)


def _is_labelled_base_name(base_name):
    """
    Return True if the base_name has labels, e.g. ``name{label="value"}``.
    """
    return '{' in base_name and base_name.endswith('}')


def _fetch_timeseries(
        current_skyline_app, base_name, from_timestamp, until_timestamp,
        use_victoriametrics):
    """
    Get the time series for the base_name from VictoriaMetrics if
    use_victoriametrics is set, otherwise from Graphite.
    """
    if use_victoriametrics:
        return get_victoriametrics_metric(
            current_skyline_app, base_name, from_timestamp, until_timestamp,
            'list', 'object')
    return get_graphite_metric(
        current_skyline_app, base_name, from_timestamp, until_timestamp,
        'list', 'object')


def _fill_aligned_timeseries(aligned_timeseries, timestamps):
    """
    Return a time series with a data point for every timestamp.

    A timestamp with no value (or a nan value) is given the last known value,
    timestamps before the first known value are given the first non-nan
    float value in the time series (None if there is not one).

    :param aligned_timeseries: the [timestamp, value] items
    :param timestamps: the sorted timestamps to return data points for
    :type aligned_timeseries: list
    :type timestamps: list
    :return: the filled [timestamp, value] items
    :rtype: list

    """
    first_value = None
    for _timestamp, value in aligned_timeseries:
        if isinstance(value, float) and not np.isnan(value):
            first_value = value
            break
    # If a timestamp occurs more than once the last value is used
    values_by_timestamp = {}
    for timestamp, value in aligned_timeseries:
        values_by_timestamp[timestamp] = value

    filled_timeseries = []
    last_value = None
    for timestamp in timestamps:
        try:
            value = float(values_by_timestamp[timestamp])
        except (KeyError, TypeError, ValueError):
            # No data point or not a numeric value
            value = np.nan
        if np.isnan(value):
            # Compare with None so that a last value of 0.0 is still used
            value = last_value if last_value is not None else first_value
        else:
            last_value = value
        filled_timeseries.append([timestamp, value])
    return filled_timeseries


def _rank_correlations(pearson_correlations, abs_threshold):
    """
    Return the correlations ordered by descending absolute coefficient.

    If abs_threshold is set, correlations with no coefficient or with an
    absolute coefficient below abs_threshold are dropped.  Otherwise the
    correlations with no coefficient are added after the ordered ones.  Each
    returned item is a copy with an ``abs_pearson_cc`` key added.

    :param pearson_correlations: the correlations keyed by base_name
    :param abs_threshold: the minimum absolute coefficient, 0 for no minimum
    :type pearson_correlations: dict
    :type abs_threshold: float
    :return: the ordered correlations
    :rtype: dict

    """
    ranked = []
    without_coefficient = []
    for i_metric, correlation in pearson_correlations.items():
        pearson_cc = correlation['pearson_cc']
        if pearson_cc is None:
            if not abs_threshold:
                without_coefficient.append(
                    (i_metric, dict(correlation, abs_pearson_cc=None)))
            continue
        abs_pearson_cc = float(abs(pearson_cc))
        if abs_threshold and abs_pearson_cc < abs_threshold:
            continue
        ranked.append(
            (i_metric, dict(correlation, abs_pearson_cc=abs_pearson_cc)))
    # The sort is stable so equal coefficients keep their original order
    ranked.sort(key=lambda item: item[1]['abs_pearson_cc'], reverse=True)
    return dict(ranked + without_coefficient)