Source code for custom_algorithms.low_variance_anomalous_peak_trough

"""
low_variance_anomalous_peak_trough.py
"""
# REQUIRED Skyline imports.  All custom algorithms MUST have the following two
# imports.  These are required for exception handling and to record algorithm
# errors regardless of debug_logging setting for the custom_algorithm
import logging
import traceback
from time import time
from custom_algorithms import record_algorithm_error

import copy

# Import ALL modules that the custom algorithm requires.  Remember that if a
# requirement is not one that is provided by the Skyline requirements.txt you
# must ensure it is installed in the Skyline virtualenv
import numpy as np
from scipy.signal import find_peaks

from functions.timeseries_predictions.fft_extrapolation import fft_extrapolation

# The name of the function MUST be the same as the name declared in
# settings.CUSTOM_ALGORITHMS.
# It MUST have 4 parameters:
# current_skyline_app, parent_pid, timeseries, algorithm_parameters
# See https://earthgecko-skyline.readthedocs.io/en/latest/algorithms/custom-algorithms.html
# for a full explanation about each.
# ALWAYS WRAP YOUR ALGORITHM IN try and except


# @added 20230717 - Feature #4994: custom_algorithm - mirages
def _lvapt_monotonicity(np_values):
    """
    Classify a series of values as near monotonic, flat or neither.

    The classification is two staged.  First at least 95% of the values must
    be followed by a non-negative change (or at least 95% by a non-positive
    change).  If so, the summed magnitude of the increases and of the decreases
    are compared and the series is declared near monotonic when one direction
    accounts for >= 98% of the total change and the other for < 2%.

    :param np_values: the values of the time series
    :type np_values: numpy.ndarray
    :return: ``'increasing'``, ``'decreasing'``, ``'flat'`` (the pre-check
        passed but the values never change) or ``None``
    :rtype: str or None
    """
    number_of_values = len(np_values)
    if number_of_values == 0:
        return None
    # Compute the differences once rather than once per list being built
    diffs = np.diff(np_values)
    perc_non_decreasing = (int(np.count_nonzero(diffs >= 0)) / number_of_values) * 100
    perc_non_increasing = (int(np.count_nonzero(diffs <= 0)) / number_of_values) * 100
    if perc_non_decreasing < 95.0 and perc_non_increasing < 95.0:
        return None
    # Sum sequentially with the builtin sum so the totals are accumulated in
    # the same order as summing the individual diffs one by one
    increase_sum = sum(diffs[diffs > 0].tolist())
    decrease_sum = sum(diffs[diffs < 0].tolist())
    total_change = increase_sum - decrease_sum
    if total_change == 0:
        # Every diff is 0, there is nothing to divide by and no direction
        return 'flat'
    perc_increasing = (increase_sum / total_change) * 100
    perc_decreasing = (-decrease_sum / total_change) * 100
    if perc_increasing >= 98.0 and perc_decreasing < 2.0:
        return 'increasing'
    if perc_decreasing >= 98.0 and perc_increasing < 2.0:
        return 'decreasing'
    return None


