Source code for luminosity.classify_metrics

import os
from os import getpid
import logging
import traceback
from operator import itemgetter
from timeit import default_timer as timer
from time import time
from collections import Counter
from ast import literal_eval

import settings
from skyline_functions import (
    get_redis_conn, get_redis_conn_decoded, get_graphite_metric,
    write_data_to_file)

# The custom algorithm runner is optional; fall back to None if the
# custom_algorithms module is not available (calls to it below are wrapped
# in try/except)
try:
    from custom_algorithms import run_custom_algorithm_on_timeseries
except:
    run_custom_algorithm_on_timeseries = None

# @added 20210316 - Feature #3978: luminosity - classify_metrics
#                   Feature #3642: Anomaly type classification
try:
    luminosity_data_folder = settings.LUMINOSITY_DATA_FOLDER
except:
    luminosity_data_folder = '/opt/skyline/luminosity'
save_plots_to = '%s/classify_metrics' % luminosity_data_folder
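
# Illustrative sketch (not part of the upstream module): classify_metrics below
# writes its per-metric artefacts (adtk plots, PersistAD.data.txt and
# significant.txt markers) beneath save_plots_to using the layout
# <save_plots_to>/<algorithm>/<metric path>/<timestamp>/<file>. For example,
# with a hypothetical metric name and timestamp:
def _example_output_path(base_name, algorithm, timestamp):
    """Compose a classify_metrics style significant.txt path (illustrative)."""
    metric_dir = base_name.replace('.', '/')
    return '%s/%s/%s/%s/significant.txt' % (
        save_plots_to, algorithm, metric_dir, str(int(timestamp)))
# e.g. _example_output_path('stats.webapp01.cpu.user', 'adtk_level_shift', 1616083200)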

try:
    LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT = settings.LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT
except:
    LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT = False
try:
    LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT = settings.LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT
except:
    LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT = False
try:
    LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES = settings.LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES
except:
    LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES = []
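
# Illustrative sketch (not part of the upstream module): each guarded lookup
# above follows the same "use the settings value if defined, otherwise a
# default" pattern, which a hypothetical helper could express as:
#
#     def _setting_or_default(name, default):
#         return getattr(settings, name, default)
#
#     # LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT = _setting_or_default(
#     #     'LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT', False)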

# Database configuration
config = {'user': settings.PANORAMA_DBUSER,
          'password': settings.PANORAMA_DBUSERPASS,
          'host': settings.PANORAMA_DBHOST,
          'port': settings.PANORAMA_DBPORT,
          'database': settings.PANORAMA_DATABASE,
          'raise_on_warnings': True}

skyline_app = 'luminosity'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)

redis_conn = get_redis_conn(skyline_app)
redis_conn_decoded = get_redis_conn_decoded(skyline_app)
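
# Illustrative sketch (not part of the upstream module): in this module the
# decoded (str) client is generally used for reads (get/hgetall/smembers) and
# the raw client for writes (setex/hset/sadd/delete). classify_metrics below is
# allowed to run for classify_for seconds and claims each metric with a
# short-lived per-metric key (TTL of classify_for) so that multiple luminosity
# processes do not classify the same metric at the same time, along the lines of:
#
#     def _claim_metric(base_name, classify_for, current_pid):
#         processing_key = 'luminosity.classify_metrics.processing.%s' % str(base_name)
#         if redis_conn_decoded.get(processing_key):
#             return False
#         data = {'pid': current_pid, 'timestamp': int(time())}
#         redis_conn.setex(processing_key, classify_for, str(data))
#         return True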


[docs]def classify_metrics(i, start_timestamp, classify_for): logger = logging.getLogger(skyline_app_logger) debug_algorithms = False logger.info('classify_metrics :: with start_timestamp - %s' % str(start_timestamp)) start_classify_metrics = timer() # Handle luminosity running with multiple processes def manage_processing_key(current_pid, base_name, classify_for, action): result = False processing_key = 'luminosity.classify_metrics.processing.%s' % str(base_name) if action == 'add': key_exists = None try: key_exists = redis_conn_decoded.get(processing_key) if key_exists: result = False return result except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - luminosity.classify_metrics.level_shift' % ( base_name)) try: data = {'pid': current_pid, 'timestamp': int(time())} redis_conn.setex(processing_key, classify_for, str(data)) result = True logger.info('classify_metrics :: managing %s added %s with %s' % ( str(base_name), processing_key, str(data))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to create key %s' % ( processing_key)) if action == 'remove': try: redis_conn.delete(processing_key) result = True except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove key %s' % ( processing_key)) return result def check_significant_level_shift(check_timeseries, base_name, anomalies, significant_length, save_plots_to): # Determine resolution from the last 30 data points # INFO took 0.002060 seconds resolution_timestamps = [] for metric_datapoint in check_timeseries[-30:]: timestamp = int(metric_datapoint[0]) resolution_timestamps.append(timestamp) timestamp_resolutions = [] if resolution_timestamps: last_timestamp = None for timestamp in resolution_timestamps: if last_timestamp: resolution = timestamp - last_timestamp timestamp_resolutions.append(resolution) last_timestamp = timestamp else: last_timestamp = timestamp try: del resolution_timestamps except: pass if timestamp_resolutions: try: timestamp_resolutions_count = Counter(timestamp_resolutions) ordered_timestamp_resolutions_count = timestamp_resolutions_count.most_common() metric_resolution = int(ordered_timestamp_resolutions_count[0][0]) except: pass try: del timestamp_resolutions except: pass continuous_periods = {} last_timestamp = None for ts, value in anomalies: if not last_timestamp: continuous_periods[ts] = {} continuous_periods[ts]['timestamps'] = [ts] last_timestamp = ts last_period = ts continue if (ts - metric_resolution) == last_timestamp: continuous_periods_timestamps = continuous_periods[last_period]['timestamps'] continuous_periods_timestamps.append(ts) last_timestamp = ts continuous_periods[last_period]['timestamps'] = continuous_periods_timestamps else: continuous_periods[ts] = {} continuous_periods[ts]['timestamps'] = [ts] last_timestamp = ts last_period = ts logger.info('classify_metrics :: %s level_shift_ad anomalies detected spanning %s continuous_periods on %s, result: %s, anomalyScore: %s' % ( str(len(anomalies)), str(len(continuous_periods)), str(base_name), str(result), str(anomalyScore))) persist_ad_dict = {} try: metric_dir = base_name.replace('.', '/') timestamp_dir = str(int(check_timeseries[-1][0])) PersistAD_data_file = '%s/%s/%s/%s/PersistAD.data.txt' % ( save_plots_to, custom_algorithm, metric_dir, timestamp_dir) if os.path.isfile(PersistAD_data_file): try: with open(PersistAD_data_file) as f: for line in f: persist_ad_dict = 
literal_eval(line) if persist_ad_dict: break except Exception as e: trace = traceback.format_exc() logger.error(trace) fail_msg = 'error :: get_classify_metrics :: failed to open PersistAD_data_file: %s - %s' % (PersistAD_data_file, e) logger.error('%s' % fail_msg) except: logger.error(traceback.format_exc()) logger.error('error :: failed to check PersistAD_data_file for %s' % base_name) persist_ad_anomalies = [] if persist_ad_dict: persist_ad_anomalies_dict = persist_ad_dict['anomalies'] persist_ad_anomalies = list(persist_ad_anomalies_dict.keys()) significant_level_shift = False if not persist_ad_anomalies: # A metric is deemed to have level shifted significantly if it only # does so occassionally and the shift lasts longer than 5 data # points # To identify metrics that are likely candidates to run the LevelShiftAD # algorithm against during realtime analysis, candidates should be # fairly stable and not trigger LevelShiftAD a lot. if len(continuous_periods) <= 2: for timestamp in continuous_periods: length_of_level_shift = len(continuous_periods[timestamp]['timestamps']) logger.debug('debug :: classify_metrics :: %s - continuous_periods[%s] - %s' % ( base_name, str(timestamp), str(continuous_periods[timestamp]))) logger.debug('debug :: classify_metrics :: %s - length_of_level_shift: %s, significant_length: %s' % ( base_name, str(length_of_level_shift), str(significant_length))) if length_of_level_shift >= significant_length: significant_level_shift = True logger.info('classify_metrics :: level shift metric - %s' % base_name) triggered_timestamp = int(timestamp) logger.info('classify_metrics :: level shift metric - %s at %s' % (base_name, str(triggered_timestamp))) if triggered_timestamp: try: redis_conn.hset('luminosity.classify_metrics.level_shift', base_name, triggered_timestamp) break except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - luminosity.classify_metrics.level_shift' % ( base_name)) if not significant_level_shift: logger.info('classify_metrics :: no significant, sustained level shifts - %s' % base_name) else: logger.info('classify_metrics :: not a level shift candidate metric - %s' % base_name) if persist_ad_anomalies: level_shift_anomalies_timestamps = [ts for ts, value in anomalies] persist_anomalies_timestamps = list(persist_ad_anomalies) shared_timestamps = [] for timestamp in level_shift_anomalies_timestamps: if timestamp in persist_anomalies_timestamps: shared_timestamps.append(timestamp) logger.info('classify_metrics :: LevelShiftAD and PersistAD have %s shared anomaly timestamps - %s' % ( str(len(shared_timestamps)), base_name)) continuous_periods = {} if shared_timestamps: last_timestamp = None for ts in shared_timestamps: if not last_timestamp: continuous_periods[ts] = {} continuous_periods[ts]['timestamps'] = [ts] last_timestamp = ts last_period = ts continue if (ts - metric_resolution) == last_timestamp: continuous_periods_timestamps = continuous_periods[last_period]['timestamps'] continuous_periods_timestamps.append(ts) last_timestamp = ts continuous_periods[last_period]['timestamps'] = continuous_periods_timestamps else: continuous_periods[ts] = {} continuous_periods[ts]['timestamps'] = [ts] last_timestamp = ts last_period = ts continuous_periods = {} last_timestamp = None last_p_timestamp = None for ts in level_shift_anomalies_timestamps: if ts in persist_anomalies_timestamps: if last_p_timestamp: if ts < last_p_timestamp: continue continuous_periods[ts] = {} 
continuous_periods[ts]['timestamps'] = [ts] if not last_p_timestamp: last_p_timestamp = ts last_timestamp = ts for p_ts in persist_anomalies_timestamps: if p_ts < ts: continue if (p_ts - metric_resolution) == last_p_timestamp: continuous_periods_timestamps = continuous_periods[ts]['timestamps'] continuous_periods_timestamps.append(p_ts) last_p_timestamp = p_ts continuous_periods[ts]['timestamps'] = continuous_periods_timestamps significant_level_shift_timestamps = [] if continuous_periods: for timestamp in continuous_periods: length_of_level_shift = len(continuous_periods[timestamp]['timestamps']) if length_of_level_shift >= significant_length: significant_level_shift_timestamps.append(timestamp) if len(significant_level_shift_timestamps) > 0: significant_level_shift = True try: redis_conn.hset('luminosity.classify_metrics.level_shift', base_name, significant_level_shift_timestamps[0]) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - luminosity.classify_metrics.level_shift' % ( base_name)) if not significant_level_shift: logger.info('classify_metrics :: no significant, persisted level shifts - %s' % base_name) else: logger.info('classify_metrics :: %s significant, persisted level shifts detected - %s' % ( str(len(significant_level_shift_timestamps)), base_name)) return significant_level_shift # Get all metrics unique_base_names = [] try: unique_base_names = list(redis_conn_decoded.smembers('aet.analyzer.unique_base_names')) except: logger.info(traceback.format_exc()) logger.error('error :: classify_metrics :: could not get the aet.analyzer.unique_base_names key from Redis') # Get last processed timestamps last_classified_hash_key = 'luminosity.last.classify_metrics' last_classified = [] try: last_classified = redis_conn_decoded.hgetall(last_classified_hash_key) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to get Redis hash key - %s' % last_classified_hash_key) classified_metrics = [] if last_classified: for metric in last_classified: classified_metrics.append(metric) sorted_last_classified = [] if last_classified: try: sorted_last_classified = sorted(last_classified.items(), key=itemgetter(1), reverse=False) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to sort last_classified by timestamp values') sorted_last_classified = [] logger.info('classify_metrics :: %s metrics sorted by oldest analyzed timestamp first' % ( str(len(sorted_last_classified)))) metrics_to_analyse = [] if not classified_metrics: metrics_to_analyse = list(unique_base_names) logger.info('classify_metrics :: no classified_metrics so unique_base_names has been added to metrics_to_analyse') if unique_base_names and classified_metrics: try: unique_base_names_set = set(list(unique_base_names)) classified_metrics_set = set(list(classified_metrics)) if unique_base_names_set == classified_metrics_set: logger.info('classify_metrics :: unique_base_names_set and classified_metrics_set are the same') else: set_difference = unique_base_names_set.difference(classified_metrics_set) for metric in set_difference: metrics_to_analyse.append(metric) logger.info('classify_metrics :: there are %s metrics that have not been classified that have been added to metrics_to_analyse' % str(len(metrics_to_analyse))) del set_difference except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to determine whether the 
unique_base_names_set and classified_metrics_set are different') last_classified_metrics = [] if sorted_last_classified: try: last_classified_metrics = [item[0] for item in sorted_last_classified] logger.info('classify_metrics :: reordered last_classified_metrics by oldest analyzed timestamp from Redis hash key - %s' % ( last_classified_hash_key)) except: logger.error(traceback.format_exc()) logger.error('error :: failed to generate list of metric names from sorted_last_classified') continue_to_process_metrics = [] continue_processing_metrics = [] continue_processing_hash_key = 'luminosity.classify_metric.continue_processing_metrics' try: continue_to_process_metrics = redis_conn_decoded.hgetall(continue_processing_hash_key) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to get Redis hash key - %s' % continue_processing_hash_key) if continue_to_process_metrics: try: continue_processing_metrics_list = sorted(continue_to_process_metrics.items(), key=itemgetter(1), reverse=False) continue_processing_metrics = [item[0] for item in continue_processing_metrics_list] except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to sort last_classified by timestamp values') continue_processing_metrics = [] if continue_processing_metrics: logger.info('classify_metrics :: there are %s metrics to continue to process' % str(len(continue_processing_metrics))) metrics_to_analyse = continue_processing_metrics + metrics_to_analyse if last_classified_metrics: metrics_to_analyse = metrics_to_analyse + last_classified_metrics logger.info('classify_metrics :: there are %s metrics in metrics_to_analyse' % str(len(metrics_to_analyse))) # Get mirage resolutions mirage_metric_resolutions_dict = {} if metrics_to_analyse: try: mirage_metric_resolutions_dict = redis_conn_decoded.hgetall('mirage.hash_key.metrics_resolutions') except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to get Redis hash key - mirage.hash_key.metrics_resolutions') metrics_to_process_per_run = int(len(unique_base_names) / (14400 / 60)) logger.info('classify_metrics :: metrics_to_process_per_run - %s' % ( str(metrics_to_process_per_run))) current_pid = getpid() # Because this is periodic analysis only analyse data from before the last # half hour calculate_until = int(start_timestamp) - 1800 metrics_proceessed = 0 metrics_classified = 0 metrics_in_ttl = 0 classifications_to_do = False # use_ttl = False use_ttl = True check_long_term_for_level_shift = False level_shifted_skipped_metrics = 0 # @added 20210315 - Feature #3978: luminosity - classify_metrics # When a new metric is added check the long term timeseries for level shifts if LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT: check_long_term_for_level_shift = True classifications_to_do = True if LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT: classifications_to_do = True # @added 20210316 - Feature #3978: luminosity - classify_metrics # Exclude metrics already classified known_level_shift_metrics = [] if LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT: try: known_level_shift_metrics_dict = redis_conn_decoded.hgetall('luminosity.classify_metrics.level_shift') if known_level_shift_metrics_dict: known_level_shift_metrics = list(known_level_shift_metrics_dict.keys()) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to get Redis hash key - luminosity.classify_metrics.level_shift') logger.info('classify_metrics :: to exclude - 
known_level_shift_metrics: %s' % ( str(len(known_level_shift_metrics)))) known_volatility_shift_metrics = [] if LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT: try: known_volatility_shift_metrics_dict = redis_conn_decoded.hgetall('luminosity.classify_metrics.volatility_shift') if known_volatility_shift_metrics_dict: known_volatility_shift_metrics = list(known_volatility_shift_metrics_dict.keys()) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to get Redis hash key - luminosity.classify_metrics.volatility_shift') logger.info('classify_metrics :: to exclude - known_volatility_shift_metrics: %s' % ( str(len(known_volatility_shift_metrics)))) base_names_processed = [] for base_name in metrics_to_analyse: if not classifications_to_do: logger.info('classify_metrics :: no classifications to do, nothing enabled') break if metrics_proceessed == metrics_to_process_per_run: break if base_name in base_names_processed: continue base_names_processed.append(base_name) time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 0.3): logger.info('classify_metrics :: stopping before timeout is reached') break last_analyzed_timestamp = None if last_classified: try: last_analyzed_timestamp = int(last_classified[base_name]) # logger.debug('debug :: classify_metrics :: last_analyzed_timestamp - %s - %s' % (base_name, str(last_analyzed_timestamp))) if use_ttl: if (last_analyzed_timestamp + 14400) > time_now: metrics_in_ttl += 1 continue except: last_analyzed_timestamp = None # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'add') except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to run manage_processing_key') if not manage_metric: logger.info('classify_metrics :: skipping as processing_key exists for %s' % base_name) continue if last_analyzed_timestamp: processed_seconds_ago = int(time_now) - int(last_analyzed_timestamp) logger.info('classify_metrics :: last processed %s seconds ago, processing %s' % ( str(processed_seconds_ago), str(base_name))) use_full_duration = int(settings.FULL_DURATION) try: metric_second_order_resolution_hours = mirage_metric_resolutions_dict[base_name] use_full_duration = int(metric_second_order_resolution_hours) * 3600 except: use_full_duration = int(settings.FULL_DURATION) try: from_timestamp = int(calculate_until) - int(use_full_duration) until_timestamp = int(calculate_until) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: calculating from_timestamp and until_timestamp for %s' % base_name) skip_level_shift = False check_new_metric_for_long_term_for_level_shift = False # @added 20210315 - # Analyse a 6 month period to determine if any level shifts have occurred metric_already_classified = False if LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT: if LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES: for namespace in LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES: if namespace in base_name: skip_level_shift = True last_analyzed_timestamp = time_now level_shifted_skipped_metrics += 1 checking_long_term_for_level_shift = False long_term_processing_key = 'luminosity.long_term_processing.%s' % str(base_name) if not last_analyzed_timestamp: checking_long_term_for_level_shift = True check_new_metric_for_long_term_for_level_shift = True try: redis_conn.setex(long_term_processing_key, 600, int(time())) logger.info('classify_metrics :: long_term_processing_key 
set - %s' % ( long_term_processing_key)) except: pass else: if last_analyzed_timestamp < (int(calculate_until) - (86400 * 7)): try: metric_being_processed = int(redis_conn_decoded.get(long_term_processing_key)) logger.info('classify_metrics :: a long_term_processing_key was found (%s) - last_analyzed_timestamp: %s is older than 2 weeks setting checking_long_term_for_level_shift on %s' % ( str(metric_being_processed), str(last_analyzed_timestamp), str(base_name))) checking_long_term_for_level_shift = True except: pass if base_name in known_level_shift_metrics and checking_long_term_for_level_shift: logger.info('classify_metrics :: not processing known level shift metric - %s' % ( str(base_name))) checking_long_term_for_level_shift = False if skip_level_shift: checking_long_term_for_level_shift = False if base_name in continue_processing_metrics: logger.info('classify_metrics :: continuing long term processing from %s because in continue_processing_metrics - %s' % ( str(last_analyzed_timestamp), str(base_name))) checking_long_term_for_level_shift = True long_term_timeseries = [] if check_long_term_for_level_shift and checking_long_term_for_level_shift: try: long_term_calculate_until = calculate_until - (86400 * 7) long_term_period = int(86400 * 90) if base_name in continue_processing_metrics and last_analyzed_timestamp: long_term_period = int(last_analyzed_timestamp) long_term_from_timestamp = int(long_term_calculate_until) - long_term_period # Continue from where it left off if last_analyzed_timestamp: if last_analyzed_timestamp > long_term_from_timestamp: long_term_from_timestamp = last_analyzed_timestamp + 1 logger.info('classify_metrics :: check_long_term_for_level_shift - long_term_from_timestamp set to last_analyzed_timestamp value + 1 for %s' % ( base_name)) long_term_until_timestamp = int(long_term_calculate_until) logger.info('classify_metrics :: check_long_term_for_level_shift - calculated long_term_from_timestamp: %s, long_term_until_timestamp: %s for %s' % ( str(long_term_from_timestamp), str(long_term_until_timestamp), base_name)) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: check_long_term_for_level_shift - calculating long_term_from_timestamp and until_timestamp for %s' % base_name) time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break try: long_term_timeseries = get_graphite_metric( skyline_app, str(base_name), long_term_from_timestamp, long_term_until_timestamp, 'list', 'object') if len(long_term_timeseries) > 0: if long_term_timeseries[0][0] < (time_now - (86400 * 7)): checking_long_term_for_level_shift = True logger.info('classify_metrics :: checking_long_term_for_level_shift - %s datapoints retrieved for processing %s' % ( str(len(long_term_timeseries)), str(base_name))) else: logger.info('classify_metrics :: checking_long_term_for_level_shift - %s datapoints retrieved for processing %s - nothing to do setting %s' % ( str(len(long_term_timeseries)), str(base_name), last_classified_hash_key)) try: 
redis_conn.hset(last_classified_hash_key, base_name, int(long_term_until_timestamp)) logger.info('classify_metrics :: checking_long_term_for_level_shift - %s set timestamp in %s to long_term_until_timestamp - %s' % ( str(base_name), str(last_classified_hash_key), str(long_term_until_timestamp))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: checking_long_term_for_level_shift - failed to set timestamp for %s in Redis hash key - %s' % ( base_name, last_classified_hash_key)) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: checking_long_term_for_level_shift - getting Graphite data for %s with long_term_from_timestamp: %s, long_term_until_timestamp: %s' % ( base_name, str(long_term_from_timestamp), str(long_term_until_timestamp))) # level_shift_ad custom_algorithm = 'adtk_level_shift' custom_algorithm_dict = {} custom_algorithm_dict['debug_logging'] = False debug_algorithm_logging = False if debug_algorithms: custom_algorithm_dict['debug_logging'] = True debug_algorithm_logging = True custom_algorithm_dict['debug_logging'] = True debug_algorithm_logging = True custom_algorithm_dict['algorithm_source'] = '/opt/skyline/github/skyline/skyline/custom_algorithms/adtk_level_shift.py' custom_algorithm_dict['algorithm_parameters'] = { 'window': 10, 'c': 9.9, 'return_anomalies': True, 'realtime_analysis': False, 'save_plots_to': save_plots_to, 'debug_logging': debug_algorithm_logging, 'run_PersistAD': True, 'persist_ad_algorithm_parameters': {'c': 9.9, 'window': 60} } custom_algorithm_dict['max_execution_time'] = 5.0 result = None anomalyScore = None anomalies = None timeseries_period_end = None if checking_long_term_for_level_shift and len(long_term_timeseries) > 0: timeseries_start_timestamp = long_term_timeseries[0][0] timeseries_end_timestamp = long_term_timeseries[-1][0] timeseries_period_start = timeseries_start_timestamp timeseries_period_end = timeseries_start_timestamp + use_full_duration periods_processed = 0 periods_to_process = int((timeseries_end_timestamp - timeseries_start_timestamp) / (86400 * 7)) if not periods_to_process: logger.error('error :: classify_metrics :: checking_long_term_for_level_shift - no periods_to_process: %s' % str(periods_to_process)) else: logger.info('classify_metrics :: checking_long_term_for_level_shift - periods_to_process: %s - %s' % ( str(periods_to_process), base_name)) significant_level_shifts_detected = 0 time_now = int(time()) runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break start_long_term_timer = timer() while timeseries_period_end < timeseries_end_timestamp: periods_processed += 1 working_timeseries = [item for item in long_term_timeseries if item[0] >= timeseries_period_start and item[0] <= timeseries_period_end] if not working_timeseries: timeseries_period_start = timeseries_period_end + 1 timeseries_period_end = timeseries_period_end + use_full_duration continue time_now = int(time()) runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: 
stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break # Add to continue_processing_hash_key last_working_timestamp = int(working_timeseries[-1][0]) try: redis_conn.hset(continue_processing_hash_key, base_name, last_working_timestamp) logger.info('classify_metrics :: checking_long_term_for_level_shift - adding %s to %s with %s' % ( str(base_name), continue_processing_hash_key, str(last_working_timestamp))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to get Redis hash key - %s' % continue_processing_hash_key) result = None anomalyScore = None anomalies = None try: result, anomalyScore, anomalies = run_custom_algorithm_on_timeseries(skyline_app, current_pid, base_name, working_timeseries, custom_algorithm, custom_algorithm_dict, debug_algorithms) if debug_algorithms: logger.debug('debug :: classify_metrics :: checking_long_term_for_level_shift - run_custom_algorithm_on_timeseries run %s with result - %s, anomalyScore - %s' % ( custom_algorithm, str(result), str(anomalyScore))) except: if debug_algorithms: logger.error(traceback.format_exc()) logger.error('error :: algorithms :: checking_long_term_for_level_shift - failed to run custom_algorithm %s on %s' % ( custom_algorithm, base_name)) if not anomalies: logger.info('classify_metrics :: checking_long_term_for_level_shift - %s level_shift_ad anomalies were detected on %s, result: %s, anomalyScore: %s' % ( str(anomalies), str(base_name), str(result), str(anomalyScore))) timeseries_period_start = working_timeseries[-1][0] + 1 timeseries_period_end = working_timeseries[-1][0] + use_full_duration if anomalies: significant_level_shift = check_significant_level_shift(working_timeseries, base_name, anomalies, 3, save_plots_to) if significant_level_shift: significant_level_shifts_detected += 1 if not metric_already_classified: metrics_classified += 1 metric_already_classified = True logger.info('classify_metrics :: checking_long_term_for_level_shift - %s significant level shift detected' % ( str(base_name))) try: metric_dir = base_name.replace('.', '/') timestamp_dir = str(last_working_timestamp) significant_file = '%s/%s/%s/%s/significant.txt' % ( save_plots_to, custom_algorithm, metric_dir, timestamp_dir) write_data_to_file(skyline_app, significant_file, 'w', str(True)) logger.info('created significant file - %s' % significant_file) except: logger.error(traceback.format_exc()) logger.error('error :: failed to save significant file') logger.info('classify_metrics :: checking_long_term_for_level_shift - %s level_shift_ad anomalies were detected on %s, result: %s, anomalyScore: %s, significant: %s' % ( str(len(anomalies)), str(base_name), str(result), str(anomalyScore), str(significant_level_shift))) try: redis_conn.hset(last_classified_hash_key, base_name, last_working_timestamp) logger.info('classify_metrics :: checking_long_term_for_level_shift - %s set timestamp in %s to %s for period %s' % ( str(base_name), str(last_classified_hash_key), str(last_working_timestamp), str(periods_processed))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: checking_long_term_for_level_shift - failed 
to set timestamp for %s in Redis hash key - %s' % ( base_name, last_classified_hash_key)) logger.info('classify_metrics :: checking_long_term_for_level_shift - processed %s of %s periods to process - %s' % ( str(periods_processed), str(periods_to_process), base_name)) end_long_term_timer = timer() if periods_processed == periods_to_process: try: redis_conn.hdel(continue_processing_hash_key, base_name) logger.info('classify_metrics :: checking_long_term_for_level_shift - removing %s from %s processed all %s of %s long term periods' % ( str(base_name), continue_processing_hash_key, str(periods_processed), str(periods_to_process))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove %s from Redis hash key - %s' % ( base_name, continue_processing_hash_key)) logger.info('classify_metrics :: checking_long_term_for_level_shift - COMPLETE - periods_processed: %s, for %s with %s significant level shifts detected in %.6f seconds' % ( str(periods_processed), base_name, str(significant_level_shifts_detected), (end_long_term_timer - start_long_term_timer))) else: logger.info('classify_metrics :: checking_long_term_for_level_shift - completed processing %s of %s periods_to_process, for %s with %s significant level shifts detected in %.6f seconds' % ( str(periods_processed), str(periods_to_process), base_name, str(significant_level_shifts_detected), (end_long_term_timer - start_long_term_timer))) time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break get_timeseries_for_metric = False if base_name not in known_level_shift_metrics: get_timeseries_for_metric = True else: logger.info('classify_metrics :: known level shift metric - %s' % ( str(base_name))) if LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT and skip_level_shift: get_timeseries_for_metric = False if LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT: if base_name not in known_volatility_shift_metrics: get_timeseries_for_metric = True else: logger.info('classify_metrics :: known volatility shift metric - %s' % ( str(base_name))) if not get_timeseries_for_metric: try: redis_conn.hset(last_classified_hash_key, base_name, int(time_now)) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - %s' % ( base_name, last_classified_hash_key)) # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') continue time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, 
classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break timeseries = [] if get_timeseries_for_metric: try: timeseries = get_graphite_metric( skyline_app, base_name, from_timestamp, until_timestamp, 'list', 'object') logger.info('classify_metrics :: timeseries retieved for - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: getting Graphite data for %s with use_full_duration: %s, from_timestamp: %s, until_timestamp: %s' % ( base_name, str(use_full_duration), str(from_timestamp), str(until_timestamp))) else: if debug_algorithms: logger.debug('classify_metrics :: not processing, known - %s' % ( str(base_name))) time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break if not timeseries: try: redis_conn.hset(last_classified_hash_key, str(base_name), int(time_now)) if not skip_level_shift: logger.info('classify_metrics :: no timeseries data for %s, set timestamp to now %s in %s' % ( str(base_name), str(int(time_now)), str(last_classified_hash_key))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - %s' % ( base_name, last_classified_hash_key)) # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') continue logger.info('classify_metrics :: timeseries has %s datapoints for processing %s' % ( str(len(timeseries)), str(base_name))) time_now = int(time()) runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break if LUMINOSITY_CLASSIFY_METRICS_LEVEL_SHIFT: if skip_level_shift: # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') continue try: result, anomalyScore, anomalies = run_custom_algorithm_on_timeseries(skyline_app, 
current_pid, base_name, timeseries, custom_algorithm, custom_algorithm_dict, debug_algorithms) if debug_algorithms: logger.debug('debug :: classify_metrics :: run_custom_algorithm_on_timeseries run %s with result - %s, anomalyScore - %s' % ( custom_algorithm, str(result), str(anomalyScore))) except: if debug_algorithms: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to run custom_algorithm %s on %s' % ( custom_algorithm, base_name)) if not anomalies: logger.info('classify_metrics :: %s level_shift_ad anomalies were detected on %s, result: %s, anomalyScore: %s' % ( str(anomalies), str(base_name), str(result), str(anomalyScore))) try: redis_conn.hset(last_classified_hash_key, base_name, int(until_timestamp)) logger.info('classify_metrics :: timeseries processed for %s, set timestamp to now %s in %s' % ( str(base_name), str(int(until_timestamp)), str(last_classified_hash_key))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - %s' % ( base_name, last_classified_hash_key)) time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break metric_resolution = False if anomalies: significant_level_shift = check_significant_level_shift(timeseries, base_name, anomalies, custom_algorithm_dict['algorithm_parameters']['window'], save_plots_to) if significant_level_shift: if not metric_already_classified: metrics_classified += 1 metric_already_classified = True try: metric_dir = base_name.replace('.', '/') timestamp_dir = str(int(timeseries[-1][0])) significant_file = '%s/%s/%s/%s/significant.txt' % ( save_plots_to, custom_algorithm, metric_dir, timestamp_dir) write_data_to_file(skyline_app, significant_file, 'w', str(True)) logger.info('classify_metrics :: created significant file - %s' % significant_file) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to save significant file') time_now = time() runtime = time_now - start_timestamp if runtime >= (classify_for - 1): logger.info('classify_metrics :: stopping before timeout is reached') # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') break # volatility_shift_ad if LUMINOSITY_CLASSIFY_METRICS_VOLATILITY_SHIFT: custom_algorithm = 'adtk_volatility_shift' custom_algorithm_dict = {} custom_algorithm_dict['debug_logging'] = False debug_algorithm_logging = False if debug_algorithms: custom_algorithm_dict['debug_logging'] = True debug_algorithm_logging = True custom_algorithm_dict['debug_logging'] = True debug_algorithm_logging = True custom_algorithm_dict['algorithm_source'] = '/opt/skyline/github/skyline/skyline/custom_algorithms/adtk_volatility_shift.py' 
custom_algorithm_dict['algorithm_parameters'] = {'window': 5, 'c': 9.9, 'return_anomalies': True, 'realtime_analysis': False, 'debug_logging': debug_algorithm_logging} custom_algorithm_dict['max_execution_time'] = 0.5 result = None anomalyScore = None anomalies = None try: result, anomalyScore, anomalies = run_custom_algorithm_on_timeseries(skyline_app, current_pid, base_name, timeseries, custom_algorithm, custom_algorithm_dict, debug_algorithms) if debug_algorithms: logger.debug('debug :: classify_metrics :: run_custom_algorithm_on_timeseries run %s with result - %s, anomalyScore - %s' % ( custom_algorithm, str(result), str(anomalyScore))) except: if debug_algorithms: logger.error(traceback.format_exc()) logger.error('error :: algorithms :: failed to run custom_algorithm %s on %s' % ( custom_algorithm, base_name)) if not anomalies: logger.info('classify_metrics :: %s volatility_shift_ad anomalies were detected on %s, result: %s, anomalyScore: %s' % ( str(anomalies), str(base_name), str(result), str(anomalyScore))) if anomalies: # Determine resolution from the last 30 data points # INFO took 0.002060 seconds if not metric_resolution: resolution_timestamps = [] metric_resolution = False for metric_datapoint in timeseries[-30:]: timestamp = int(metric_datapoint[0]) resolution_timestamps.append(timestamp) timestamp_resolutions = [] if resolution_timestamps: last_timestamp = None for timestamp in resolution_timestamps: if last_timestamp: resolution = timestamp - last_timestamp timestamp_resolutions.append(resolution) last_timestamp = timestamp else: last_timestamp = timestamp try: del resolution_timestamps except: pass if timestamp_resolutions: try: timestamp_resolutions_count = Counter(timestamp_resolutions) ordered_timestamp_resolutions_count = timestamp_resolutions_count.most_common() metric_resolution = int(ordered_timestamp_resolutions_count[0][0]) except: pass try: del timestamp_resolutions except: pass continuous_periods = {} last_timestamp = None for ts, value in anomalies: if not last_timestamp: continuous_periods[ts] = {} continuous_periods[ts]['timestamps'] = [ts] last_timestamp = ts last_period = ts continue if (ts - metric_resolution) == last_timestamp: continuous_periods_timestamps = continuous_periods[last_period]['timestamps'] continuous_periods_timestamps.append(ts) last_timestamp = ts continuous_periods[last_period]['timestamps'] = continuous_periods_timestamps else: continuous_periods[ts] = {} continuous_periods[ts]['timestamps'] = [ts] last_timestamp = ts last_period = ts logger.info('classify_metrics :: %s volatility_shift_ad anomalies detected spanning %s continuous_periods on %s, result: %s, anomalyScore: %s' % ( str(len(anomalies)), str(len(continuous_periods)), str(base_name), str(result), str(anomalyScore))) try: int_metric_timestamp = int(timeseries[-1][0]) except: int_metric_timestamp = 0 if len(continuous_periods) <= 2: logger.info('classify_metrics :: volatility shift metric - %s' % base_name) triggered_timestamp = None for timestamp in continuous_periods: triggered_timestamp = int(timestamp) logger.info('classify_metrics :: volatility shift metric - %s at %s' % (base_name, str(triggered_timestamp))) if triggered_timestamp: try: redis_conn.hset('luminosity.classify_metrics.volatility_shift', base_name, triggered_timestamp) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - luminosity.classify_metrics.volatility_shift' % ( base_name)) if not metric_already_classified: 
metrics_classified += 1 metric_already_classified = True else: logger.info('classify_metrics :: NOT volatility shift metric - %s' % base_name) try: int_metric_timestamp = int(timeseries[-1][0]) except: int_metric_timestamp = int(time_now) if last_analyzed_timestamp: if int_metric_timestamp == last_analyzed_timestamp: int_metric_timestamp = int(time_now) if last_analyzed_timestamp: if last_analyzed_timestamp > int_metric_timestamp: int_metric_timestamp = int(time_now) if check_new_metric_for_long_term_for_level_shift and timeseries: if int_metric_timestamp < (time_now - 3600): int_metric_timestamp = int(time_now) if check_long_term_for_level_shift and checking_long_term_for_level_shift and get_timeseries_for_metric: if int_metric_timestamp < (time_now - 3600): int_metric_timestamp = int(time_now) # update_hash_key = False # if int_metric_timestamp and not checked_long_term_for_level_shift: # update_hash_key = True # if update_hash_key: if int_metric_timestamp: try: redis_conn.hset(last_classified_hash_key, base_name, int_metric_timestamp) logger.info('classify_metrics :: analysis of complete, set timestamp to %s in %s for %s' % ( str(int_metric_timestamp), str(last_classified_hash_key), str(base_name))) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to set timestamp for %s in Redis hash key - %s' % ( base_name, last_classified_hash_key)) metrics_proceessed += 1 # Handle luminosity running with multiple processes manage_metric = False try: manage_metric = manage_processing_key(current_pid, base_name, classify_for, 'remove') if manage_metric: logger.info('classify_metrics :: removed manage_processing_key - %s' % base_name) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to remove manage_processing_key') try: data = [int(start_timestamp), metrics_proceessed] redis_conn.sadd('luminosity.classify_metrics.proceessed', str(data)) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to entry to Redis set luminosity.classify_metrics.proceessed') try: data = [int(start_timestamp), metrics_classified] redis_conn.sadd('luminosity.classify_metrics.classified', str(data)) except: logger.error(traceback.format_exc()) logger.error('error :: classify_metrics :: failed to entry to Redis set luminosity.classify_metrics.classified') end_classify_metrics = timer() logger.info('classify_metrics :: %s metrics were classified' % str(metrics_classified)) if metrics_in_ttl: logger.info('classify_metrics :: %s metrics are still in TTL' % str(metrics_in_ttl)) if LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES: logger.info('classify_metrics :: %s metrics were skipped as in LUMINOSITY_LEVEL_SHIFT_SKIP_NAMESPACES' % str(level_shifted_skipped_metrics)) logger.info('classify_metrics :: %s metrics were processed to classify of the total of %s metrics in the allowed time of %s seconds, took %.6f seconds' % ( str(metrics_proceessed), str(len(metrics_to_analyse)), str(classify_for), (end_classify_metrics - start_classify_metrics))) return
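

# Illustrative sketch (not part of the upstream module): classify_metrics
# determines a metric's resolution from the most common interval between its
# last 30 timestamps, using collections.Counter. A standalone form of that
# calculation:
def _example_metric_resolution(timeseries):
    """Return the most common interval (seconds) between the last 30 timestamps."""
    timestamps = [int(item[0]) for item in timeseries[-30:]]
    intervals = [t2 - t1 for t1, t2 in zip(timestamps, timestamps[1:])]
    if not intervals:
        return None
    return int(Counter(intervals).most_common()[0][0])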
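

# Illustrative sketch (not part of the upstream module): in the simplest branch
# (no PersistAD results to cross-check) a level shift is only deemed significant
# when the LevelShiftAD anomaly timestamps form at most two continuous periods
# (consecutive timestamps exactly one resolution step apart) and at least one
# period persists for significant_length or more data points:
def _example_significant_level_shift(anomaly_timestamps, metric_resolution, significant_length):
    """Return True if the anomalies form few, sustained continuous periods."""
    continuous_periods = []
    for ts in sorted(anomaly_timestamps):
        if continuous_periods and (ts - metric_resolution) == continuous_periods[-1][-1]:
            continuous_periods[-1].append(ts)
        else:
            continuous_periods.append([ts])
    if len(continuous_periods) > 2:
        return False
    return any(len(period) >= significant_length for period in continuous_periods)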
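

# Illustrative sketch (not part of the upstream module): the long term level
# shift check walks a roughly 90 day history in use_full_duration sized
# windows, recording the last processed timestamp in a Redis hash so an
# interrupted run can resume where it left off. The windowing itself reduces to:
def _example_windows(timeseries, window_seconds):
    """Yield successive window_seconds slices of a [timestamp, value] list."""
    if not timeseries:
        return
    period_start = timeseries[0][0]
    period_end = period_start + window_seconds
    end_timestamp = timeseries[-1][0]
    while period_end < end_timestamp:
        window = [item for item in timeseries
                  if period_start <= item[0] <= period_end]
        if window:
            yield window
            period_start = window[-1][0] + 1
            period_end = window[-1][0] + window_seconds
        else:
            period_start = period_end + 1
            period_end += window_seconds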