Source code for custom_algorithms.skyline_matrixprofile

"""
THIS IS A MORE FEATUREFUL CUSTOM ALGORITHM to provide a skeleton to develop your
own custom algorithms.  The algorithm itself, although viable, is not
recommended for production or general use; it is simply a toy algorithm here to
demonstrate the structure of a more complex custom algorithm that has
``algorithm_parameters`` passed and can also log if enabled.
It is documented via # comments.
"""

# REQUIRED Skyline imports.  All custom algorithms MUST have the following two
# imports.  These are required for exception handling and to record algorithm
# errors regardless of debug_logging setting for the custom_algorithm
import traceback
from timeit import default_timer as timer
import logging

from custom_algorithms import record_algorithm_error
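
# For example, inside your algorithm function the canonical error handling
# pattern wraps the algorithm logic in try/except and records any traceback
# via record_algorithm_error.  This is an illustrative sketch only (the
# algorithm name shown is a placeholder, not a real algorithm):
#
# try:
#     pass  # ... your algorithm logic ...
# except:
#     record_algorithm_error(
#         current_skyline_app, parent_pid, 'your_algorithm_name',
#         traceback.format_exc())
#     return (None, None)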


# Import ALL modules that the custom algorithm requires.  Remember that if a
# requirement is not one that is provided by the Skyline requirements.txt you
# must ensure it is installed in the Skyline virtualenv
from collections import Counter
# @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
# import matrixprofile as mp
import numpy as np
# This is required to cache the stumpy stump function so that the compile
# overhead is incurred once
# from numba import njit

# @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
# The matrix-profile-foundation/matrixprofile library is no longer maintained
# and this has been replaced with the stumpy matrixprofile implementation.
# The switch to the library used is done here, falling back to matrixprofile
# if stumpy is not available.  However, as of v4.0.0 matrixprofile has been
# removed from requirements.txt
# stump_available = False
# mp = None
# try:
#     from stumpy import stump
#     stump_available = True
# except:
#     stump_available = False
# if not stump_available:
#     try:
#         import matrixprofile as mp
#     except:
#         mp = None
# import stumpy
from custom_algorithm_sources import stumpy

from functions.timeseries.downsample import downsample_timeseries
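
# For orientation, the core discord detection technique applied further below
# can be summarised by the following illustrative sketch.  The synthetic
# values and window length here are made up for the example only and are not
# part of the algorithm: stumpy.stump computes the matrix profile and the
# subsequence indices with the largest matrix profile distances (column 0)
# are taken as the k discords.
#
# example_values = np.array([float(i % 10) for i in range(300)] + [100.0])
# example_m = 30  # subsequence window length, i.e. the windows parameter
# example_profile = stumpy.stump(example_values, m=example_m)
# example_k_discords = 5
# example_discord_indices = np.argsort(example_profile[:, 0])[-example_k_discords:]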

# The name of the function MUST be the same as the name declared in
# settings.CUSTOM_ALGORITHMS.
# It MUST have 4 parameters:
# current_skyline_app, parent_pid, timeseries, algorithm_parameters
# See https://earthgecko-skyline.readthedocs.io/en/latest/algorithms/custom-algorithms.html
# for a full explanation about each.
# ALWAYS WRAP YOUR ALGORITHM IN try and except
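# For reference, a minimal custom algorithm that satisfies this interface
# might look like the following illustrative sketch.  The function name
# my_custom_algorithm, the threshold parameter and the trivial last value
# check are hypothetical examples only and are not part of this algorithm:
#
# def my_custom_algorithm(current_skyline_app, parent_pid, timeseries, algorithm_parameters):
#     try:
#         threshold = algorithm_parameters.get('threshold', 100)
#         anomalous = float(timeseries[-1][1]) > threshold
#         anomalyScore = 1.0 if anomalous else 0.0
#         return (anomalous, anomalyScore)
#     except:
#         record_algorithm_error(
#             current_skyline_app, parent_pid, 'my_custom_algorithm',
#             traceback.format_exc())
#         return (None, None)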
def skyline_matrixprofile(current_skyline_app, parent_pid, timeseries, algorithm_parameters):
    """
    The skyline_matrixprofile algorithm uses the matrix profile (computed with
    stumpy) to identify discords.

    :param current_skyline_app: the Skyline app executing the algorithm.  This
        will be passed to the algorithm by Skyline.  This is **required** for
        error handling and logging.  You do not have to worry about handling
        the argument in the scope of the custom algorithm itself, but the
        algorithm must accept it as the first argument.
    :param parent_pid: the parent pid which is executing the algorithm, this
        is **required** for error handling and logging.  You do not have to
        worry about handling this argument in the scope of the algorithm, but
        the algorithm must accept it as the second argument.
    :param timeseries: the time series as a list e.g.
        ``[[1578916800.0, 29.0], [1578920400.0, 55.0], ... [1580353200.0, 55.0]]``
    :param algorithm_parameters: a dictionary of any required parameters for
        the custom_algorithm and algorithm itself.  For the matrixprofile
        custom algorithm the following parameters are required, example:
        ``algorithm_parameters={
            'check_details': {<empty_dict|check_details dict>},
            'full_duration': full_duration,
            'windows': int
        }``
    :type current_skyline_app: str
    :type parent_pid: int
    :type timeseries: list
    :type algorithm_parameters: dict
    :return: True, False or None
    :rtype: boolean

    """

    # You MUST define the algorithm_name
    # @modified 20230118 - Task #4786: Switch from matrixprofile to stumpy
    #                      Task #4778: v4.0.0 - update dependencies
    # Changed to full name
    # algorithm_name = 'matrixprofile'
    algorithm_name = 'skyline_matrixprofile'
    func_name = str(algorithm_name)

    # Define the default state of None and None, anomalous does not default to
    # False as that is not correct, False is only correct if the algorithm
    # determines the data point is not anomalous.  The same is true for the
    # anomalyScore.
    anomalous = None
    anomalyScore = None

    results = {'anomalies': [], 'matrixprofile_scores': []}

    current_logger = None

    # If you wanted to log, you can but this should only be done during
    # testing and development
    def get_log(current_skyline_app):
        current_skyline_app_logger = current_skyline_app + 'Log'
        current_logger = logging.getLogger(current_skyline_app_logger)
        return current_logger

    start = timer()

    # Use the algorithm_parameters to determine the sample_period
    debug_logging = None
    try:
        debug_logging = algorithm_parameters['debug_logging']
    except:
        debug_logging = False
    debug_logging = True

    # Determine if a context is passed with algorithm_parameters
    context = None
    try:
        context = algorithm_parameters['context']
    except KeyError:
        context = None
    if context:
        func_name = '%s :: %s' % (context, algorithm_name)

    # @added 20221126 - Feature #4734: mirage_vortex
    anomalies = {}
    matrixprofile_anomalies = []
    return_results = False
    try:
        return_results = algorithm_parameters['return_results']
    except:
        return_results = False

    if debug_logging:
        try:
            current_logger = get_log(current_skyline_app)
            current_logger.debug('debug :: %s :: debug_logging enabled with algorithm_parameters - %s' % (
                func_name, str(algorithm_parameters)))
        except:
            # This except pattern MUST be used in ALL custom algorithms to
            # facilitate the traceback from any errors.  We want the algorithm
            # to run super fast and without spamming the log with lots of
            # errors.  But we do not want the function returning and not
            # reporting anything to the log, so the pythonic except is used to
            # "sample" any algorithm errors to a tmp file and report once per
            # run rather than spewing tons of errors into the log e.g.
            # analyzer.log
            record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback.format_exc())
            # Return None and None as the algorithm could not determine True or False
            if return_results:
                return (None, None, results)
            return (None, None)

    # @njit(cache=True)
    # def numba_stump(ts, windows):
    #     profile = stump(ts, m=windows)
    #     return profile

    # Use the algorithm_parameters to determine if there are check_details
    check_details = {}
    try:
        check_details = algorithm_parameters['check_details']
        if debug_logging:
            current_logger.debug('debug :: %s :: snab check_details - %s' % (
                func_name, str(check_details)))
    except Exception as err:
        traceback_msg = traceback.format_exc()
        if current_skyline_app == 'snab':
            if not traceback_msg:
                traceback_msg = 'None'
            current_logger.error(traceback_msg)
            if debug_logging:
                current_logger.error('error :: debug :: could not determine check_details from algorithm_parameters - %s - %s' % (
                    str(algorithm_parameters), err))
                current_logger.debug('debug :: %s :: snab check_details - %s' % (
                    func_name, str(check_details)))
            record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
            # Return None and None as the algorithm could not determine True or False
            if return_results:
                return (None, None, results)
            return (None, None)
        else:
            pass

    # @added 20221126 - Feature #4734: mirage_vortex
    anomaly_window = 1
    try:
        anomaly_window = algorithm_parameters['anomaly_window']
    except:
        anomaly_window = 1

    # Allow the matrixprofile windows parameter to be passed in the
    # check_details for snab as well
    windows = None
    if check_details:
        try:
            windows = check_details['windows']
            if debug_logging:
                current_logger.debug('debug :: %s :: windows - %s - determined from check_details' % (
                    func_name, str(windows)))
        except:
            windows = None
    if not windows:
        try:
            windows = algorithm_parameters['windows']
            if debug_logging:
                current_logger.debug('debug :: %s :: windows - %s' % (
                    func_name, str(windows)))
        except Exception as err:
            # This except pattern MUST be used in ALL custom algorithms to
            # facilitate the traceback from any errors.  We want the algorithm
            # to run super fast and without spamming the log with lots of
            # errors.  But we do not want the function returning and not
            # reporting anything to the log, so the pythonic except is used to
            # "sample" any algorithm errors to a tmp file and report once per
            # run rather than spewing tons of errors into the log e.g.
            # analyzer.log
            if debug_logging:
                current_logger.error('error :: %s :: windows not passed - %s' % (
                    func_name, err))
            record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback.format_exc())
            # Return None and None as the algorithm could not determine True or False
            if return_results:
                return (None, None, results)
            return (None, None)

    # Allow the matrixprofile k discords parameter to be passed in the
    # check_details for snab as well
    k_discords = None
    if check_details:
        try:
            k_discords = int(check_details['k_discords'])
            if debug_logging:
                current_logger.debug('debug :: %s :: k_discords - %s - determined from check_details' % (
                    func_name, str(k_discords)))
        except:
            k_discords = None
    if not k_discords:
        try:
            k_discords = algorithm_parameters['k_discords']
            if debug_logging:
                current_logger.debug('debug :: %s :: k_discords - %s' % (
                    func_name, str(k_discords)))
        except:
            # Default to discovering 20 discords
            k_discords = 20

    # ALWAYS WRAP YOUR ALGORITHM IN try and the BELOW except
    try:
        start_preprocessing = timer()

        # INFO: Sorting time series of 10079 data points took 0.002215 seconds
        timeseries = sorted(timeseries, key=lambda x: x[0])
        if debug_logging:
            current_logger.debug('debug :: %s :: time series of length - %s' % (
                func_name, str(len(timeseries))))

        # Testing the data to ensure it meets minimum requirements, in the case
        # of Skyline's use of the matrixprofile algorithm this means that:
        # - the time series must have at least 75% of its full_duration
        # - the time series must have at least 99% of the expected data points
        #   in the sample being analysed.
        do_not_use_sparse_data = False
        if do_not_use_sparse_data:

            # Default the analyzer required period to 18 hours
            period_required = int(86400 * 0.75)

            total_period = 0
            total_datapoints = 0
            try:
                start_timestamp = int(timeseries[0][0])
                end_timestamp = int(timeseries[-1][0])
                total_period = end_timestamp - start_timestamp
                total_datapoints = len(timeseries)
            except:
                traceback_msg = traceback.format_exc()
                if debug_logging:
                    current_logger.error(traceback_msg)
                    current_logger.error('error :: debug_logging :: %s :: failed to determine total_period and total_datapoints' % (
                        func_name))
                record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                timeseries = []
            if not timeseries:
                if return_results:
                    return (anomalous, anomalyScore, results)
                return (anomalous, anomalyScore)

            if current_skyline_app == 'snab':
                try:
                    full_duration = check_details['full_duration']
                    period_required = int(full_duration * 0.75)
                except:
                    traceback_msg = traceback.format_exc()
                    if debug_logging:
                        current_logger.error(traceback_msg)
                        current_logger.error('error :: debug_logging :: %s :: failed to determine total_period and total_datapoints' % (
                            func_name))
                    record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                    if return_results:
                        return (anomalous, anomalyScore, results)
                    return (anomalous, anomalyScore)

            # If the time series does not have 75% of its full_duration it
            # does not have sufficient data to sample
            try:
                if total_period < period_required:
                    if debug_logging:
                        current_logger.debug('debug :: %s :: time series does not have sufficient data' % (
                            func_name))
                    if return_results:
                        return (anomalous, anomalyScore, results)
                    return (anomalous, anomalyScore)
            except:
                traceback_msg = traceback.format_exc()
                if debug_logging:
                    current_logger.error(traceback_msg)
                    current_logger.error('error :: debug_logging :: %s :: failed to determine if time series has sufficient data' % (
                        func_name))
                record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                if return_results:
                    return (anomalous, anomalyScore, results)
                return (anomalous, anomalyScore)

            # If the time series does not have 75% of its full_duration data
            # points it does not have sufficient data to sample

            # Determine resolution from the last 30 data points
            # INFO took 0.002060 seconds
            resolution_timestamps = []
            metric_resolution = False
            for metric_datapoint in timeseries[-30:]:
                timestamp = int(metric_datapoint[0])
                resolution_timestamps.append(timestamp)
            timestamp_resolutions = []
            if resolution_timestamps:
                last_timestamp = None
                for timestamp in resolution_timestamps:
                    if last_timestamp:
                        resolution = timestamp - last_timestamp
                        timestamp_resolutions.append(resolution)
                        last_timestamp = timestamp
                    else:
                        last_timestamp = timestamp
                try:
                    del resolution_timestamps
                except:
                    pass
            if timestamp_resolutions:
                try:
                    timestamp_resolutions_count = Counter(timestamp_resolutions)
                    ordered_timestamp_resolutions_count = timestamp_resolutions_count.most_common()
                    metric_resolution = int(ordered_timestamp_resolutions_count[0][0])
                except:
                    traceback_msg = traceback.format_exc()
                    record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                    if debug_logging:
                        current_logger.error(traceback_msg)
                        current_logger.error('error :: debug_logging :: %s :: failed to determine if time series has sufficient data' % (
                            func_name))
                try:
                    del timestamp_resolutions
                except:
                    pass

            minimum_datapoints = None
            if metric_resolution:
                minimum_datapoints = int(period_required / metric_resolution)
            if minimum_datapoints:
                if total_datapoints < minimum_datapoints:
                    if debug_logging:
                        current_logger.debug('debug :: %s :: time series does not have sufficient data, minimum_datapoints required is %s and time series has %s' % (
                            func_name, str(minimum_datapoints), str(total_datapoints)))
                    if return_results:
                        return (anomalous, anomalyScore, results)
                    return (anomalous, anomalyScore)

            # Is the time series fully populated?
            # full_duration_datapoints = int(full_duration / metric_resolution)
            total_period_datapoints = int(total_period / metric_resolution)
            minimum_percentage_sparsity = 95
            sparsity = int(total_datapoints / (total_period_datapoints / 100))
            if sparsity < minimum_percentage_sparsity:
                if debug_logging:
                    current_logger.debug('debug :: %s :: time series does not have sufficient data, minimum_percentage_sparsity required is %s and time series has %s' % (
                        func_name, str(minimum_percentage_sparsity), str(sparsity)))
                if return_results:
                    return (anomalous, anomalyScore, results)
                return (anomalous, anomalyScore)

        original_timeseries = []
        downsample_data = False
        if current_skyline_app == 'mirage' and len(timeseries) > 1600:
            try:
                timestamps = [int(item[0]) for item in timeseries]
                np_timestamps = np.array(timestamps)
                ts_diffs = np.diff(np_timestamps)
                resolution_counts = np.unique(ts_diffs, return_counts=True)
                resolution = resolution_counts[0][np.argmax(resolution_counts[1])]
                if resolution < 600:
                    downsample_data = True
            except Exception as err:
                traceback_msg = traceback.format_exc()
                record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                if debug_logging:
                    current_logger.error(traceback_msg)
                    current_logger.error('error :: debug_logging :: %s :: resolution failed - %s' % (
                        func_name, err))
        if downsample_data:
            original_timeseries = list(timeseries)
            try:
                downsampled_timeseries = downsample_timeseries(current_skyline_app, timeseries, resolution, 600, 'mean', 'end')
                timeseries = list(downsampled_timeseries)
                if debug_logging:
                    current_logger.debug('debug :: %s :: downsampled timeseries from resolution %s with %s datapoints to resolution 600 with %s datapoints' % (
                        func_name, str(resolution), str(len(original_timeseries)), str(len(timeseries))))
            except Exception as err:
                traceback_msg = traceback.format_exc()
                record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                if debug_logging:
                    current_logger.error(traceback_msg)
                    current_logger.error('error :: debug_logging :: %s :: downsample_timeseries failed - %s' % (
                        func_name, err))

        # Preprocess the data into the required matrixprofile format and run
        # the data through the matrixprofile algorithm
        # Initially a POC was attempted using a reversed time series to
        # validate whether matrixprofile was identifying discords in the last
        # windows of the timeseries
        # reversed_timeseries = timeseries[::-1]
        try:
            dataset = [float(item[1]) for item in timeseries]
            # Do not reverse - after the right yssiM
            # dataset = dataset[::-1]
            ts = np.array(dataset)
        except Exception as err:
            traceback_msg = traceback.format_exc()
            if debug_logging:
                current_logger.error(traceback_msg)
                current_logger.error('error :: debug_logging :: %s :: failed to create ts - %s' % (
                    func_name, err))
            record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
            if return_results:
                return (anomalous, anomalyScore, results)
            return (anomalous, anomalyScore)

        end_preprocessing = timer()
        preprocessing_runtime = end_preprocessing - start_preprocessing
        if debug_logging:
            current_logger.debug('debug :: %s :: preprocessing took %.6f seconds' % (
                func_name, preprocessing_runtime))

        # @added 20230113 - Task #4786: Switch from matrixprofile to stumpy
        stumpy_available = False
        if stumpy_available:
            start_dir_list = timer()
            loaded_modules = list(dir())
            if debug_logging:
                current_logger.debug('debug :: %s :: dir() took %s seconds and listed %s loaded modules' % (
                    func_name, str(timer() - start_dir_list), str(len(loaded_modules))))
            if 'np' in loaded_modules:
                if debug_logging:
                    current_logger.debug('debug :: %s :: np is loaded' % (
                        func_name))
            else:
                if debug_logging:
                    current_logger.debug('debug :: %s :: np is not loaded' % (
                        func_name))
            if 'stump' not in loaded_modules:
                if debug_logging:
                    current_logger.debug('debug :: %s :: stump is not loaded' % (
                        func_name))
                try:
                    from stumpy import stump
                    stumpy_available = True
                    if debug_logging:
                        current_logger.debug('debug :: %s :: stump imported from stumpy' % (
                            func_name))
                except Exception as err:
                    traceback_msg = traceback.format_exc()
                    if debug_logging:
                        current_logger.error(traceback_msg)
                        current_logger.error('error :: debug_logging :: %s :: failed to load stump from stumpy - %s' % (
                            func_name, err))
                    stumpy_available = False
            else:
                stumpy_available = True
                if debug_logging:
                    current_logger.debug('debug :: %s :: stump was already imported from stumpy' % (
                        func_name))

        profile = None
        start_compute = timer()
        try:
            # @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
            # if not stumpy_available:
            #     profile = mp.compute(ts, windows=windows)
            # else:
            #     if debug_logging:
            #         current_logger.debug('debug :: %s :: calculating profile with stump' % (
            #             algorithm_name))
            #     profile = stump(ts, m=windows)
            #     # profile = numba_stump(ts, windows)
            if debug_logging:
                current_logger.debug('debug :: %s :: calculating profile with stump' % (
                    func_name))
            profile = stumpy.stump(ts, m=windows)
            if debug_logging:
                current_logger.debug('debug :: %s :: calculated profile with stump OK' % (
                    func_name))
        except Exception as err:
            traceback_msg = traceback.format_exc()
            if debug_logging:
                current_logger.error(traceback_msg)
                # @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
                # if not stumpy_available:
                #     current_logger.error('error :: debug_logging :: %s :: failed to run mp.compute on ts - %s' % (
                #         algorithm_name, err))
                # else:
                current_logger.error('error :: debug_logging :: %s :: failed to run stump on ts - %s' % (
                    func_name, err))
            record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
            if return_results:
                return (anomalous, anomalyScore, results)
            return (anomalous, anomalyScore)

        end_compute = timer()
        compute_runtime = end_compute - start_compute
        if debug_logging:
            # @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
            # if not stumpy_available:
            #     current_logger.debug('debug :: %s :: mp.compute took %.6f seconds' % (
            #         algorithm_name, compute_runtime))
            # else:
            current_logger.debug('debug :: %s :: stumpy.stump took %.6f seconds to generate a profile' % (
                func_name, compute_runtime))
            if isinstance(profile, np.ndarray):
                current_logger.debug('debug :: %s :: stumpy.stump generated a profile of length: %s' % (
                    func_name, str(len(profile))))
            else:
                current_logger.warning('warning :: %s :: stumpy.stump did not generate a profile' % (
                    func_name))

        start_discord = timer()

        discover_discords = False
        # if not stumpy_available:
        #     if profile:
        #         discover_discords = True
        # else:
        #     if isinstance(profile, np.ndarray):
        #         discover_discords = True
        if isinstance(profile, np.ndarray):
            discover_discords = True

        # @modified 20230112 - Task #4786: Switch from matrixprofile to stumpy
        #                      Task #4778: v4.0.0 - update dependencies
        # stumpy generates an np.ndarray which results in a ValueError when
        # used with if, e.g. The truth value of an array with more than one
        # element is ambiguous. Use a.any() or a.all()
        # if profile:
        if discover_discords:
            try:
                # @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
                # if not stumpy_available:
                #     profile = mp.discover.discords(profile, k=k_discords)
                # else:
                profile = np.argsort(profile[:, 0])[-k_discords:]
                # sorted_profile = np.argsort(profile[:, 0])
                # discords_profile = []
                # n_discords = int(k_discords) * -1
                # for i in list(range(n_discords, 0)):
                #     discords_profile.append(sorted_profile[i])
                # profile = list(discords_profile)
                # @added 20230112 - Task #4786: Switch from matrixprofile to stumpy
                #                   Task #4778: v4.0.0 - update dependencies
                if debug_logging:
                    current_logger.debug('debug :: %s :: stumpy.stump profile discords: %s' % (
                        func_name, str(profile)))
            except Exception as err:
                traceback_msg = traceback.format_exc()
                if debug_logging:
                    current_logger.error(traceback_msg)
                    if not stumpy_available:
                        current_logger.error('error :: debug_logging :: %s :: failed to run mp.discover.discords on profile' % (
                            func_name))
                    else:
                        current_logger.error('error :: debug_logging :: %s :: failed to run np.argsort on profile - %s' % (
                            func_name, err))
                record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                if return_results:
                    return (anomalous, anomalyScore, results)
                return (anomalous, anomalyScore)

        end_discord = timer()
        discord_runtime = end_discord - start_discord
        if debug_logging:
            # @modified 20230112 - Task #4786: Switch from matrixprofile to stumpy
            #                      Task #4778: v4.0.0 - update dependencies
            # if not stumpy_available:
            #     current_logger.debug('debug :: %s :: mp.discover.discords for %s k discords took %.6f seconds' % (
            #         algorithm_name, str(k_discords), discord_runtime))
            # else:
            current_logger.debug('debug :: %s :: determining %s k-discords took %.6f seconds' % (
                func_name, str(k_discords), discord_runtime))

        discords = []
        # @modified 20230112 - Task #4786: Switch from matrixprofile to stumpy
        #                      Task #4778: v4.0.0 - update dependencies
        # if profile:
        if discover_discords:
            try:
                # @modified 20230104 - Task #4786: Switch from matrixprofile to stumpy
                # if not stumpy_available:
                #     for discord in profile['discords']:
                #         discords.append(discord)
                # else:
                for discord in np.sort(profile):
                    discords.append(discord)
            except KeyError:
                anomalous = False
                anomalyScore = 0.0
                if debug_logging:
                    current_logger.debug('debug :: %s :: no discords discovered, not anomalous' % (
                        func_name))
                if return_results:
                    return (anomalous, anomalyScore, results)
                return (anomalous, anomalyScore)
            except Exception as err:
                traceback_msg = traceback.format_exc()
                if debug_logging:
                    current_logger.error(traceback_msg)
                    current_logger.error('error :: debug_logging :: %s :: failed to determine discords - %s' % (
                        func_name, err))
                record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
                if return_results:
                    return (anomalous, anomalyScore, results)
                return (anomalous, anomalyScore)

        if discords:
            anomaly_timestamp = int(timeseries[-1][0])
            anomaly_index = 0
            for index, item in enumerate(timeseries):
                if int(item[0]) == int(anomaly_timestamp):
                    anomaly_index = index
                    break
            anomalous_period_indices = []
            for index, item in enumerate(timeseries):
                # @modified 20210630
                # if index in range((anomaly_index - 10), anomaly_index):
                if index in range((anomaly_index - windows), anomaly_index):
                    anomalous_period_indices.append(index)
            anomalous = False
            discord_anomalies = []
            for discord in discords:
                if discord in anomalous_period_indices:
                    anomalous = True
                    for index in anomalous_period_indices:
                        if discord == index:
                            discord_anomalies.append(index)
                            if debug_logging:
                                current_logger.debug('debug :: %s :: anomalous :: anomalous_timeseries index - %s' % (
                                    func_name, str(index)))
            if anomalous:
                anomalyScore = 1.0
            else:
                anomalyScore = 0.0

            # @added 20221126 - Feature #4734: mirage_vortex
            if return_results:
                for index, item in enumerate(timeseries):
                    if index in discords:
                        matrixprofile_anomalies.append(1)
                        ts = int(item[0])
                        anomalies[ts] = {'value': item[1], 'index': index, 'score': 1}
                    else:
                        matrixprofile_anomalies.append(0)
                anomaly_sum = sum(matrixprofile_anomalies[-anomaly_window:])
                if anomaly_sum > 0:
                    anomalous = True
                    anomalyScore = 1.0
                else:
                    anomalous = False
                    anomalyScore = 0.0

        if debug_logging:
            current_logger.debug('debug :: %s :: anomalous - %s, anomalyScore - %s' % (
                func_name, str(anomalous), str(anomalyScore)))

        # @added 20221126 - Feature #4734: mirage_vortex
        if return_results:
            results = {
                'anomalous': anomalous, 'anomalies': anomalies,
                'anomalyScore_list': matrixprofile_anomalies,
                'scores': matrixprofile_anomalies,
            }

        if debug_logging:
            end = timer()
            processing_runtime = end - start
            current_logger.debug('debug :: %s :: completed analysis in %.6f seconds' % (
                func_name, processing_runtime))

        try:
            del timeseries
        except:
            pass

        if return_results:
            return (anomalous, anomalyScore, results)
        return (anomalous, anomalyScore)
    except StopIteration:
        # This except pattern MUST be used in ALL custom algorithms to
        # facilitate the traceback from any errors.  We want the algorithm to
        # run super fast and without spamming the log with lots of errors.
        # But we do not want the function returning and not reporting
        # anything to the log, so the pythonic except is used to "sample" any
        # algorithm errors to a tmp file and report once per run rather than
        # spewing tons of errors into the log e.g. analyzer.log
        if return_results:
            return (anomalous, anomalyScore, results)
        return (None, None)
    except Exception as err:
        traceback_msg = traceback.format_exc()
        if debug_logging:
            try:
                current_logger.error(traceback_msg)
                current_logger.error('error :: debug_logging :: %s :: failed - %s' % (
                    func_name, err))
            except:
                pass
        record_algorithm_error(current_skyline_app, parent_pid, algorithm_name, traceback_msg)
        # Return None and None as the algorithm could not determine True or False
        if return_results:
            return (anomalous, anomalyScore, results)
        return (None, None)

    if return_results:
        return (anomalous, anomalyScore, results)
    return (anomalous, anomalyScore)
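
# The following is an illustrative usage sketch for local testing only and is
# NOT part of the algorithm above.  It assumes this module and its
# dependencies (numpy and the vendored stumpy) are importable, and the
# synthetic time series, window size and app name used here are made up for
# the example:
#
# if __name__ == '__main__':
#     import os
#     base_timestamp = 1578916800
#     synthetic_timeseries = []
#     # One day of synthetic data at 60 second resolution with a spike at the
#     # end of the time series
#     for i in range(1440):
#         value = 10.0 + float(np.sin(i / 20.0))
#         if i >= 1435:
#             value = 100.0
#         synthetic_timeseries.append([float(base_timestamp + (i * 60)), value])
#     example_algorithm_parameters = {
#         'windows': 60,
#         'k_discords': 20,
#         'return_results': True,
#         'debug_logging': False,
#     }
#     anomalous, anomalyScore, results = skyline_matrixprofile(
#         'analyzer', os.getpid(), synthetic_timeseries,
#         example_algorithm_parameters)
#     print('anomalous: %s, anomalyScore: %s' % (str(anomalous), str(anomalyScore)))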