def low_variance_anomalous_peak_trough(current_skyline_app, parent_pid, timeseries, algorithm_parameters):
    """
    A time series with low variance or few peaks/troughs is anomalous if the
    data point is > sigma (default 3) standard deviations from the
    peaks/troughs:

    peak_values_mean + (sigma * peak_values_stdDev).
    trough_values_mean - (sigma * trough_values_stdDev).

    If outliers are present the median is used in place of the mean.

    This algorithm is ONLY suited to assessing the last datapoints and is NOT
    suited to an anomaly_window > 10.  The peak/trough being assessed is the
    latest one found in the last 5 data points.

    Analysis is skipped, and the default verdict returned, when the values are
    near strictly monotonic or do not change at all, in that case
    ``results['skipped']`` holds the reason.

    :param current_skyline_app: the Skyline app executing the algorithm.  This
        will be passed to the algorithm by Skyline.  This is **required** for
        error handling and logging.  You do not have to worry about handling the
        argument in the scope of the custom algorithm itself, but the algorithm
        must accept it as the first argument.
    :param parent_pid: the parent pid which is executing the algorithm, this is
        **required** for error handling and logging.  You do not have to worry
        about handling this argument in the scope of algorithm, but the
        algorithm must accept it as the second argument.
    :param timeseries: the time series as a list e.g.
        ``[[1690920000, 0.0], ..., [1691524200.0, 0.0], [1691524800.0, 0.5]]``
    :param algorithm_parameters: a dictionary of any required parameters for
        the custom_algorithm and algorithm itself.  No specific
        algorithm_parameters are required apart from an empty dict but the
        algorithm_parameters that can be passed are:

        - ``'anomaly_window'`` (int): how many of the last data points are
          averaged to decide between peak and trough analysis.  Values >= 10
          fall back to the last 4 data points.  Default is ``4``.
        - ``'currently_anomalous'`` (bool): Optional. Whether the instance
          state is anomalous or not.  Default is ``False``.
        - ``'sigma'`` (number): Optional. The multiplier applied to the
          standard deviation when calculating thresholds.  Default is ``3``.
        - ``'return_results'`` (bool): Optional. If ``True``, returns the
          results dict in addition to anomalous and anomalyScore.  Default is
          ``False``.
        - ``'debug_logging'`` (bool): Optional. If ``True``, enables debug
          logging.  Default is ``False``.
        - ``'print_debug'`` (bool): Optional. If ``True``, prints the traceback
          of any error (for Jupyter testing).  Default is ``False``.

        Example usage:

        algorithm_parameters={
            'anomaly_window': 4,
            'currently_anomalous': True,
            'debug_logging': True,
            'return_results': True,
        }
    :type current_skyline_app: str
    :type parent_pid: int
    :type timeseries: list
    :type algorithm_parameters: dict
    :return: anomalous, anomalyScore and, only if return_results is set, results
    :rtype: tuple(boolean, float) or tuple(boolean, float, dict)

    """

    # You MUST define the algorithm_name
    algorithm_name = 'low_variance_anomalous_peak_trough'

    def get_parameter(key, default):
        # A missing key or an algorithm_parameters that is not a dict both
        # result in the default being used
        try:
            return algorithm_parameters[key]
        except Exception:
            return default

    def build_return(is_anomalous, score):
        # The results dict is only returned when it was asked for
        if return_results:
            return is_anomalous, score, results
        return is_anomalous, score

    def latest_index_near_end(indices, end_index):
        # peaks and troughs are identified on the leading side, so look for
        # the latest one in the last 5 data points rather than only the last
        found = None
        for index in range(end_index - 4, end_index + 1):
            if index in indices:
                found = index
        return found

    # Before analysis the verdict is whatever was passed as
    # currently_anomalous (False if not passed) with a score of 0.0
    anomalyScore = 0.0
    anomalies = {}
    anomalyScore_list = []
    results = {
        'algorithm': algorithm_name,
        'anomalous': False,
        'anomalies': anomalies,
        'anomalyScore_list': anomalyScore_list,
        'analysed': False,
    }

    anomalous = get_parameter('currently_anomalous', False)
    results['anomalous'] = anomalous
    if anomalous:
        results['anomalyScore'] = 1.0

    try:
        anomaly_window = int(algorithm_parameters['anomaly_window'])
    except Exception:
        anomaly_window = 4
    results['anomaly_window'] = anomaly_window

    # @added 20230925 - variable sigma added
    # This allows for the sigma value used in low_variance_anomalous_peak_trough
    # be passed and reduced if certain criteria are met
    sigma = get_parameter('sigma', 3)
    results['sigma'] = sigma

    return_results = get_parameter('return_results', False)

    # Use the algorithm_parameters to determine whether to log
    debug_logging = get_parameter('debug_logging', False)

    if debug_logging:
        try:
            current_logger = logging.getLogger(current_skyline_app + 'Log')
            current_logger.debug('debug :: %s :: debug_logging enabled with algorithm_parameters - %s' % (
                algorithm_name, str(algorithm_parameters)))
        except Exception:
            # This except pattern MUST be used in ALL custom algorithms to
            # facilitate the traceback from any errors.  The algorithm we want to
            # run super fast and without spamming the log with lots of errors.
            # But we do not want the function returning and not reporting
            # anything to the log, so the pythonic except is used to "sample" any
            # algorithm errors to a tmp file and report once per run rather than
            # spewing tons of errors into the log e.g. analyzer.log
            record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback.format_exc())
            # Return the pre-analysis verdict as nothing was analysed
            return build_return(anomalous, anomalyScore)

    print_debug = get_parameter('print_debug', False)

    try:
        start = time()
        analyse_peaks_only = False
        # From here on the verdict defaults to anomalous and is only set to
        # False if the analysis determines the peak/trough is within threshold.
        # NOTE(review): this means every early return below returns True and
        # not what was passed as currently_anomalous, while results['anomalous']
        # still holds currently_anomalous - confirm this is intended.
        anomalous = True
        last_index = len(timeseries) - 1
        results['timestamp'] = timeseries[-1][0]
        results['datapoint'] = timeseries[-1][1]
        results['index'] = last_index

        # @added 20230925 - Feature #4994: custom_algorithm - mirages
        # Handle mostly positive strictly increasing monotonicity on metrics.
        # There are metrics that can exhibit > 98% positive strictly increasing
        # monotonicity over the period.  Metrics such as things like memory_usage
        # and memory_rss can slowly increment positively over a 7 day period
        # and only experience the smallest of reductions in the values.
        # As per https://github.com/earthgecko/skyline/blob/master/docs/images/mostly.monotonic.lvapt.png
        # Running low_variance_anomalous_peak_trough on this type of data can
        # and will result in fNs.  Although a preprocessing stage could be added
        # to identify these types of data and convert to the derivative format,
        # it is too contrived and there is no guarantee that preprocessing the
        # data will have the desired results in all circumstances.
        # Therefore if data like this is identified just skip analysis on this
        # data and return the default verdict.
        results['timings'] = {}
        start_near_mono_check = time()
        np_values = np.asarray([v for t, v in timeseries])
        monotonicity = _lvapt_monotonicity(np_values)
        results['timings']['near_monotonic_checks'] = time() - start_near_mono_check

        if monotonicity == 'flat':
            # The values never change so there are no peaks or troughs to
            # assess.  Calculating the percentage of change would divide by
            # zero, so skip instead of recording an algorithm error.
            results['analysed'] = False
            results['skipped'] = {'reason': 'no variation in values'}
            return build_return(anomalous, anomalyScore)

        # Skip running the algorithm and return the default verdict
        if monotonicity in ('increasing', 'decreasing'):
            results['analysed'] = False
            reason = 'near strictly monotonically %s' % monotonicity
            results['skipped'] = {'reason': reason}
            return build_return(anomalous, anomalyScore)

        # Determine whether this is peak or trough analysis
        if anomaly_window < 10:
            tail_avg = sum([v for t, v in timeseries[-anomaly_window:]]) / anomaly_window
        else:
            tail_avg = sum([v for t, v in timeseries[-4:]]) / 4
        mean = np.mean(np_values)
        analysis_type = 'peak'
        if tail_avg < mean:
            analysis_type = 'trough'
        results['analysis_type'] = analysis_type

        peaks = []
        number_of_peaks = 0
        troughs = []
        number_of_troughs = 0

        # If the last data point is a peak find_peaks will not report it so add
        # predictions of 10 data points
        start_pred = time()
        pred_timeseries = fft_extrapolation(current_skyline_app, timeseries, n_predict=10, log=debug_logging)
        y_pred = np.array([v for t, v in pred_timeseries])
        results['timings']['fft_extrapolation'] = time() - start_pred

        start_find_peaks = time()
        if analysis_type == 'peak':
            found_peaks, _ = find_peaks(y_pred, distance=1)
            # Remove predictions
            peaks = [int(index) for index in found_peaks if index <= last_index]
            number_of_peaks = len(peaks)
            results['number_of_peaks'] = number_of_peaks
            all_peaks = list(peaks)
        if analysis_type == 'trough':
            # Troughs are the peaks of the inverted values
            found_troughs, _ = find_peaks(-y_pred, distance=1)
            # Remove predictions
            troughs = [int(index) for index in found_troughs if index <= last_index]
            number_of_troughs = len(troughs)
            results['number_of_troughs'] = number_of_troughs
            all_troughs = list(troughs)
        results['timings']['find_peaks'] = time() - start_find_peaks

        peak_analysis = False
        if number_of_peaks > 1:
            found_index = latest_index_near_end(peaks, last_index)
            # The last data point is reported if no recent peak was found
            peak_index = last_index if found_index is None else found_index
            results['peak_index'] = peak_index
            results['peak_value'] = timeseries[peak_index][1]
            if found_index is not None:
                analyse_peaks_only = True
                peak_analysis = True

        trough_analysis = False
        if number_of_troughs > 1:
            found_index = latest_index_near_end(troughs, last_index)
            trough_index = last_index if found_index is None else found_index
            results['trough_index'] = trough_index
            results['trough_value'] = timeseries[trough_index][1]
            if found_index is not None:
                analyse_peaks_only = True
                trough_analysis = True

        if not analyse_peaks_only:
            if debug_logging:
                current_logger.info('%s :: criteria not met, not running, results: %s' % (algorithm_name, str(results)))
            results['analysis_runtime'] = time() - start
            return build_return(anomalous, anomalyScore)

        if debug_logging:
            current_logger.info('%s :: running analysis_type: %s, for %s' % (
                algorithm_name, str(results['analysis_type']), str(results)))

        if peak_analysis:
            peak_values_list = [[index, timeseries[index][1]] for index in peaks]
            sorted_peak_values_list = sorted(peak_values_list, key=lambda x: x[1], reverse=True)
            # Only sample to 10 largest peaks if there are many peaks
            if number_of_peaks > 20:
                sorted_peak_values_list = sorted_peak_values_list[0:10]
                peaks = sorted([index for index, value in sorted_peak_values_list])
            results['peaks'] = sorted_peak_values_list
            results['sorted_peak_values_list'] = sorted_peak_values_list

            # The recent peak may have been sampled out above, so look again
            found_index = latest_index_near_end(peaks, last_index)
            if found_index is None:
                results['last_datapoint_not_in_peaks'] = True
                results['analysis_runtime'] = time() - start
                if debug_logging:
                    current_logger.info('%s :: criteria not met, results: %s' % (
                        algorithm_name, str(results)))
                return build_return(anomalous, anomalyScore)
            check_value = timeseries[found_index][1]

        if trough_analysis:
            trough_values_list = [[index, timeseries[index][1]] for index in troughs]
            sorted_troughs_values_list = sorted(trough_values_list, key=lambda x: x[1], reverse=True)
            # Only sample to 10 lowest troughs if there are many troughs, the
            # list is sorted descending so these are the last 10
            if number_of_troughs > 20:
                sorted_troughs_values_list = sorted_troughs_values_list[-10:]
                troughs = sorted([index for index, value in sorted_troughs_values_list])
            results['troughs'] = sorted_troughs_values_list
            results['sorted_troughs_values_list'] = sorted_troughs_values_list

            # The recent trough may have been sampled out above, so look again
            found_index = latest_index_near_end(troughs, last_index)
            if found_index is None:
                results['last_datapoint_not_in_troughs'] = True
                results['analysis_runtime'] = time() - start
                if debug_logging:
                    current_logger.info('%s :: criteria not met, results: %s' % (
                        algorithm_name, str(results)))
                return build_return(anomalous, anomalyScore)
            check_value = timeseries[found_index][1]

        if analyse_peaks_only and peak_analysis:
            threshold_on = 'mean'
            # @modified 20230925 - variable sigma added
            threshold_key = 'threshold (peak_values_mean + (%s * peak_values_stdDev))' % str(sigma)
            # peak_analysis is only set when there is more than 1 peak and
            # sampling keeps 10, so there are always at least 2 values here
            peak_values = [timeseries[index][1] for index in peaks]

            # Coerce into floats
            peak_values_mean = float(np.mean(peak_values))
            peak_values_median = float(np.median(peak_values))
            peak_values_stdDev = float(np.std(peak_values))
            mean_threshold = peak_values_mean + (sigma * peak_values_stdDev)
            threshold = mean_threshold
            median_threshold = peak_values_median + (sigma * peak_values_stdDev)
            use_median = False

            # Are there outlier peaks?  all_peaks are the peaks before any
            # sampling, in index order
            all_peak_values = [timeseries[index][1] for index in all_peaks]

            # @added 20230812
            # Only assess if there are more than 2 peak values
            if len(all_peak_values) <= 2:
                if debug_logging:
                    current_logger.info('%s :: only 2 peaks not evaluating, results: %s' % (
                        algorithm_name, str(results)))
                return build_return(anomalous, anomalyScore)

            # Coerce into floats
            all_peak_values_mean = float(np.mean(all_peak_values))
            all_peak_values_median = float(np.median(all_peak_values))
            all_peak_values_stdDev = float(np.std(all_peak_values))
            all_peak_median_threshold = all_peak_values_median + (sigma * all_peak_values_stdDev)
            peak_outliers = [v for v in all_peak_values if v > all_peak_median_threshold]
            # If there are outlier peaks then use the median
            if peak_outliers:
                use_median = True

            # If the peak itself is an outlier in the peak_values being
            # considered, then use the median
            other_peak_values = [v for v in peak_values if v != check_value]
            if other_peak_values:
                other_peak_values_mean = float(np.mean(other_peak_values))
                other_peak_values_stdDev = float(np.std(other_peak_values))
                other_mean_threshold = other_peak_values_mean + (sigma * other_peak_values_stdDev)
                outlier_in_peak_values = False
                if check_value > other_mean_threshold:
                    use_median = True
                    outlier_in_peak_values = True
            else:
                # Every peak has the value being checked, nothing to compare to
                outlier_in_peak_values = True
                use_median = True

            if use_median:
                threshold = median_threshold
                threshold_on = 'median'
                threshold_key = 'threshold (peak_values_median + (%s * peak_values_stdDev))' % str(sigma)

            # @added 20230812 - increase sensitivity to outliers to negate fNs
            if outlier_in_peak_values:
                threshold = all_peak_median_threshold
                threshold_on = 'all_peak_median'
                threshold_key = 'threshold (all_peak_values_median + (%s * all_peak_values_stdDev))' % str(sigma)

            if check_value > threshold:
                anomalous = True
                anomalyScore = 1.0
            else:
                anomalous = False

            if return_results:
                # Score every data point, only the peaks being considered that
                # breach the threshold are scored as anomalies
                considered_peaks = set(peaks)
                for index, item in enumerate(timeseries):
                    score = 0
                    if index in considered_peaks and item[1] > threshold:
                        score = 1
                        anomalies[int(item[0])] = {'value': item[1], 'index': index, 'score': 1}
                    anomalyScore_list.append(score)

            results['analysed'] = True
            results['anomalous'] = anomalous
            results['anomalous_peak'] = anomalous
            results['anomalyScore'] = anomalyScore
            results['number_of_peaks'] = number_of_peaks
            results['peak_values_mean'] = peak_values_mean
            results['peak_values_median'] = peak_values_median
            results['peak_values_stdDev'] = peak_values_stdDev
            results['mean_threshold'] = mean_threshold
            results['median_threshold'] = median_threshold
            results['all_peak_values_mean'] = all_peak_values_mean
            results['all_peak_values_median'] = all_peak_values_median
            results['all_peak_values_stdDev'] = all_peak_values_stdDev
            results['all_peak_median_threshold'] = all_peak_median_threshold
            results['peak_outliers'] = peak_outliers
            results['outlier_in_peak_values'] = outlier_in_peak_values
            results['threshold_on'] = threshold_on
            results[threshold_key] = threshold
            results['threshold'] = threshold
            results['peak_values'] = peak_values
            results['analysis_runtime'] = time() - start
            # Keep the long anomalyScore_list out of the log line and place it
            # at the end of the results
            del results['anomalyScore_list']
            if debug_logging:
                current_logger.info('%s :: results: %s' % (algorithm_name, str(results)))
            results['anomalies'] = anomalies
            results['anomalyScore_list'] = anomalyScore_list

        if analyse_peaks_only and trough_analysis:
            threshold_on = 'mean'
            # @modified 20230925 - variable sigma added
            threshold_key = 'threshold (trough_values_mean - (%s * trough_values_stdDev))' % str(sigma)
            # trough_analysis is only set when there is more than 1 trough and
            # sampling keeps 10, so there are always at least 2 values here
            trough_values = [timeseries[index][1] for index in troughs]

            # Coerce into floats
            trough_values_mean = float(np.mean(trough_values))
            trough_values_median = float(np.median(trough_values))
            trough_values_stdDev = float(np.std(trough_values))
            mean_threshold = trough_values_mean - (sigma * trough_values_stdDev)
            threshold = mean_threshold
            median_threshold = trough_values_median - (sigma * trough_values_stdDev)
            use_median = False

            # Are there outlier troughs?  all_troughs are the troughs before
            # any sampling, in index order
            all_trough_values = [timeseries[index][1] for index in all_troughs]

            # @added 20230812
            # Only assess if there are more than 2 trough values
            if len(all_trough_values) <= 2:
                if debug_logging:
                    current_logger.info('%s :: only 2 trough not evaluating, results: %s' % (algorithm_name, str(results)))
                return build_return(anomalous, anomalyScore)

            # Coerce into floats
            all_trough_values_mean = float(np.mean(all_trough_values))
            all_trough_values_median = float(np.median(all_trough_values))
            all_trough_values_stdDev = float(np.std(all_trough_values))
            all_trough_median_threshold = all_trough_values_median - (sigma * all_trough_values_stdDev)
            trough_outliers = [v for v in all_trough_values if v < all_trough_median_threshold]
            # If there are outlier troughs then use the median
            if trough_outliers:
                use_median = True

            # If it is an outlier trough then use the median
            # NOTE(review): the peak analysis excludes the values equal to
            # check_value whereas this excludes the last 2 troughs by position,
            # confirm whether the trough analysis should do the same.
            other_trough_values_mean = float(np.mean(trough_values[0:-2]))
            other_trough_values_stdDev = float(np.std(trough_values[0:-2]))
            other_median_threshold = other_trough_values_mean - (sigma * other_trough_values_stdDev)
            outlier_in_trough_values = False
            if check_value < other_median_threshold:
                use_median = True
                outlier_in_trough_values = True

            if use_median:
                threshold = median_threshold
                threshold_on = 'median'
                threshold_key = 'threshold (trough_values_median - (%s * trough_values_stdDev))' % str(sigma)

            # @added 20230812 - increase sensitivity to outliers to negate fNs
            if outlier_in_trough_values:
                threshold = all_trough_median_threshold
                threshold_on = 'all_trough_median'
                # The threshold is the median MINUS sigma standard deviations
                threshold_key = 'threshold (all_trough_values_median - (%s * all_trough_values_stdDev))' % str(sigma)

            if check_value < threshold:
                anomalous = True
                anomalyScore = 1.0
            else:
                anomalous = False

            if return_results:
                # Score every data point, only the troughs being considered
                # that breach the threshold are scored as anomalies
                considered_troughs = set(troughs)
                for index, item in enumerate(timeseries):
                    score = 0
                    if index in considered_troughs and item[1] < threshold:
                        score = 1
                        anomalies[int(item[0])] = {'value': item[1], 'index': index, 'score': 1}
                    anomalyScore_list.append(score)

            results['analysed'] = True
            results['anomalous'] = anomalous
            results['anomalous_trough'] = anomalous
            results['anomalyScore'] = anomalyScore
            # Report the number of troughs, number_of_peaks is always 0 here
            results['number_of_troughs'] = number_of_troughs
            results['trough_values_mean'] = float(trough_values_mean)
            results['trough_values_median'] = float(trough_values_median)
            results['trough_values_stdDev'] = float(trough_values_stdDev)
            results['mean_threshold'] = float(mean_threshold)
            results['median_threshold'] = float(median_threshold)
            results['all_trough_values_mean'] = float(all_trough_values_mean)
            results['all_trough_values_median'] = float(all_trough_values_median)
            results['all_trough_values_stdDev'] = float(all_trough_values_stdDev)
            results['all_trough_median_threshold'] = float(all_trough_median_threshold)
            results['trough_outliers'] = trough_outliers
            results['outlier_in_trough_values'] = outlier_in_trough_values
            results['threshold_on'] = threshold_on
            results[threshold_key] = threshold
            results['threshold'] = threshold
            results['trough_values'] = trough_values
            results['analysis_runtime'] = time() - start
            # Keep the long anomalyScore_list out of the log line and place it
            # at the end of the results
            del results['anomalyScore_list']
            if debug_logging:
                current_logger.info('%s :: results: %s' % (algorithm_name, str(results)))
            results['anomalies'] = anomalies
            results['anomalyScore_list'] = anomalyScore_list

    except StopIteration:
        # Not an error, return the verdict as it currently stands without
        # recording anything
        return build_return(anomalous, anomalyScore)
    except Exception as err:
        # This except pattern MUST be used in ALL custom algorithms to
        # facilitate the traceback from any errors.  The algorithm we want to
        # run super fast and without spamming the log with lots of errors.
        # But we do not want the function returning and not reporting
        # anything to the log, so the pythonic except is used to "sample" any
        # algorithm errors to a tmp file and report once per run rather than
        # spewing tons of errors into the log e.g. analyzer.log
        record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback.format_exc())
        if print_debug:
            print('error:', traceback.format_exc())
        if debug_logging:
            current_logger.debug('debug :: error - on low_variance_anomalous_peak_trough - %s' % err)
            current_logger.debug(traceback.format_exc())
        return build_return(anomalous, anomalyScore)

    return build_return(anomalous, anomalyScore)