"""
analyzer_labelled_metrics.py
"""
import logging
try:
from Queue import Empty
except:
from queue import Empty
# from redis import StrictRedis
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list to reduce memory and number of
# processes
# from multiprocessing import Process, Manager, Queue
from multiprocessing import Process, Queue
import os
from os import kill, getpid
from signal import SIGKILL
from math import ceil
import traceback
from sys import exit as sys_exit
import resource
from ast import literal_eval
from random import shuffle
# @added 20220722 - Task #4624: Change all dict copy to deepcopy
import copy
import numpy as np
import pandas as pd
import memray
from prometheus_client.parser import _parse_labels as parse_labels
import settings
from skyline_functions import (
mkdir_p, nonNegativeDerivative, get_redis_conn, get_redis_conn_decoded,
write_data_to_file)
# @added 20200425 - Feature #3512: matched_or_regexed_in_list function
# Feature #3508: ionosphere.untrainable_metrics
# Feature #3486: analyzer_batch
from matched_or_regexed_in_list import matched_or_regexed_in_list
from alerters import trigger_alert
from algorithms import run_selected_algorithm
from algorithm_exceptions import TooShort, Stale, Boring, EmptyTimeseries
from functions.illuminance.add_illuminance_entries import add_illuminance_entries
# from functions.prometheus.metric_name_labels_parser import metric_name_labels_parser
from functions.timeseries.downsample import downsample_timeseries
from functions.timeseries.is_stationary import is_stationary
from functions.graphite.send_graphite_metric import send_graphite_metric
from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
from functions.timeseries.strictly_increasing_monotonicity import strictly_increasing_monotonicity
# @added 20230329 - Feature #4882: labelled_metrics - resolution and data sparsity
# Feature #3870: metrics_manager - check_data_sparsity
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.timeseries.determine_data_sparsity import determine_data_sparsity
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
from functions.analyzer.get_tenant_id_mrange_split import get_tenant_id_mrange_split
# Whether per-algorithm run time metrics are sent to Graphite
send_algorithm_run_metrics = False
skyline_app = 'analyzer'
skyline_app_thunder_key = 'analyzer_labelled_metrics'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
this_host = str(os.uname()[1])

# Each settings lookup below falls back to a sane default when the setting
# is not declared in settings.py.
# @modified - the bare excepts were narrowed (AttributeError for plain
# attribute reads, Exception where extra work such as deepcopy/list() can
# fail) so SystemExit and KeyboardInterrupt are never swallowed.
try:
    ANALYZER_LABELLED_METRICS_PROCESSES = settings.ANALYZER_LABELLED_METRICS_PROCESSES
except AttributeError:
    ANALYZER_LABELLED_METRICS_PROCESSES = 2
try:
    MAX_ANALYZER_LABELLED_METRICS_PROCESS_RUNTIME = settings.MAX_ANALYZER_LABELLED_METRICS_PROCESS_RUNTIME
except AttributeError:
    MAX_ANALYZER_LABELLED_METRICS_PROCESS_RUNTIME = 180
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    SERVER_METRIC_PATH = ''
try:
    ANALYZER_ENABLED = settings.ANALYZER_ENABLED
    logger.info('ANALYZER_ENABLED is set to %s' % str(ANALYZER_ENABLED))
except AttributeError:
    ANALYZER_ENABLED = True
    logger.info('warning :: ANALYZER_ENABLED is not declared in settings.py, defaults to True')
try:
    # @modified 20220722 - Task #4624: Change all dict copy to deepcopy
    # CUSTOM_ALGORITHMS = settings.CUSTOM_ALGORITHMS.copy()
    CUSTOM_ALGORITHMS = copy.deepcopy(settings.CUSTOM_ALGORITHMS)
except Exception:
    CUSTOM_ALGORITHMS = None
try:
    DEBUG_CUSTOM_ALGORITHMS = settings.DEBUG_CUSTOM_ALGORITHMS
except AttributeError:
    DEBUG_CUSTOM_ALGORITHMS = False
try:
    VERBOSE_LOGGING = settings.ANALYZER_VERBOSE_LOGGING
except AttributeError:
    VERBOSE_LOGGING = False
# @added 20210513 - Feature #4068: ANALYZER_SKIP
try:
    ANALYZER_SKIP = list(settings.ANALYZER_SKIP)
except Exception:
    ANALYZER_SKIP = []
# @added 20210519 - Feature #4076: CUSTOM_STALE_PERIOD
custom_stale_metrics_hash_key = 'analyzer.metrics_manager.custom_stale_periods'
try:
    # @modified 20220722 - Task #4624: Change all dict copy to deepcopy
    # CUSTOM_STALE_PERIOD = settings.CUSTOM_STALE_PERIOD.copy()
    CUSTOM_STALE_PERIOD = copy.deepcopy(settings.CUSTOM_STALE_PERIOD)
except Exception:
    CUSTOM_STALE_PERIOD = {}
# @added 20230404 - Feature #4888: analyzer - load_shedding
LOAD_SHEDDING_ENABLED = True
try:
    LOAD_SHEDDING_ENABLED = settings.LOAD_SHEDDING_ENABLED
except AttributeError:
    LOAD_SHEDDING_ENABLED = True
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
LOCAL_DEBUG = False
# @added 20191107 - Branch #3262: py3
alert_test_file = '%s/%s_alert_test.txt' % (settings.SKYLINE_TMP_DIR, skyline_app)
# In Skyline a metric is either a counter (derivative) or a gauge
skyline_metric_types = {'COUNTER': 1, 'GAUGE': 0}
# Reverse mapping (type id -> type name).
# @modified - built with a dict comprehension instead of a loop that
# shadowed the builtin name ``key``
skyline_metric_types_by_id = {
    type_id: type_name for type_name, type_id in skyline_metric_types.items()}
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_hash_key = 'analyzer_labelled_metrics.metrics.last_analysis_timestamp'
[docs]class AnalyzerLabelledMetrics(Thread):
"""
The AnalyzerLabelledMetrics class which controls the analyzer_labelled_metrics
thread and spawned processes.
"""
def __init__(self, parent_pid):
    """
    Initialize the AnalyzerLabelledMetrics thread.

    Records the parent and current pids (used by
    :py:meth:`check_if_parent_is_alive`), opens the decoded and
    non-decoded Redis connections, and creates the multiprocessing
    queues that the spawned analysis processes feed results back
    through:

    - :obj:`self.exceptions_q`
    - :obj:`self.anomaly_breakdown_q`
    - :obj:`self.stats_q`
    - :obj:`self.new_metrics_q`

    :param parent_pid: pid of the parent process
    """
    super().__init__()
    self.daemon = True
    # pids for liveness checking
    self.parent_pid = parent_pid
    self.current_pid = getpid()
    # Redis connections (raw and decoded responses)
    self.redis_conn = get_redis_conn(skyline_app)
    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
    # Queues used to collate results from the spawned processes
    self.exceptions_q = Queue()
    self.anomaly_breakdown_q = Queue()
    self.stats_q = Queue()
    self.new_metrics_q = Queue()
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
logger.warning('warning :: parent or current process dead')
sys_exit(0)
def spawn_alerter_process(self, alert, metric, context):
    """
    Spawn a process to trigger an alert.

    This is used by smtp alerters so that matplotlib objects are cleared
    down and the alerter cannot create a memory leak in this manner and
    plt.savefig keeps the object in memory until the process terminates.
    Seeing as data is being surfaced and processed in the alert_smtp
    context, multiprocessing the alert creation and handling prevents any
    memory leaks in the parent.

    Added 20160814 relating to:

    * Bug #1558: Memory leak in Analyzer
    * Issue #21 Memory leak in Analyzer see https://github.com/earthgecko/skyline/issues/21

    Parameters as per :py:func:`skyline.analyzer.alerters.trigger_alert
    <analyzer.alerters.trigger_alert>`
    """
    # Delegate entirely to the shared alerters implementation; any
    # return value from trigger_alert is intentionally discarded.
    trigger_alert(alert, metric, context)
# Although this could be imported by functions.prometheus.metric_name_labels_parser
# memray highlights that the import method generates 1000s of allocations
# whereas defined locally incurs none.
def metric_name_labels_parser(self, metric):
    """
    Given a Prometheus metric string return a dict of the metric name and
    labels.

    :param metric: the prometheus metric, e.g. ``name{label="value"}``
    :type metric: str
    :return: metric_dict of the form
        ``{'metric': <name>, 'labels': {<label>: <value>, ...}}`` or an
        empty dict if the metric string could not be parsed
    :rtype: dict
    """
    metric_dict = {}
    metric_name = None
    try:
        # split only on the first { so that { occurring inside label
        # values is preserved
        metric_elements = metric.split('{', 1)
        metric_name = metric_elements[0]
        metric_labels_str = metric_elements[1]
        metric_labels = metric_labels_str.rstrip('}')
    except Exception as err:
        # @modified - log the correct function context, these errors were
        # previously misattributed to labelled_metrics_spin_process
        logger.error('error :: metric_name_labels_parser :: failed to parse metric %s - %s' % (
            str(metric), str(err)))
        return metric_dict
    labels = {}
    try:
        labels = parse_labels(metric_labels)
    except Exception as err:
        # @modified - log the correct function context, these errors were
        # previously misattributed to labelled_metrics_spin_process
        logger.error('error :: metric_name_labels_parser :: failed to parse labels %s - %s' % (
            str(metric), str(err)))
        return metric_dict
    metric_dict['metric'] = metric_name
    metric_dict['labels'] = labels
    return metric_dict
# @modified 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
[docs] def labelled_metrics_spin_process(self, i_process, assigned_metrics_dict, filters=None):
"""
Assign a bunch of metrics for a process to analyze.
Multiple get the assigned_metrics to the process from Redis.
For each metric:
- unpack the `raw_timeseries` for the metric.
- Analyse each timeseries against `ALGORITHMS` to determine if it is
anomalous.
- If anomalous add it to the Redis set analyzer.anomalous_metrics
- Add what algorithms triggered to the :obj:`self.anomaly_breakdown_q`
queue
- If :mod:`settings.ENABLE_CRUCIBLE` is ``True``:
- Add a crucible data file with the details about the timeseries and
anomaly.
- Write the timeseries to a json file for crucible.
Add keys and values to the queue so the parent process can collate for:\n
* :py:obj:`self.anomaly_breakdown_q`
* :py:obj:`self.exceptions_q`
"""
memray_file = '%s/analyzer_labelled_metrics.labelled_metrics_spin_process.%s.bin' % (settings.SKYLINE_TMP_DIR, str(i_process))
memray_file_last = '%s.last' % memray_file
if os.path.isfile(memray_file_last):
os.remove(memray_file_last)
logger.info('labelled_metrics_spin_process :: removed %s' % str(memray_file_last))
if os.path.isfile(memray_file):
os.rename(memray_file, memray_file_last)
logger.info('labelled_metrics_spin_process :: removed %s' % str(memray_file_last))
try:
with memray.Tracker(memray_file):
def median_absolute_deviation(timeseries):
try:
series = pd.Series([x[1] for x in timeseries])
median = series.median()
demedianed = np.abs(series - median)
median_deviation = demedianed.median()
except:
return None
if median_deviation == 0:
return False
try:
test_statistic = demedianed.iat[-1] / median_deviation
except:
return None
if test_statistic > 6:
return True
return False
try:
spin_start = time()
logger.info('labelled_metrics_spin_process :: started process %s' % str(i_process))
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: getting spin_start time failed - %s' % (
err))
spin_start = time()
until_timestamp = int(spin_start)
from_timestamp = until_timestamp - settings.FULL_DURATION
current_aligned_ts = int(until_timestamp // 60 * 60)
metric_airgaps = []
metric_airgaps_filled = []
run_negatives_present = False
check_for_airgaps_only = False
custom_stale_metrics_dict = {}
# @added 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_boring_metrics = []
identified_stale_metrics = []
identified_tooshort_metrics = []
last_reported_up = 0
try:
last_reported_up = self.redis_conn_decoded.get(skyline_app)
if not last_reported_up:
last_reported_up = 0
else:
last_reported_up = int(float(last_reported_up))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: Analyzer could not update the Redis %s key - %s' % (
err))
# Check the unique_labelled_metrics list is valid
assigned_metrics = list(assigned_metrics_dict.keys())
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
filters_str = None
if filters:
filters_str = '_tenant_id=(' + ','.join(filters) + ')'
# @added 20220504 - Feature #2580: illuminance
illuminance_dict = {}
# @added 20220919 - Feature #4676: analyzer - illuminance.all key
illuminance_all_dict = {}
algorithms = {}
try:
algorithms = self.redis_conn_decoded.hgetall('metrics_manager.algorithms.ids')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: hgetall metrics_manager.algorithms.ids - %s' % str(err))
# @added 20210513 - Feature #4068: ANALYZER_SKIP
analyzer_skip_metrics_skipped = 0
analyzer_skip_metrics = []
if ANALYZER_SKIP:
logger.info('labelled_metrics_spin_process :: determining ANALYZER_SKIP metrics from analyzer.metrics_manager.analyzer_skip Redis set')
try:
analyzer_skip_metrics = list(self.redis_conn_decoded.smembers('analyzer.metrics_manager.analyzer_skip'))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to generate a list from analyzer.metrics_manager.analyzer_skip Redis set - %s' % err)
analyzer_skip_metrics = []
if analyzer_skip_metrics:
logger.info('labelled_metrics_spin_process :: removing %s ANALYZER_SKIP metrics from the %s assigned_metrics' % (
str(len(analyzer_skip_metrics)), str(assigned_metrics)))
unique_labelled_metrics = list(set(unique_labelled_metrics) - set(analyzer_skip_metrics))
analyzer_skip_metrics_skipped = len(set(analyzer_skip_metrics))
else:
logger.info('labelled_metrics_spin_process :: did not determine any ANALYZER_SKIP metrics from from analyzer.metrics_manager.analyzer_skip Redis set, will check dynamically')
# @added 20190410 - Feature #2916: ANALYZER_ENABLED setting
if not ANALYZER_ENABLED:
# len_assigned_metrics = len(assigned_metrics)
logger.info('labelled_metrics_spin_process :: ANALYZER_ENABLED is set to %s removing the %s assigned_metrics' % (
str(ANALYZER_ENABLED), str(len(assigned_metrics))))
assigned_metrics = []
del unique_labelled_metrics
# Check if this process is unnecessary
if len(assigned_metrics) == 0:
logger.info('labelled_metrics_spin_process :: 0 assigned metrics, nothing to do')
return
run_selected_algorithm_count = 0
# Make process-specific dicts
exceptions = defaultdict(int)
anomaly_breakdown = defaultdict(int)
# Surface the last_timeseries_timestamp Redis hash key so that the
# timestamps can be compared as Thunder stale_metrics requires all
# timestamps for all metrics
metrics_last_timeseries_timestamp_update_dict = {}
metrics_last_timeseries_timestamp_hash_key = 'analyzer_labelled_metrics.last_timeseries_timestamp'
metrics_stationary_update_dict = {}
metrics_stationary_hash_key = 'analyzer_labelled_metrics.stationary_metrics'
logger.info('labelled_metrics_spin_process :: checking %s assigned_metrics' % str(len(assigned_metrics)))
metric_type_redis_keys = {}
tenant_ids = []
metrics_checked = 0
metrics_analysed = 0
metrics_skipped = 0
mad_analysed = 0
mad_anomalies = 0
anomalies = 0
no_anomalies = 0
stationary = 0
not_stationary = 0
stationary_not_expired = 0
redis_timings = []
redis_full_duration_timings = []
stationary_timings = []
mad_timings = []
downsample_timings = []
threesigma_timings = []
last_analysed_timeseries = []
last_analysed_metric = None
mad_anomalous_dict = {}
threesigma_anomalous_dict = {}
stale_metrics = {}
empty_metrics = []
no_new_data_metrics = {}
tooshort_metrics = {}
metrics_dict = {}
not_anomalous_list = []
empty_timeseries_list = []
boring_dict = {}
used_metric_id_keys = 0
metric_types_known = {
'COUNTER': {'known': 0, 'looked_up': 0},
'GAUGE': {'known': 0, 'looked_up': 0},
'SUMMARY': {'known': 0, 'looked_up': 0},
'HISTOGRAM': {'known': 0, 'looked_up': 0},
'UNKNOWN': {'known': 0, 'looked_up': 0},
}
metric_subtypes_added = {
'COUNTER': {'COUNTER': 0, 'GAUGE': 0},
'GAUGE': {'COUNTER': 0, 'GAUGE': 0},
'SUMMARY': {'COUNTER': 0, 'GAUGE': 0},
'HISTOGRAM': {'COUNTER': 0, 'GAUGE': 0},
'UNKNOWN': {'COUNTER': 0, 'GAUGE': 0},
}
metric_subtypes_queried = {
'COUNTER': {'COUNTER': 0, 'GAUGE': 0},
'GAUGE': {'COUNTER': 0, 'GAUGE': 0},
'SUMMARY': {'COUNTER': 0, 'GAUGE': 0},
'HISTOGRAM': {'COUNTER': 0, 'GAUGE': 0},
'UNKNOWN': {'COUNTER': 0, 'GAUGE': 0},
}
monotonicity_not_expired = 0
monotonicity_checked = 0
monotonicity_check_per_minute = int(len(assigned_metrics) / (60 * 4))
monotonicity_changed = 0
metrics_type_dict = {}
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
redis_set_errors = []
# @added 20221229 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# Fetch the data once because with 1000s of new metrics added it
# becomes more efficient
skyline_labelled_metrics_id_types = {}
try:
skyline_labelled_metrics_id_types = self.redis_conn_decoded.hgetall('skyline.labelled_metrics.id.type')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to hgetall skyline.labelled_metrics.id.type - %s' % err)
# @added 20230518 - metric_type.longterm_expire
skyline_labelled_metrics_id_types_longterm_expire = {}
try:
skyline_labelled_metrics_id_types_longterm_expire = self.redis_conn_decoded.hgetall('skyline.labelled_metrics.id.type.longterm_expire')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to hgetall skyline.labelled_metrics.id.type.longterm_expire - %s' % err)
# @added 20230329 - Feature #4882: labelled_metrics - resolution and data sparsity
# Feature #3870: metrics_manager - check_data_sparsity
resolution_and_sparsity_timings = []
labelled_metrics_resolutions = {}
labelled_metrics_sparsity = {}
labelled_metrics_resolution_sparsity_last_checked_dict = {}
labelled_metrics_resolution_sparsity_checked_dict = {}
labelled_metrics_resolution_sparsity_recently_checked = 0
labelled_metrics_resolution_sparsity_last_checked_hash_key = 'labelled_metrics.resolution_sparsity_last_checked'
try:
labelled_metrics_resolution_sparsity_last_checked_dict = self.redis_conn_decoded.hgetall(labelled_metrics_resolution_sparsity_last_checked_hash_key)
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to hgetall %s - %s' % (
labelled_metrics_resolution_sparsity_last_checked_hash_key, err))
labelled_metrics_resolution_sparsity_last_checked_dict = {}
# @added 20230404 - Feature #4888: analyzer - load_shedding
analyzer_last_run_time = 0
analyzer_last_run_time_timestamp = int(spin_start)
load_shedding_active = False
if LOAD_SHEDDING_ENABLED:
logger.info('labelled_metrics_spin_process :: load_shedding_active - checking')
try:
analyzer_last_run_time = float(self.redis_conn_decoded.hget('analyzer_labelled_metrics.run_time', 'value'))
analyzer_last_run_time_timestamp = int(self.redis_conn_decoded.hget('analyzer_labelled_metrics.run_time', 'timestamp'))
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: load_shedding_active failed to get keys from analyzer.run_time Redis hash - %s' % err)
analyzer_last_run_time = 0
analyzer_last_run_time_timestamp = int(spin_start)
if analyzer_last_run_time >= (settings.MAX_ANALYZER_PROCESS_RUNTIME - 5):
load_shedding_active = True
logger.info('labelled_metrics_spin_process :: load_shedding_active set to True because analyzer_last_run_time: %s' % str(analyzer_last_run_time))
if analyzer_last_run_time_timestamp <= (int(spin_start) - (settings.MAX_ANALYZER_PROCESS_RUNTIME + 30)):
load_shedding_active = True
logger.info('labelled_metrics_spin_process :: load_shedding_active set to True because analyzer_last_run_time_timestamp is older than % seconds' % (
str(analyzer_last_run_time_timestamp)))
logger.info('labelled_metrics_spin_process :: load_shedding_active: %s' % str(load_shedding_active))
last_analysed_ordered_metrics = []
load_shedding_assigned_metrics = []
metrics_last_analysis_dict = {}
# If the load_shedding hash exist keep updating it until it expires and
# does not exist. If it exists but load_shedding_active is not True the
# hash expiry does not get updated it only gets updated if
# load_shedding_active is True. This is so that if load_shedding becomes
# active on 1 run, but is not active on the next run the hash last
# analysis times are still updated for another 4 minutes so that if
# load_shedding becomes active in the next 4 minutes, it is not starting
# from scratch and will have data to do an ordered analysis on.
load_shedding_hash_exists = False
last_metrics_last_analysis_dict = {}
try:
load_shedding_hash_exists = self.redis_conn_decoded.exists(metrics_last_analysis_hash_key)
except Exception as err:
logger.error('error :: load_shedding_active failed to hgetall %s from Redis hash - %s' % (metrics_last_analysis_hash_key, err))
if load_shedding_active or load_shedding_hash_exists:
start_load_shedding_get = time()
log_str = 'load shedding hash exists'
if load_shedding_active:
log_str = 'load_shedding_active'
try:
last_metrics_last_analysis_dict = self.redis_conn_decoded.hgetall(metrics_last_analysis_hash_key)
logger.info('labelled_metrics_spin_process :: %s got %s metrics last analysed timestamps from %s to reorder %s assigned_metrics' % (
log_str, str(len(last_metrics_last_analysis_dict)), metrics_last_analysis_hash_key,
str(len(assigned_metrics))))
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: load_shedding_active failed to hgetall %s from Redis hash - %s' % (metrics_last_analysis_hash_key, err))
if load_shedding_active:
start_load_shedding_get = time()
assigned_labelled_metrics = []
assigned_labelled_metrics_dict = {}
for base_name in assigned_metrics:
metric_id = assigned_metrics_dict[base_name]['id']
labelled_metric_name = 'labelled_metrics.%s' % str(metric_id)
assigned_labelled_metrics.append(labelled_metric_name)
assigned_labelled_metrics_dict[labelled_metric_name] = base_name
if metrics_last_analysis_dict:
metrics_last_analyzed_timestamp = [[int(timestamp_str), metric] for metric, timestamp_str in metrics_last_analysis_dict.items()]
metrics_last_analyzed_timestamp_sorted = sorted(metrics_last_analyzed_timestamp, key=lambda x: x[0])
del metrics_last_analyzed_timestamp
del metrics_last_analysis_dict
metrics_last_analysis_dict = {}
metrics_last_analyzed_timestamp_sorted = [item for item in metrics_last_analyzed_timestamp_sorted if item[1] in assigned_labelled_metrics]
logger.info('labelled_metrics_spin_process :: load_shedding_active reordered metrics by last analysed timestamp, first metric in list now: %s' % (
str(metrics_last_analyzed_timestamp_sorted[0])))
logger.info('labelled_metrics_spin_process :: load_shedding_active reordered metrics by last analysed timestamp, last metric in list now: %s' % (
str(metrics_last_analyzed_timestamp_sorted[-1])))
last_analysed_ordered_metrics = [labelled_metric for ts, labelled_metric in metrics_last_analyzed_timestamp_sorted]
del metrics_last_analyzed_timestamp_sorted
if last_analysed_ordered_metrics:
load_shedding_assigned_metrics = [labelled_metric for labelled_metric in last_analysed_ordered_metrics if labelled_metric in assigned_labelled_metrics]
logger.info('labelled_metrics_spin_process :: load_shedding_last_analysed_ordered_metrics took %s seconds' % str((time() - start_load_shedding_get)))
if load_shedding_assigned_metrics:
logger.info('labelled_metrics_spin_process :: load_shedding_active replacing %s assigned_metrics with %s metrics ordered by last analysed timestamp' % (
str(len(assigned_labelled_metrics)), str(len(load_shedding_assigned_metrics))))
try:
set_assigned_metrics = set(assigned_labelled_metrics)
set_load_shedding_assigned_metrics = set(load_shedding_assigned_metrics)
not_present_metrics_set = set_assigned_metrics.difference(set_load_shedding_assigned_metrics)
not_present_metrics = list(not_present_metrics_set)
del set_assigned_metrics
del set_load_shedding_assigned_metrics
del not_present_metrics_set
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: load_shedding_active failed determine not_present_metrics - %s' % err)
not_present_metrics = []
if not_present_metrics:
logger.info('labelled_metrics_spin_process :: load_shedding_active appending %s metrics which had no last analysed timestamp' % (
str(len(not_present_metrics))))
load_shedding_assigned_metrics = load_shedding_assigned_metrics + not_present_metrics
del not_present_metrics
assigned_metrics = []
for labelled_metric in load_shedding_assigned_metrics:
try:
assigned_metrics.append(assigned_labelled_metrics_dict[labelled_metric])
except:
errors.append([labelled_metric, 'load_shedding assigned_metrics', str(err)])
del load_shedding_assigned_metrics
logger.info('labelled_metrics_spin_process :: load_shedding_active now %s assigned_metrics' % (
str(len(assigned_metrics))))
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
all_timeseries = []
labelled_metric_in_all_timeseries = []
labelled_metric_and_indices_in_all_timeseries = {}
timeseries_not_present_in_all_timeseries = {}
if filters_str:
filters = [filters_str]
logger.info('labelled_metrics_spin_process :: calling mrange with filters: %s' % str(filters))
r_start = time()
try:
all_timeseries = self.redis_conn_decoded.ts().mrange(((until_timestamp - (180 * 60)) * 1000), (until_timestamp * 1000), filters)
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to get Redis timeseries with mrange - %s' % (
err))
errors.append(['all_timeseries', 'ts().mrange', str(err)])
all_timeseries = []
redis_timings.append(time() - r_start)
if all_timeseries:
logger.info('labelled_metrics_spin_process :: mrange returned %s timeseries' % str(len(all_timeseries)))
labelled_metric_in_all_timeseries = [list(tlist.keys())[0] for tlist in all_timeseries]
for index, labelled_metric in enumerate(labelled_metric_in_all_timeseries):
labelled_metric_and_indices_in_all_timeseries[labelled_metric] = index
# @added 20230404 - Feature #4888: analyzer - load_shedding
load_shedding_active_log_stop = False
activating_load_shedding = False
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
feedback_metrics_skipped = []
feedback_labelled_metric_ids = []
analyzer_labelled_metrics_busy = False
if settings.SKYLINE_FEEDBACK_NAMESPACES:
try:
analyzer_labelled_metrics_busy = self.redis_conn_decoded.get('analyzer_labelled_metrics.busy')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to get analyzer_labelled_metrics.busy Redis key' % (
err))
if analyzer_labelled_metrics_busy:
logger.info('labelled_metrics_spin_process :: analyzer_labelled_metrics_busy found')
try:
feedback_labelled_metric_ids = list(self.redis_conn_decoded.smembers('aet.metrics_manager.feedback.labelled_metric_ids'))
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: smembers failed on Redis set aet.metrics_manager.feedback.labelled_metric_ids - %s' % (
err))
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
normal_daily_peak_metrics_skipped = []
current_now = int(time())
normal_daily_peaks_keys = []
for i in sorted(list(range(1, 3)), reverse=True):
key_ts = current_aligned_ts - (60 * i)
key = 'mirage.normal_daily_peak_metrics.%s' % str(key_ts)
normal_daily_peaks_keys.append(key)
normal_daily_peak_metrics_expiry = {}
for normal_daily_peaks_key in normal_daily_peaks_keys:
try:
current_key_data = self.redis_conn_decoded.hgetall(normal_daily_peaks_key)
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to hgetall Redis hash %s - %s' % (
str(normal_daily_peaks_key), err))
for labelled_metric in list(current_key_data):
expire_at = 0
try:
expire_at = int(float(current_key_data[labelled_metric]))
except:
expire_at = 0
if expire_at:
if current_now > expire_at:
if labelled_metric in labelled_metric_in_all_timeseries:
try:
self.redis_conn_decoded.hdel(normal_daily_peaks_key, labelled_metric)
except Exception as err:
logger.error('error :: mirage_labelled_metrics :: failed to remove %s from Redis hash %s - %s' % (
labelled_metric, normal_daily_peaks_key, err))
else:
normal_daily_peak_metrics_expiry[labelled_metric] = expire_at
logger.info('labelled_metrics_spin_process :: surfaced %s metrics currently in anomalous_daily_peak expiry' % str(len(normal_daily_peak_metrics_expiry)))
errors = []
for base_name in assigned_metrics:
metrics_dict[base_name] = {}
self.check_if_parent_is_alive()
# @added 20230404 - Feature #4888: analyzer - load_shedding
# If load shedding is active and the process in approaching the
# MAX_ANALYZER_PROCESS_RUNTIME stop
right_now = int(time())
if not activating_load_shedding:
if right_now >= (int(spin_start) + (settings.MAX_ANALYZER_PROCESS_RUNTIME - 5)):
metrics_done = len(assigned_metrics) - len(metrics_dict)
metrics_not_done = len(assigned_metrics) - metrics_done
activating_load_shedding = True
logger.info('labelled_metrics_spin_process :: activating load shedding - approaching MAX_ANALYZER_PROCESS_RUNTIME so stopping, analysed %s metrics of the %s assigned metrics, %s metric not analysed' % (
str(metrics_done), str(len(assigned_metrics)),
str(metrics_not_done)))
if not load_shedding_active_log_stop and load_shedding_active:
logger.info('labelled_metrics_spin_process :: load_shedding_active - approaching MAX_ANALYZER_PROCESS_RUNTIME so stopping, analysed %s metrics of the %s assigned metrics, %s metric not analysed' % (
str(metrics_done), str(len(assigned_metrics)),
str(metrics_not_done)))
load_shedding_active_log_stop = True
if activating_load_shedding:
metric_id = 0
try:
metric_id = assigned_metrics_dict[base_name]['id']
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to get assigned_metrics_dict for %s - %s' % (
str(base_name), err))
errors.append([base_name, 'assigned_metrics_dict', str(err)])
labelled_metric = 'labelled_metrics.%s' % str(metric_id)
if last_metrics_last_analysis_dict:
try:
last_anlysis_timestamp_str = last_metrics_last_analysis_dict[labelled_metric]
if last_anlysis_timestamp_str:
metrics_last_analysis_dict[labelled_metric] = int(last_anlysis_timestamp_str)
except:
metrics_last_analysis_dict[labelled_metric] = int(right_now) - 60
else:
metrics_last_analysis_dict[labelled_metric] = int(right_now) - 60
continue
metrics_checked += 1
if not metrics_checked % 1000:
stats = {
'checked': metrics_checked,
'analysed': metrics_analysed,
'skipped': metrics_skipped,
'mad anomalous': mad_anomalies,
'anomalous': anomalies,
'not anomalous': no_anomalies,
'stationary': stationary,
'not stationary': not_stationary,
'monotonicity_checked': monotonicity_checked,
'monotonicity_changed': monotonicity_changed,
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
'normal_daily_peak_metrics_skipped': len(normal_daily_peak_metrics_skipped),
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
'feedback_metrics_skipped': len(feedback_metrics_skipped),
}
logger.info('labelled_metrics_spin_process :: progress: %s' % str(stats))
# @added 20210513 - Feature #4068: ANALYZER_SKIP
if ANALYZER_SKIP and not analyzer_skip_metrics:
pattern_match, metric_matched_by = matched_or_regexed_in_list('analyzer', base_name, ANALYZER_SKIP)
del metric_matched_by
if pattern_match:
analyzer_skip_metrics_skipped += 1
metrics_skipped += 1
# @added 20230404 - Feature #4888: analyzer - load_shedding
metric_id = assigned_metrics_dict[base_name]['id']
labelled_metric = 'labelled_metrics.%s' % str(metric_id)
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
# @added 20210520 - Branch #1444: thunder
# Added to supplement the ran Report app up if analyzer is just running
# long and over running.
update_analyzer_up_key = False
right_now = int(time())
if not last_reported_up:
update_analyzer_up_key = True
else:
try:
if right_now > (last_reported_up + 60):
update_analyzer_up_key = True
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: could not determine if last_reported_up time is exceeded - %s' % (
err))
if update_analyzer_up_key:
# Report app up
try:
self.redis_conn.setex(skyline_app_thunder_key, 120, right_now)
last_reported_up = right_now
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: could not update the Redis %s key - %s' % (
err))
# labelled_metric = 'labelled_metrics.%s' % str(base_name)
# Use the metrics_dict data to optimise when data should be
# retrieved and analysed
last_ts = 0
try:
metric_id = assigned_metrics_dict[base_name]['id']
last_ts = int(float(assigned_metrics_dict[base_name]['last_ts']))
stationary_status = assigned_metrics_dict[base_name]['stationary']
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to get assigned_metrics_dict for %s - %s' % (
str(base_name), err))
errors.append([base_name, 'assigned_metrics_dict', str(err)])
continue
labelled_metric = 'labelled_metrics.%s' % str(metric_id)
if metrics_checked == 1:
logger.debug('debug :: labelled_metrics_spin_process :: %s, metrics_dict: %s' % (
str(base_name), str(assigned_metrics_dict[base_name])))
labelled_metric_by_id = 'labelled_metrics.%s' % str(metric_id)
redis_ts_key = labelled_metric_by_id
check_id_key = False
if check_id_key:
try:
metrics_id_ts_keys_exists = self.redis_conn_decoded.exists(labelled_metric_by_id)
if metrics_id_ts_keys_exists:
redis_ts_key = labelled_metric_by_id
used_metric_id_keys += 1
except:
pass
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
normal_daily_peak_expiry = 0
try:
normal_daily_peak_expiry = normal_daily_peak_metrics_expiry[labelled_metric]
except:
normal_daily_peak_expiry = 0
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
feedback_metric = False
if analyzer_labelled_metrics_busy:
if str(metric_id) in feedback_labelled_metric_ids:
feedback_metric = True
feedback_metric_expiry = 0
if feedback_metric:
feedback_key = 'mirage_labelled_metrics.feedback.expiry.%s' % str(metric_id)
try:
feedback_metric_expiry = self.redis_conn_decoded.get(feedback_key)
except Exception as err:
errors.append([labelled_metric, feedback_key, str(err)])
feedback_metric_expiry = 0
timeseries = []
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters and all_timeseries
timeseries_not_present = False
if all_timeseries:
try:
all_timeseries_index = labelled_metric_and_indices_in_all_timeseries[labelled_metric]
timeseries = all_timeseries[all_timeseries_index][labelled_metric][1]
except KeyError:
logger.info('labelled_metrics_spin_process :: not present in all_timeseries - %s - %s' % (
str(labelled_metric), base_name))
timeseries_not_present_in_all_timeseries[labelled_metric] = base_name
timeseries_not_present = True
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to get timeseries from all_timeseries for %s - %s' % (
str(labelled_metric), err))
errors.append([labelled_metric, 'all_timeseries', str(err)])
timeseries = []
# @modified 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters and all_timeseries and only query Redis if
# the timeseries is not retrieved from all_timeseries
if not timeseries and not timeseries_not_present:
r_start = time()
try:
# timeseries = self.redis_conn_decoded.ts().range(labelled_metric, (from_timestamp * 1000), (until_timestamp * 1000))
timeseries = self.redis_conn_decoded.ts().range(redis_ts_key, ((until_timestamp - (180 * 60)) * 1000), (until_timestamp * 1000))
except Exception as err:
if str(err) == 'TSDB: the key does not exist':
exceptions['TimeseriesDoesNotExist'] += 1
redis_timings.append(time() - r_start)
empty_timeseries_list.append(labelled_metric)
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
logger.error('error :: labelled_metrics_spin_process :: failed to get Redis timeseries for %s - %s' % (
str(labelled_metric), err))
errors.append([labelled_metric, 'ts().range', str(err)])
timeseries = []
redis_timings.append(time() - r_start)
if not timeseries:
metrics_skipped += 1
exceptions['EmptyTimeseries'] += 1
empty_metrics.append(base_name)
empty_timeseries_list.append(labelled_metric)
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
# Convert Redis millisecond timestamps to second timestamps
timeseries = [[int(mts / 1000), value] for mts, value in timeseries]
last_timeseries_timestamp = 0
try:
last_timeseries_timestamp = int(timeseries[-1][0])
except:
metrics_skipped += 1
exceptions['EmptyTimeseries'] += 1
empty_metrics.append(base_name)
empty_timeseries_list.append(labelled_metric)
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
if (until_timestamp - last_timeseries_timestamp) >= settings.STALE_PERIOD:
metrics_skipped += 1
exceptions['Stale'] += 1
stale_metrics[metric_id] = str(last_timeseries_timestamp)
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_stale_metrics.append(str(metric_id))
# try:
# self.redis_conn_decoded.sadd('analyzer_labelled_metrics.stale', str(metric_id))
# except Exception as err:
# redis_set_errors.append(['analyzer_labelled_metrics.stale', str(err)])
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
# TODO: How to not classify as short if the metric stops and starts
# again. In analyzer with FULL_DURATION data this is not so much of
# an issue, but with 3 hours of data...
# if len(timeseries) < 180:
# if len(timeseries) < 60:
if len(timeseries) < settings.MIN_TOLERABLE_LENGTH:
metrics_skipped += 1
exceptions['TooShort'] += 1
tooshort_metrics[base_name] = len(timeseries)
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_tooshort_metrics.append(str(metric_id))
# try:
# self.redis_conn_decoded.sadd('analyzer_labelled_metrics.tooshort', str(metric_id))
# except Exception as err:
# redis_set_errors.append(['analyzer_labelled_metrics.tooshort', str(err)])
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
# Maintain a Redis hash key of the last timestamp of all metrics and
# only update the hash key if the timestamp is recent. This Redis
# hash key is pruned in metrics_manager when an entry has a
# timestamp older that now - FULL_DURATION
# Thunder stale_metrics check requires all metrics to have their
# timestamps recorded in the hash key
update_last_timestamp_hash_key = False
if last_ts:
if last_ts < last_timeseries_timestamp:
update_last_timestamp_hash_key = True
else:
update_last_timestamp_hash_key = True
if update_last_timestamp_hash_key:
metrics_last_timeseries_timestamp_update_dict[metric_id] = last_timeseries_timestamp
else:
# No new data since last analysis
metrics_skipped += 1
exceptions['NoNewData'] += 1
no_new_data_metrics[metric_id] = last_timeseries_timestamp
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
# Get rid of boring series
boring = False
if len(set(item[1] for item in timeseries[-settings.MAX_TOLERABLE_BOREDOM:])) == settings.BOREDOM_SET_SIZE:
# metrics_skipped += 1
exceptions['Boring'] += 1
boring = True
# Class boring as analysed
# continue
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_boring_metrics.append(str(metric_id))
# try:
# self.redis_conn_decoded.sadd('analyzer_labelled_metrics.boring', str(metric_id))
# except Exception as err:
# redis_set_errors.append(['analyzer_labelled_metrics.boring', str(err)])
metric_type_redis_key = None
metric_dict = {}
try:
metric_dict = assigned_metrics_dict[base_name]['metric_dict']
except:
metric_dict = {}
if not metric_dict:
try:
# metric_dict = metric_name_labels_parser(skyline_app, base_name)
metric_dict = self.metric_name_labels_parser(base_name)
if metric_dict:
metrics_dict[base_name]['metric_dict'] = metric_dict
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
errors.append([labelled_metric, 'metric_name_labels_parser', str(err_msg)])
metrics_skipped += 1
exceptions['LabelsError'] += 1
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
try:
metric_name = metric_dict['metric']
tenant_id = metric_dict['labels']['_tenant_id']
server_id = metric_dict['labels']['_server_id']
metric_type_redis_key = 'metrics_manager.prometheus.metrics_type.%s.%s' % (str(tenant_id), str(server_id))
tenant_ids.append(tenant_id)
except Exception as err:
errors.append([labelled_metric, 'parsing metric_dict', str(err)])
metrics_skipped += 1
exceptions['metricDict'] += 1
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
continue
determine_monotonic = False
calculate_derivative = False
metric_type_known = False
# The metric_subtype allows for SUMMARY and HISTOGRAM metrics
# to be defined as COUNTER or GAUGE metrics due to the fact
# that these metrics can be COUNTERs or GAUGES
metric_subtype = None
add_metric_subtype = False
metric_type = None
try:
# metric_type = metrics_dict[base_name]['type']
metric_type = metric_dict['type']
except:
metric_type = None
if not metric_type:
try:
metric_type = metrics_dict[base_name]['type']
except:
metric_type = None
in_labelled_metrics_id_types = False
try:
in_labelled_metrics_id_types = metric_dict['in_labelled_metrics_id_types']
except:
in_labelled_metrics_id_types = False
if metric_type:
metric_types_known[metric_type]['known'] += 1
# There is an issue in Prometheus metric types in so far as
# developers will not always adhere to metric typing rules
# and in some exporter you will find COUNTER and GAUGE
# metrics in the same metric namespace. To overcome this
# analyzer periodically checks the metrics and identifies
# the skyline_metric_type, because the Prometheus metadata
# cannot be trusted.
if not metric_type:
try:
# @modified 20221229 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# Fetch the data once because with 1000s of new metrics added it
# becomes more efficient
# metric_type_id_str = self.redis_conn_decoded.hget('skyline.labelled_metrics.id.type', str(metric_id))
metric_type_id_str = skyline_labelled_metrics_id_types[str(metric_id)]
if metric_type_id_str:
metric_type = skyline_metric_types_by_id[int(metric_type_id_str)]
except Exception as err:
err_msg = 'hgetall skyline.labelled_metrics.id.type'
errors.append([labelled_metric, err_msg, str(err)])
if metric_type:
metric_subtype = str(metric_type)
metric_types_known[metric_type]['looked_up'] += 1
# metric_type = 'COUNTER'
if not metric_type and metric_type_redis_key:
if metric_type_redis_key not in list(metric_type_redis_keys.keys()):
metric_type_redis_key_dict = None
try:
metric_type_redis_key_dict = self.redis_conn_decoded.hgetall(metric_type_redis_key)
except Exception as err:
err_msg = 'hgetall %s' % metric_type_redis_key
errors.append([labelled_metric, err_msg, str(err)])
metric_type_redis_keys[metric_type_redis_key] = metric_type_redis_key_dict
try:
metric_type = metric_type_redis_keys[metric_type_redis_key][metric_name]
if metric_type:
metrics_dict[base_name]['type'] = metric_type
if metric_type in ['COUNTER', 'GAUGE']:
metrics_type_dict[metric_id] = skyline_metric_types[metric_type]
metric_types_known[metric_type]['looked_up'] += 1
except:
metric_type = None
# TODO - function to determine monotonic and add to hash, remember to remove id
# from the hash when metric becomes inactive. Probably best to do in metrics_manager
if metric_type == 'COUNTER':
metric_subtype = str(metric_type)
calculate_derivative = True
metric_type_known = True
if metric_type == 'GAUGE':
metric_subtype = str(metric_type)
calculate_derivative = False
metric_type_known = True
metric_subtype_redis_key = 'metrics_manager.prometheus.metrics_subtype.%s.%s' % (str(tenant_id), str(server_id))
if metric_type in ['SUMMARY', 'HISTOGRAM', None]:
try:
metric_subtype = self.redis_conn_decoded.hget(metric_subtype_redis_key, str(metric_id))
except Exception as err:
errors.append([labelled_metric, metric_subtype_redis_key, str(err)])
if metric_subtype == 'None':
metric_subtype = None
if not metric_subtype:
add_metric_subtype = True
else:
if not metric_type:
# metric_type = 'UNKNOWN'
metric_type = None
try:
metric_subtypes_queried['UNKNOWN'][metric_subtype] += 1
except Exception as err:
msg = 'subtype queried update dict: metric_subtypes_queried[%s][%s]' % (str(metric_type), str(metric_subtype))
errors.append([labelled_metric, msg, str(err)])
if metric_subtype and not metric_type:
metric_type = str(metric_subtype)
if not metric_subtype:
determine_monotonic = True
# Periodically check metric monotonicity every 4 hours
if monotonicity_checked < monotonicity_check_per_minute:
continue_check = True
# Do not check not older than 1 hour
try:
if timeseries[0][0] > (until_timestamp - 3600):
continue_check = False
except:
continue_check = False
# @modified 20221228 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# The timeseries length could be less than 180 if a few
# data points are missed. Using MIN_TOLERABLE_LENGTH
# if len(timeseries) < 180:
# continue_check = False
if boring:
continue_check = False
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
if normal_daily_peak_expiry:
continue_check = False
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
if feedback_metric_expiry:
continue_check = False
if continue_check:
monotonicity_last_checked = None
check_monotonicity = False
try:
monotonicity_last_checked = int(float(self.redis_conn_decoded.hget('analyzer_labelled_metrics.monotonicity_last_checked', str(metric_id))))
except:
monotonicity_last_checked = None
if not monotonicity_last_checked:
check_monotonicity = True
if monotonicity_last_checked:
if monotonicity_last_checked < (until_timestamp - (3600 * 4)):
check_monotonicity = True
if check_monotonicity:
determine_monotonic = True
# DISABLED
# @modified 20221228 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# RE-ENABLED
# if int(metric_id) != 12060:
# determine_monotonic = False
else:
monotonicity_not_expired += 1
if add_metric_subtype:
determine_monotonic = True
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
if normal_daily_peak_expiry:
determine_monotonic = False
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
if feedback_metric_expiry:
determine_monotonic = False
# @added 20230518 - metric_type.longterm_expire
# Do not overwrite longterm metric_type classifications
metric_type_longterm_expire = None
if determine_monotonic and skyline_labelled_metrics_id_types_longterm_expire:
try:
metric_type_longterm_expire = skyline_labelled_metrics_id_types_longterm_expire[str(metric_id)]
except KeyError:
metric_type_longterm_expire = None
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to determine expire_str from skyline_labelled_metrics_id_types_longterm_expire - %s' % err)
if metric_type_longterm_expire:
determine_monotonic = False
if determine_monotonic:
monotonicity_checked += 1
is_strictly_increasing_monotonicity = False
try:
is_strictly_increasing_monotonicity = strictly_increasing_monotonicity(timeseries)
except Exception as err:
errors.append([labelled_metric, 'is_strictly_increasing_monotonicity', str(err)])
try:
monotonicity_last_checked = self.redis_conn_decoded.hset('analyzer_labelled_metrics.monotonicity_last_checked', str(metric_id), str(until_timestamp))
except:
pass
if is_strictly_increasing_monotonicity:
calculate_derivative = True
if metric_type != 'COUNTER':
logger.debug('debug :: labelled_metrics_spin_process :: monotonicity changed for %s, from type: %s to COUNTER - %s' % (
str(metric_id), str(metric_type), str(assigned_metrics_dict[base_name])))
metric_type = 'COUNTER'
monotonicity_changed += 1
if metric_subtype != 'COUNTER':
logger.debug('debug :: labelled_metrics_spin_process :: monotonicity changed for %s, from subtype: %s to COUNTER' % (
str(metric_id), str(metric_subtype)))
metric_subtype = 'COUNTER'
add_metric_subtype = True
monotonicity_changed += 1
metric_type = 'COUNTER'
metric_subtype = 'COUNTER'
else:
calculate_derivative = False
if metric_type != 'GAUGE':
logger.debug('debug :: labelled_metrics_spin_process :: monotonicity changed for %s, from type: %s to GAUGE - %s' % (
str(metric_id), str(metric_type), str(assigned_metrics_dict[base_name])))
metric_type = 'GAUGE'
monotonicity_changed += 1
if metric_subtype != 'GAUGE':
logger.debug('debug :: labelled_metrics_spin_process :: monotonicity changed for %s, from subtype: %s to GAUGE' % (
str(metric_id), str(metric_subtype)))
metric_subtype = 'GAUGE'
monotonicity_changed += 1
add_metric_subtype = True
metric_type = 'GAUGE'
metric_subtype = 'GAUGE'
if add_metric_subtype:
try:
self.redis_conn_decoded.hset(metric_subtype_redis_key, str(metric_id), metric_subtype)
# metric_subtypes_added += 1
metric_subtypes_added[metric_type][metric_subtype] += 1
except Exception as err:
errors.append([labelled_metric, 'metric_subtype', str(err)])
if metric_subtype == 'COUNTER':
calculate_derivative = True
# Add type to dict for metrics_manager to update
if metric_type in ['COUNTER', 'GAUGE']:
metrics_type_dict[metric_id] = skyline_metric_types[metric_type]
if metric_subtype in ['COUNTER', 'GAUGE']:
metrics_type_dict[metric_id] = skyline_metric_types[metric_subtype]
if not in_labelled_metrics_id_types and not boring:
if metric_subtype:
metrics_type_dict[metric_id] = skyline_metric_types[metric_subtype]
if calculate_derivative and not boring:
try:
timeseries = nonNegativeDerivative(timeseries)
except Exception as err:
errors.append([labelled_metric, 'nonNegativeDerivative timeseries', str(err)])
check_for_anomalous = True
if boring:
check_for_anomalous = False
boring_dict[metric_id] = str(timeseries[-1][1])
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
if normal_daily_peak_expiry:
check_for_anomalous = False
normal_daily_peak_metrics_skipped.append(metric_id)
last_analysed_timeseries = list(timeseries)
last_analysed_metric = base_name
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
if feedback_metric_expiry:
check_for_anomalous = False
feedback_metrics_skipped.append(metric_id)
last_analysed_timeseries = list(timeseries)
last_analysed_metric = base_name
# @added 20230404 - Feature #4888: analyzer - load_shedding
metrics_last_analysis_dict[labelled_metric] = int(right_now)
mad_anomalous = None
use_mad = False
metrics_analysed += 1
# Only check if stationary every ~30 minutes
stationary_status = None
if check_for_anomalous:
try:
stationary_status = assigned_metrics_dict[base_name]['stationary']['v']
stationary_last_checked = int(assigned_metrics_dict[base_name]['stationary']['ts'])
# Only do 1000 stationary tests per run
if len(stationary_timings) < 1000:
if stationary_last_checked < (current_aligned_ts - 1800):
stationary_status = None
else:
stationary_not_expired += 1
else:
stationary_not_expired += 1
except:
stationary_status = None
try:
if str(stationary_status) == 'None':
s_start = time()
stationary_status = is_stationary(timeseries)
stationary_timings.append(time() - s_start)
value_data = '%s,%s' % (str(stationary_status), str(current_aligned_ts))
metrics_stationary_update_dict[str(metric_id)] = value_data
if stationary_status:
stationary += 1
use_mad = True
else:
not_stationary += 1
except Exception as err:
errors.append([labelled_metric, 'is_stationary', str(err)])
if use_mad:
m_start = time()
try:
mad_anomalous = median_absolute_deviation(timeseries)
mad_analysed += 1
if str(mad_anomalous) == 'False':
check_for_anomalous = False
if mad_anomalous:
mad_anomalies += 1
mad_anomalous_dict[base_name] = str(timeseries)
except Exception as err:
errors.append([labelled_metric, 'median_absolute_deviation', str(err)])
mad_timings.append(time() - m_start)
last_analysed_timeseries = list(timeseries)
last_analysed_metric = base_name
full_duration_timeseries = []
if check_for_anomalous:
r_start = time()
try:
full_duration_timeseries = self.redis_conn_decoded.ts().range(redis_ts_key, (from_timestamp * 1000), (until_timestamp * 1000))
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to get Redis full_duration timeseries for %s - %s' % (
str(labelled_metric), err))
errors.append([labelled_metric, 'ts().range', str(err)])
full_duration_timeseries = []
redis_full_duration_timings.append(time() - r_start)
if not full_duration_timeseries:
check_for_anomalous = False
else:
timeseries = full_duration_timeseries
timeseries = [[int(mts / 1000), value] for mts, value in timeseries]
# @added 20230329 - Feature #4882: labelled_metrics - resolution and data sparsity
# Feature #3870: metrics_manager - check_data_sparsity
# Handling the populating of the labelled_metrics data
# sparsity directly in analyzer_labelled_metrics to
# prevent having to surface the data in metrics_manager
# if the sparsity has been recently updated. Running
# the determine_data_frequency and the
# determine_data_sparsity functions, combined together
# take 77.1 µs (microseconds) to run, this means that
# we can process ~12970 timeseries per seconds with
# these functions.
if check_for_anomalous:
update_resolution_and_sparsity = False
last_sparsity_check_timestamp = 0
try:
last_sparsity_check_timestamp_str = labelled_metrics_resolution_sparsity_last_checked_dict[str(metric_id)]
if last_sparsity_check_timestamp_str:
last_sparsity_check_timestamp = int(float(last_sparsity_check_timestamp_str))
if (last_sparsity_check_timestamp + 1800) > int(r_start):
update_resolution_and_sparsity = True
else:
labelled_metrics_resolution_sparsity_recently_checked += 1
except:
last_sparsity_check_timestamp = 0
update_resolution_and_sparsity = True
if update_resolution_and_sparsity:
start_resolution_and_sparsity_timing = time()
metric_resolution = 0
data_sparsity = None
try:
metric_resolution, timestamp_resolutions_count = determine_data_frequency(skyline_app, timeseries, False)
except Exception as err:
errors.append([labelled_metric, 'determine_data_frequency', str(err)])
metric_resolution = 0
if metric_resolution:
labelled_metrics_resolutions[str(metric_id)] = metric_resolution
try:
data_sparsity = determine_data_sparsity(skyline_app, timeseries, resolution=metric_resolution, log=False)
except Exception as err:
errors.append([labelled_metric, 'determine_data_sparsity', str(err)])
data_sparsity = None
if isinstance(data_sparsity, float):
labelled_metrics_sparsity[str(metric_id)] = data_sparsity
labelled_metrics_resolution_sparsity_checked_dict[str(metric_id)] = int(right_now)
resolution_and_sparsity_timings.append(time() - start_resolution_and_sparsity_timing)
anomalous = None
ensemble = []
datapoint = None
algorithms_run = []
# @added 20221228 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# Always check the monotonicity of the timeseries when it
# moves to the full_duration analysis and update the Redis
# sets as appropriate.
# if check_for_anomalous and metric_type != 'COUNTER' and metric_subtype != 'COUNTER':
# @modified 20230518 - metric_type.longterm_expire
# Do not overwrite longterm metric_type classifications
# if full_duration_timeseries:
if full_duration_timeseries and not metric_type_longterm_expire:
monotonicity_checked += 1
is_strictly_increasing_monotonicity = False
try:
is_strictly_increasing_monotonicity = strictly_increasing_monotonicity(full_duration_timeseries)
except Exception as err:
errors.append([labelled_metric, 'is_strictly_increasing_monotonicity', str(err)])
try:
monotonicity_last_checked = self.redis_conn_decoded.hset('analyzer_labelled_metrics.monotonicity_last_checked', str(metric_id), str(until_timestamp))
except:
pass
add_metric_subtype = False
if is_strictly_increasing_monotonicity:
calculate_derivative = True
if metric_type != 'COUNTER':
logger.debug('debug :: labelled_metrics_spin_process :: full_duration monotonicity changed for %s, from type: %s to COUNTER - %s' % (
str(metric_id), str(metric_type), str(assigned_metrics_dict[base_name])))
metric_type = 'COUNTER'
monotonicity_changed += 1
if metric_subtype != 'COUNTER':
logger.debug('debug :: labelled_metrics_spin_process :: full_duration monotonicity changed for %s, from subtype: %s to COUNTER' % (
str(metric_id), str(metric_subtype)))
metric_subtype = 'COUNTER'
add_metric_subtype = True
metric_type = 'COUNTER'
metric_subtype = 'COUNTER'
else:
# @added 20230517 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# Handle changes from COUNTER to GAUGE as well
if metric_type != 'GAUGE':
logger.debug('debug :: labelled_metrics_spin_process :: full_duration monotonicity changed for %s, from type: %s to GAUGE - %s' % (
str(metric_id), str(metric_type), str(assigned_metrics_dict[base_name])))
metric_type = 'GAUGE'
monotonicity_changed += 1
if metric_subtype != 'GAUGE':
logger.debug('debug :: labelled_metrics_spin_process :: full_duration monotonicity changed for %s, from subtype: %s to GAUGE' % (
str(metric_id), str(metric_subtype)))
metric_subtype = 'GAUGE'
add_metric_subtype = True
metric_subtype = 'GAUGE'
metric_type = 'GAUGE'
if add_metric_subtype:
try:
self.redis_conn_decoded.hset(metric_subtype_redis_key, str(metric_id), metric_subtype)
metric_subtypes_added[metric_type][metric_subtype] += 1
except Exception as err:
errors.append([labelled_metric, 'metric_subtype', str(err)])
if metric_subtype == 'COUNTER':
calculate_derivative = True
# Add type to dict for metrics_manager to update
if metric_type in ['COUNTER', 'GAUGE']:
metrics_type_dict[metric_id] = skyline_metric_types[metric_type]
if metric_subtype in ['COUNTER', 'GAUGE']:
metrics_type_dict[metric_id] = skyline_metric_types[metric_subtype]
if check_for_anomalous:
run_selected_algorithm_count += 1
d_start = time()
try:
downsampled_timeseries = downsample_timeseries('analyzer', timeseries, 60, 600, 'mean', 'end')
except Exception as err:
errors.append([labelled_metric, 'downsample_timeseries', str(err)])
downsampled_timeseries = list(timeseries)
downsample_timings.append(time() - d_start)
# Calculate the derivate AFTER downsampling
if check_for_anomalous and calculate_derivative:
if downsampled_timeseries:
try:
downsampled_timeseries = nonNegativeDerivative(downsampled_timeseries)
except Exception as err:
errors.append([labelled_metric, 'nonNegativeDerivative downsampled_timeseries', str(err)])
t_start = time()
try:
anomalous, ensemble, datapoint, negatives_found, algorithms_run = run_selected_algorithm(downsampled_timeseries, base_name, metric_airgaps, metric_airgaps_filled, run_negatives_present, check_for_airgaps_only, custom_stale_metrics_dict)
try:
del negatives_found
except:
pass
# It could have been deleted by the Roomba
except TypeError:
exceptions['DeletedByRoomba'] += 1
except EmptyTimeseries:
exceptions['EmptyTimeseries'] += 1
empty_timeseries_list.append(labelled_metric)
except TooShort:
exceptions['TooShort'] += 1
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_tooshort_metrics.append(str(metric_id))
# try:
# self.redis_conn_decoded.sadd('analyzer_labelled_metrics.tooshort', str(metric_id))
# except Exception as err:
# redis_set_errors.append(['analyzer_labelled_metrics.tooshort', str(err)])
except Stale:
exceptions['Stale'] += 1
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_stale_metrics.append(str(metric_id))
# try:
# self.redis_conn_decoded.sadd('analyzer_labelled_metrics.stale', str(metric_id))
# except Exception as err:
# redis_set_errors.append(['analyzer_labelled_metrics.stale', str(err)])
except Boring:
exceptions['Boring'] += 1
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
identified_boring_metrics.append(str(metric_id))
# try:
# self.redis_conn_decoded.sadd('analyzer_labelled_metrics.boring', str(metric_id))
# except Exception as err:
# redis_set_errors.append(['analyzer_labelled_metrics.boring', str(err)])
except Exception as err:
exceptions['Other'] += 1
errors.append([labelled_metric, 'run_selected_algorithm - Other', str(err)])
threesigma_timings.append(time() - t_start)
# @added 20220919 - Feature #4676: analyzer - illuminance.all key
if ensemble.count(True) > 0:
triggered_algorithms = []
for index, value in enumerate(ensemble):
if value:
algorithm = algorithms_run[index]
try:
algorithm_id = int(algorithms[algorithm])
except:
algorithm_id = 0
triggered_algorithms.append(algorithm_id)
try:
illuminance_all_dict[str(metric_id)] = {
# 't': int(metric_timestamp),
't': int(last_timeseries_timestamp),
'v': float(datapoint),
'a': triggered_algorithms}
except Exception as err:
errors.append([labelled_metric, 'failed to add illuminance_all_dict', str(err)])
if not anomalous:
no_anomalies += 1
try:
data = [metric_id, last_timeseries_timestamp]
not_anomalous_list.append(str(data))
except Exception as err:
errors.append([labelled_metric, 'not_anomalous_list', str(err)])
continue
anomalies += 1
threesigma_anomalous_dict[base_name] = str(timeseries)
if calculate_derivative:
try:
timeseries = nonNegativeDerivative(timeseries)
except Exception as err:
errors.append([labelled_metric, 'nonNegativeDerivative timeseries', str(err)])
# Handle single value timeseries. The second timestamp in the
# timeseries was used because the first timestamp in a
# nonNegativeDerivative timeseries has no value.
try:
from_timestamp = int(timeseries[1][0])
except:
try:
from_timestamp = int(timeseries[0][0])
except:
# Failover to last_timeseries_timestamp
from_timestamp = last_timeseries_timestamp
metric_timestamp = last_timeseries_timestamp
triggered_algorithms_for_waterfall_alert = []
for index, value in enumerate(ensemble):
if value:
algorithm = algorithms_run[index]
triggered_algorithms_for_waterfall_alert.append(algorithm)
waterfall_panorama_data_added_at = until_timestamp
waterfall_panorama_data_source = 'redistimeseries'
waterfall_panorama_data = [
base_name, datapoint, from_timestamp, metric_timestamp,
algorithms_run, triggered_algorithms_for_waterfall_alert,
skyline_app, waterfall_panorama_data_source, this_host,
waterfall_panorama_data_added_at
]
metric = [datapoint, base_name, metric_timestamp]
redis_set = 'analyzer_labelled_metrics.real_anomalous_metrics'
data = str(metric)
try:
self.redis_conn.sadd(redis_set, data)
self.redis_conn.expire(redis_set, 120)
except Exception as err:
errors.append([labelled_metric, 'sadd analyzer_labelled_metrics.real_anomalous_metrics', str(err)])
# Get the anomaly breakdown - who returned True?
triggered_algorithms = []
for index, value in enumerate(ensemble):
if value:
algorithm = algorithms_run[index]
anomaly_breakdown[algorithm] += 1
triggered_algorithms.append(algorithm)
metric_data = [datapoint, base_name, metric_timestamp, triggered_algorithms, algorithms_run]
metric_data = {
'metric': base_name,
'metric_id': int(metric_id),
'metric_dict': metrics_dict[base_name],
'from_timestamp': from_timestamp,
'timestamp': metric_timestamp,
'value': datapoint,
'triggered_algorithms': triggered_algorithms,
'algorithms_run': algorithms_run,
'snab_only_check': False,
'processed_by': {
'analyzer_labelled_metrics': {
'timestamp': int(time()),
'triggered_algorithms': triggered_algorithms,
'algorithms_run': algorithms_run,
'waterfall_panorama_data': waterfall_panorama_data,
'process_number': i_process,
},
},
}
redis_set = 'analyzer_labelled_metrics.anomalous_metrics'
try:
self.redis_conn.sadd(redis_set, str(metric_data))
self.redis_conn.expire(redis_set, 120)
except Exception as err:
errors.append([labelled_metric, 'sadd analyzer_labelled_metrics.anomalous_metrics', str(err)])
# Add ionosphere data
# timeseries_dir = str(metric_id)
# timeseries_dir = base_name.replace('.', '/')
timeseries_dir = labelled_metric.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(metric_timestamp),
str(timeseries_dir))
if not os.path.exists(training_dir):
mkdir_p(training_dir)
full_duration_in_hours = int(settings.FULL_DURATION) / 3600
ionosphere_json_file = '%s/%s.mirage.redis.%sh.json' % (
training_dir, str(labelled_metric),
str(int(full_duration_in_hours)))
if not os.path.isfile(ionosphere_json_file):
# Do not use the downsample_timeseries as there are only
# ~144 data points which is not sufficient for echo
# features profiles or motif matching
# timeseries_json = str(downsampled_timeseries).replace('[', '(').replace(']', ')')
timeseries_json = str(timeseries).replace('[', '(').replace(']', ')')
try:
write_data_to_file(skyline_app, ionosphere_json_file, 'w', timeseries_json)
if VERBOSE_LOGGING:
logger.info('labelled_metrics_spin_process :: added Ionosphere Mirage %sh Redis data timeseries json file :: %s' % (
str(int(full_duration_in_hours)), ionosphere_json_file))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to add Ionosphere Mirage Redis data timeseries json file - %s - %s' % (
ionosphere_json_file, err))
# @added 20200904 - Feature #3734: waterfall alerts
added_to_waterfall_timestamp = int(time())
# [metric, timestamp, value, added_to_waterfall_timestamp, waterfall_panorama_data]
waterfall_data = [labelled_metric, metric_timestamp, datapoint, added_to_waterfall_timestamp, waterfall_panorama_data]
redis_set = 'analyzer.waterfall_alerts.sent_to_mirage'
# Only add to waterfall_alerts if it is not
# a snab_only_check
snab_only_check = False
if not snab_only_check:
try:
self.redis_conn.sadd(redis_set, str(waterfall_data))
# if VERBOSE_LOGGING:
logger.info('labelled_metrics_spin_process :: added to Redis set %s - %s' % (redis_set, str(waterfall_data)))
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to add %s to Redis set %s - %s' % (
str(waterfall_data), str(redis_set), e))
redis_hash = 'analyzer_labelled_metrics.mirage_check'
hash_key = '%s.%s' % (str(metric_id), str(int(time())))
try:
self.redis_conn.hset(redis_hash, hash_key, str(metric_data))
logger.info('labelled_metrics_spin_process :: added mirage check for %s with metric_data: %s' % (
str(labelled_metric), str(metric_data)))
except Exception as err:
errors.append([labelled_metric, 'hset analyzer_labelled_metrics.to_mirage', metric_data, str(err)])
# @added 20220504 - Feature #2580: illuminance
try:
illuminance_dict[str(metric_id)] = {
'timestamp': int(metric_timestamp),
'value': float(datapoint),
'triggered_algorithms_count': len(triggered_algorithms)}
except Exception as err:
errors.append([labelled_metric, 'failed to add illuminance_dict', str(err)])
# FINISH HERE FOR NOW
continue
if not_anomalous_list:
redis_set = 'analyzer_labelled_metrics.not_anomalous_metrics'
logger.info('labelled_metrics_spin_process :: adding %s metric ids to %s' % (
str(len(not_anomalous_list)), redis_set))
try:
items_added = self.redis_conn.sadd(redis_set, *not_anomalous_list)
logger.info('labelled_metrics_spin_process :: added %s metric ids to %s' % (
str(items_added), redis_set))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to sadd to %s Redis set - %s' % (
redis_set, err))
errors.append(['labelled_metrics_spin_process', 'sadd analyzer_labelled_metrics.not_anomalous_metrics', str(err)])
if empty_timeseries_list:
redis_set = 'redistimeseries_roomba.metrics_to_remove'
logger.info('labelled_metrics_spin_process :: adding %s metrics to %s' % (
str(len(empty_timeseries_list)), redis_set))
try:
items_added = self.redis_conn.sadd(redis_set, *empty_timeseries_list)
logger.info('labelled_metrics_spin_process :: added %s metrics to %s' % (
str(items_added), redis_set))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to sadd to %s Redis set - %s' % (
redis_set, err))
errors.append(['labelled_metrics_spin_process', 'sadd redistimeseries_roomba.metrics_to_remove', str(err)])
if boring_dict:
redis_set = 'analyzer_labelled_metrics.boring.%s' % str(current_aligned_ts)
logger.info('labelled_metrics_spin_process :: adding %s metrics to %s' % (
str(len(boring_dict)), redis_set))
try:
items_added = self.redis_conn.hset(redis_set, mapping=boring_dict)
logger.info('labelled_metrics_spin_process :: added %s metrics to %s' % (
str(items_added), redis_set))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to hset to %s Redis set - %s' % (
redis_set, err))
errors.append(['labelled_metrics_spin_process', 'hset analyzer_labelled_metrics.boring', str(err)])
try:
self.redis_conn.expire(redis_set, 120)
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to expire to %s Redis set - %s' % (
redis_set, err))
logger.info('labelled_metrics_spin_process :: metric_subtypes_added: %s' % str(metric_subtypes_added))
logger.info('labelled_metrics_spin_process :: metric_subtypes_queried: %s' % str(metric_subtypes_queried))
logger.info('labelled_metrics_spin_process :: metric_types_known: %s' % str(metric_types_known))
stats = {
'checked': metrics_checked,
'analysed': metrics_analysed,
'skipped': metrics_skipped,
'anomalous': anomalies,
'not anomalous': no_anomalies,
'redis timings': sum(redis_timings),
'redis_timings_count': len(redis_timings),
'redis full_duration timings': sum(redis_full_duration_timings),
'stationary analysed': len(stationary_timings),
'stationary': stationary,
'not stationary': not_stationary,
'stationary timing': sum(stationary_timings),
'stationary not expired': stationary_not_expired,
'mad analysed': mad_analysed,
'mad anomalous': mad_anomalies,
'mad timings': sum(mad_timings),
'downsampled': len(downsample_timings),
'downsample timings': sum(downsample_timings),
'3sigma analysed': run_selected_algorithm_count,
'3sigma timings': sum(threesigma_timings),
'monotonicity_checked': monotonicity_checked,
'monotonicity_changed': monotonicity_changed,
'resolution_and_sparsity_timings': sum(resolution_and_sparsity_timings),
# @added 20230426 - Feature #4724: custom_algorithms - anomalous_daily_peak
# Added expiry to record metrics identified as normal by anomalous_daily_peaks
'normal_daily_peak_metrics_skipped': len(normal_daily_peak_metrics_skipped),
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
'feedback_metrics_skipped': len(feedback_metrics_skipped),
}
logger.info('labelled_metrics_spin_process :: complete: %s' % str(stats))
for kkey, value in stats.items():
self.stats_q.put((kkey, value))
logger.info('labelled_metrics_spin_process :: used %s metric id Redis ts keys' % str(used_metric_id_keys))
logger.info('labelled_metrics_spin_process :: completed with %s errors' % str(len(errors)))
if errors:
logger.info('labelled_metrics_spin_process :: sample error: %s' % str(errors[0]))
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
if redis_set_errors:
errors = redis_set_errors + errors
sadd_errors = []
if errors:
error_set = 'analyzer_labelled_metrics.errors.%s' % str(current_aligned_ts)
errors_strs = []
for error_list in errors:
errors_strs.append(str(error_list))
try:
self.redis_conn_decoded.sadd(error_set, *errors_strs)
except Exception as err:
sadd_errors.append(['errors_strs', 'sadd analyzer_labelled_metrics.errors', str(err)])
if not sadd_errors:
logger.info('labelled_metrics_spin_process :: updated %s Redis set with %s errors' % (
len(errors), error_set))
else:
logger.error('error :: labelled_metrics_spin_process :: failed to sadd to %s Redis set with %s errors, last err - %s' % (
error_set, str(len(sadd_errors)), err))
try:
self.redis_conn_decoded.expire(error_set, 120)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to expire %s Redis set - %s' % (
error_set, err))
logger.info('labelled_metrics_spin_process :: %s metrics run through run_selected_algorithm' % (
str(run_selected_algorithm_count)))
# @added 20220420 - Feature #4530: namespace.analysed_events
namespace_analysed = defaultdict(int)
for tenant_id in tenant_ids:
parent_namespace = tenant_id
namespace_analysed[parent_namespace] += 1
date_string = str(strftime('%Y-%m-%d', gmtime()))
namespace_analysed_events_hash = 'namespace.analysed_events.%s.%s' % (skyline_app, date_string)
for namespace in list(namespace_analysed.keys()):
try:
self.redis_conn.hincrby(namespace_analysed_events_hash, namespace, namespace_analysed[namespace])
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to increment %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
try:
self.redis_conn.expire(namespace_analysed_events_hash, (86400 * 15))
logger.info('labelled_metrics_spin_process :: updated %s Redis hash' % namespace_analysed_events_hash)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set expire %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
# Add values to the queue so the parent process can collate
for kkey, value in anomaly_breakdown.items():
self.anomaly_breakdown_q.put((kkey, value))
for kkey, value in exceptions.items():
self.exceptions_q.put((kkey, value))
if len(illuminance_dict) > 0:
logger.info('labelled_metrics_spin_process :: calling add_illuminance_entries with %s entries to add' % (
str(len(illuminance_dict))))
current_illuminance_dict = {}
try:
current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(spin_start), illuminance_dict)
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: add_illuminance_entries failed - %s' % (
err))
logger.info('labelled_metrics_spin_process :: illuminance Redis hash now has %s entries' % (
str(len(current_illuminance_dict))))
# @added 20220919 - Feature #4676: analyzer - illuminance.all key
if len(illuminance_all_dict) > 0:
logger.info('labelled_metrics_spin_process :: calling add_illuminance_entries (all) with %s entries to add' % (
str(len(illuminance_all_dict))))
current_illuminance_all_dict = {}
try:
current_illuminance_all_dict = add_illuminance_entries(self, skyline_app, int(spin_start), illuminance_all_dict)
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: add_illuminance_entries (all) failed - %s' % (
err))
logger.info('labelled_metrics_spin_process :: illuminance_all Redis hash now has %s entries' % (
str(len(current_illuminance_all_dict))))
if metrics_last_timeseries_timestamp_update_dict:
hash_key = metrics_last_timeseries_timestamp_hash_key
hash_dict = metrics_last_timeseries_timestamp_update_dict
logger.info('labelled_metrics_spin_process :: updating %s Redis hash with %s metrics' % (
hash_key, str(len(hash_dict))))
try:
self.redis_conn.hset(hash_key, mapping=hash_dict)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to update %s Redis hash - %s' % (
hash_key, err))
if metrics_stationary_update_dict:
hash_key = metrics_stationary_hash_key
hash_dict = metrics_stationary_update_dict
logger.info('labelled_metrics_spin_process :: updating %s Redis hash with %s metrics' % (
hash_key, str(len(hash_dict))))
try:
self.redis_conn.hset(hash_key, mapping=hash_dict)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to update %s Redis hash - %s' % (
hash_key, err))
if last_analysed_timeseries and i_process == 1:
last_analysed_timeseries_data = [last_analysed_metric, last_analysed_timeseries]
try:
self.redis_conn.setex('analyzer_labelled_metrics.last_analysed_timeseries', 300, str(last_analysed_timeseries_data))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set analyzer_labelled_metrics.last_analysed_timeseries Redis key - %s' % (
err))
if mad_anomalous_dict and i_process == 1:
hash_key = 'analyzer_labelled_metrics.last_mad_anomalous.%s' % str(current_aligned_ts)
try:
self.redis_conn.hset(hash_key, mapping=mad_anomalous_dict)
self.redis_conn.expire(hash_key, 300)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
if threesigma_anomalous_dict:
hash_key = 'analyzer_labelled_metrics.last_3sigma_anomalous.%s.%s' % (str(i_process), str(current_aligned_ts))
try:
self.redis_conn.hset(hash_key, mapping=threesigma_anomalous_dict)
self.redis_conn.expire(hash_key, 1800)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
if empty_metrics:
hash_key = 'analyzer_labelled_metrics.empty_metrics.%s' % str(current_aligned_ts)
try:
self.redis_conn.sadd(hash_key, *empty_metrics)
self.redis_conn.expire(hash_key, 120)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to sadd %s Redis set - %s' % (
hash_key, err))
record_stale_metrics = True
if record_stale_metrics:
if stale_metrics:
hash_key = 'analyzer_labelled_metrics.stale_metrics.%s' % str(current_aligned_ts)
try:
self.redis_conn.hset(hash_key, mapping=stale_metrics)
self.redis_conn.expire(hash_key, 120)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
if tooshort_metrics:
hash_key = 'analyzer_labelled_metrics.tooshort_metrics.%s' % str(current_aligned_ts)
try:
self.redis_conn.hset(hash_key, mapping=tooshort_metrics)
self.redis_conn.expire(hash_key, 120)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
record_no_new_data_metrics = True
if record_no_new_data_metrics:
if no_new_data_metrics:
hash_key = 'analyzer_labelled_metrics.no_new_data_metrics.%s' % str(current_aligned_ts)
try:
self.redis_conn.hset(hash_key, mapping=no_new_data_metrics)
self.redis_conn.expire(hash_key, 120)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
logger.info('labelled_metrics_spin_process :: adding %s metrics to skyline.labelled_metrics.id.type Redis hash' % (
str(len(metrics_type_dict))))
if metrics_type_dict:
hash_key = 'skyline.labelled_metrics.id.type'
try:
self.redis_conn.hset(hash_key, mapping=metrics_type_dict)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
# @added 20230517 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# If any metrics have changed monotonicity set a key to update the metric_type
# in the parent process
if monotonicity_changed:
try:
self.redis_conn_decoded.setex('skyline.labelled_metrics.id.type.changed', 60, monotonicity_changed)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to create Redis key skyline.labelled_metrics.id.type.changed - %s' % (
err))
# @added 20230329 - Feature #4882: labelled_metrics - resolution and data sparsity
# Feature #3870: metrics_manager - check_data_sparsity
if labelled_metrics_resolution_sparsity_checked_dict:
logger.info('labelled_metrics_spin_process :: checked resolution and sparsity for %s metrics' % (
str(len(labelled_metrics_resolution_sparsity_checked_dict))))
if labelled_metrics_resolutions:
hash_key = 'labelled_metrics.metric_resolutions'
try:
self.redis_conn.hset(hash_key, mapping=labelled_metrics_resolutions)
logger.info('labelled_metrics_spin_process :: updated %s metrics in %s Redis hash' % (
str(len(labelled_metrics_resolutions)), hash_key))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
if labelled_metrics_sparsity:
hash_key = 'labelled_metrics.data_sparsity'
try:
self.redis_conn.hset(hash_key, mapping=labelled_metrics_sparsity)
logger.info('labelled_metrics_spin_process :: updated %s metrics in %s Redis hash' % (
str(len(labelled_metrics_sparsity)), hash_key))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
if labelled_metrics_resolution_sparsity_recently_checked:
logger.info('labelled_metrics_spin_process :: resolution and sparsity not checked on %s metrics that have been recently checked' % (
str(labelled_metrics_resolution_sparsity_recently_checked)))
if labelled_metrics_resolution_sparsity_checked_dict:
hash_key = str(labelled_metrics_resolution_sparsity_last_checked_hash_key)
try:
self.redis_conn.hset(hash_key, mapping=labelled_metrics_resolution_sparsity_checked_dict)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to set %s Redis hash - %s' % (
hash_key, err))
# @modified 20230401 - Feature #4886: analyzer - operation_timings
# Only sadd once
if identified_boring_metrics:
logger.info('labelled_metrics_spin_process :: sadding %s identified_boring_metrics to analyzer_labelled_metrics.boring Redis set' % (
str(len(identified_boring_metrics))))
try:
self.redis_conn_decoded.sadd('analyzer_labelled_metrics.boring', *set(identified_boring_metrics))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to sadd identified_boring_metrics to analyzer_labelled_metrics.boring Redis set - %s' % (
err))
if identified_stale_metrics:
logger.info('labelled_metrics_spin_process :: sadding %s identified_stale_metrics to analyzer_labelled_metrics.stale Redis set' % (
str(len(identified_stale_metrics))))
try:
self.redis_conn_decoded.sadd('analyzer_labelled_metrics.stale', *set(identified_stale_metrics))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to sadd identified_stale_metrics to analyzer_labelled_metrics.stale Redis set - %s' % (
err))
if identified_tooshort_metrics:
logger.info('labelled_metrics_spin_process :: sadding %s identified_tooshort_metrics to analyzer_labelled_metrics.tooshort Redis set' % (
str(len(identified_tooshort_metrics))))
try:
self.redis_conn_decoded.sadd('analyzer_labelled_metrics.tooshort', *set(identified_tooshort_metrics))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to sadd identified_tooshort_metrics to analyzer_labelled_metrics.tooshort Redis set - %s' % (
err))
# @added 20230405 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters and all_timeseries
if timeseries_not_present_in_all_timeseries:
current_aligned_ts = int(int(spin_start) // 60 * 60)
hash_key = 'analyzer_labelled_metrics.timeseries_not_present_in_all_timeseries.%s' % str(current_aligned_ts)
logger.info('labelled_metrics_spin_process :: adding %s metrics to Redis %s hash key' % (
str(len(timeseries_not_present_in_all_timeseries)),
hash_key))
try:
self.redis_conn_decoded.hset(hash_key, mapping=timeseries_not_present_in_all_timeseries)
self.redis_conn_decoded.expire(hash_key, 180)
logger.info('labelled_metrics_spin_process :: added %s metrics to Redis %s hash key' % (
str(len(timeseries_not_present_in_all_timeseries)),
hash_key))
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to set %s hash key - %s' % (
hash_key, err))
# @added 20230404 - Feature #4888: analyzer - load_shedding
if load_shedding_active or activating_load_shedding or load_shedding_hash_exists:
if metrics_last_analysis_dict:
try:
self.redis_conn_decoded.hset(metrics_last_analysis_hash_key, mapping=metrics_last_analysis_dict)
logger.info('labelled_metrics_spin_process :: load shedding - updated %s analysis timestamps in Redis analyzer.metrics.last_analysis hash key' % (
str(len(metrics_last_analysis_dict))))
if load_shedding_active:
# The load_shedding hash only has the expiry set if load
# shedding is active, not if the hash is only being updated.
self.redis_conn_decoded.expire(metrics_last_analysis_hash_key, 300)
logger.info('labelled_metrics_spin_process :: load_shedding_active - set expire to 300 on Redis analyzer.metrics.last_analysis hash key')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to update analysis timestamp in Redis analyzer.metrics.last_analysis hash key - %s' % err)
try:
key_ttl = self.redis_conn_decoded.ttl(metrics_last_analysis_hash_key)
if key_ttl == -1:
self.redis_conn_decoded.expire(metrics_last_analysis_hash_key, 300)
logger.info('labelled_metrics_spin_process :: set the unset expire TTL on Redis analyzer.metrics.last_analysis hash key to 300')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to set the unset expire TTL on Redis analyzer.metrics.last_analysis hash key - %s' % err)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: error in memray with block - %s' % (
err))
logger.info('labelled_metrics_spin_process :: %s metrics had monotonicity_checked and %s changed' % (
str(monotonicity_checked), str(monotonicity_changed)))
spin_end = time() - spin_start
logger.info('labelled_metrics_spin_process :: process %s took %.2f seconds' % (str(i_process), spin_end))
return
def run(self):
"""
- Called when the process intializes.
- Determine if Redis is up and discover the number of `unique metrics`.
- Divide the `unique_labelled_metrics` between the number of `ANALYZER_LABELLED_METRICS_PROCESSES`
and assign each process a set of metrics to analyse for anomalies.
- Wait for the processes to finish.
- Determine whether if any anomalous metrics require:
- Alerting on (and set `EXPIRATION_TIME` key in Redis for alert).
- Feed to another module e.g. mirage.
- Alert to syslog.
- Populate the webapp json with the anomalous_metrics details.
- Log the details about the run to the skyline analyzer log.
- Send skyline.analyzer metrics to `GRAPHITE_HOST`
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('analyzer_labelled_metrics :: starting')
if os.path.isfile(skyline_app_loglock):
logger.error('error :: analyzer_labelled_metrics :: bin/%s.d log management seems to have failed, continuing' % skyline_app)
else:
logger.info('analyzer_labelled_metrics :: bin/%s.d log management done' % skyline_app)
if not os.path.exists(settings.SKYLINE_TMP_DIR):
# @modified 20160803 - Adding additional exception handling to Analyzer
try:
mkdir_p(settings.SKYLINE_TMP_DIR)
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to create %s' % settings.SKYLINE_TMP_DIR)
def smtp_trigger_alert(alert, metric, context):
    """
    Action an SMTP alert in a separate, time-limited child process so
    that a slow or hanging alerter cannot block the parent
    analyzer_labelled_metrics process.

    :param alert: the alert definition to action
    :param metric: the metric the alert relates to
    :param context: the app context the alert was triggered in
    :return: None
    """
    # Spawn processes
    pids = []
    spawned_pids = []
    pid_count = 0
    try:
        # spawn_alerter_process is presumably defined on the enclosing
        # class (outside this view) and actions the actual alert
        p = Process(target=self.spawn_alerter_process, args=(alert, metric, context))
        pids.append(p)
        pid_count += 1
        p.start()
        spawned_pids.append(p.pid)
    # @modified - do not use a bare except which would also swallow
    # SystemExit and KeyboardInterrupt, catch Exception explicitly
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: analyzer_labelled_metrics :: failed to spawn_alerter_process')
    # Give the alerter process up to 15 seconds to complete
    p_starts = time()
    while time() - p_starts <= 15:
        if any(p.is_alive() for p in pids):
            # Just to avoid hogging the CPU
            sleep(.1)
        else:
            # All the processes are done, break now.
            break
    else:
        # We only enter this if we didn't 'break' above.
        logger.info('analyzer_labelled_metrics :: timed out, killing the spawn_trigger_alert process')
        for p in pids:
            p.terminate()
        # p.join()
    # Join any process that is still alive after terminate so no
    # zombie child processes are left behind
    for p in pids:
        if p.is_alive():
            logger.info('analyzer_labelled_metrics :: stopping spawn_trigger_alert - %s' % (str(p.is_alive())))
            p.join()
# Discover unique labelled_metrics
logger.info('analyzer_labelled_metrics :: memory usage before loading unique labelled_metrics - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
unique_labelled_metrics = []
try:
unique_labelled_metrics = list(self.redis_conn_decoded.smembers('labelled_metrics.unique_labelled_metrics'))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: could not get the unique_labelled_metrics list from labelled_metrics.unique_labelled_metrics Redis set - %s' % err)
logger.info('analyzer_labelled_metrics :: memory usage after loading unique labelled_metrics - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
logger.info('analyzer_labelled_metrics :: memory usage before loading aet.metrics_manager.metric_names_with_ids - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
metric_names_with_ids_key = 'aet.metrics_manager.metric_names_with_ids'
metric_names_with_ids = {}
try:
metric_names_with_ids = self.redis_conn_decoded.hgetall(metric_names_with_ids_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall aet.metrics_manager.metric_names_with_ids Redis hash key %s - %s' % (
err))
logger.info('analyzer_labelled_metrics :: memory usage after loading aet.metrics_manager.metric_names_with_ids - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
metric_ids_with_metric_key = 'aet.metrics_manager.active_labelled_ids_with_metric'
metric_ids_with_name = {}
try:
metric_ids_with_name = self.redis_conn_decoded.hgetall(metric_ids_with_metric_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall aet.metrics_manager.active_labelled_ids_with_metric Redis hash key %s - %s' % (
err))
if not metric_ids_with_name:
for base_name in metric_names_with_ids:
metric_id = metric_names_with_ids[base_name]
metric_ids_with_name[metric_id] = base_name
logger.info('analyzer_labelled_metrics :: memory usage after loading aet.metrics_manager.active_labelled_ids_with_metric - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
logger.info('analyzer_labelled_metrics :: memory usage before creating labelled_metrics_id_types - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
labelled_metrics_id_types = {}
try:
labelled_metrics_id_types = self.redis_conn_decoded.hgetall('skyline.labelled_metrics.id.type')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall skyline.labelled_metrics.id.type Redis hash key %s - %s' % (
err))
logger.info('analyzer_labelled_metrics :: memory usage after creating labelled_metrics_id_types - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
tenant_ids_with_count = {}
tenant_ids_base_names = {}
tenant_ids_labelled_metrics = {}
logger.info('analyzer_labelled_metrics :: memory usage before creating metrics_and_labels_dict - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
metrics_and_labels_dict = {}
metric_type_redis_keys_list = []
errors = []
for labelled_metric in unique_labelled_metrics:
metric_id = labelled_metric.replace('labelled_metrics.', '', 1)
try:
base_name = metric_ids_with_name[str(metric_id)]
except:
continue
metric_dict = {}
try:
# metric_dict = metric_name_labels_parser(skyline_app, base_name)
metric_dict = self.metric_name_labels_parser(base_name)
if metric_dict:
metrics_and_labels_dict[base_name] = metric_dict
try:
tenant_id = metric_dict['labels']['_tenant_id']
server_id = metric_dict['labels']['_server_id']
metric_type_redis_key = 'metrics_manager.prometheus.metrics_type.%s.%s' % (str(tenant_id), str(server_id))
metric_type_redis_keys_list.append(metric_type_redis_key)
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
try:
tenant_ids_with_count[tenant_id] += 1
tenant_ids_base_names[tenant_id].append(base_name)
tenant_ids_labelled_metrics[tenant_id].append(labelled_metric)
except:
tenant_ids_with_count[tenant_id] = 1
tenant_ids_base_names[tenant_id] = [base_name]
tenant_ids_labelled_metrics[tenant_id] = [labelled_metric]
except:
pass
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
errors.append([labelled_metric, 'metric_name_labels_parser', str(err_msg)])
metric_type = None
in_labelled_metrics_id_types = False
try:
metric_type_id_str = labelled_metrics_id_types[str(metric_id)]
if metric_type_id_str:
in_labelled_metrics_id_types = True
if metric_type_id_str == '1':
metric_type = 'COUNTER'
if metric_type_id_str == '0':
metric_type = 'GAUGE'
except:
metric_type = None
try:
metrics_and_labels_dict[base_name]['type'] = metric_type
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
errors.append([labelled_metric, 'adding metric type to metrics_and_labels_dict', str(err_msg)])
try:
metrics_and_labels_dict[base_name]['in_labelled_metrics_id_types'] = in_labelled_metrics_id_types
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
errors.append([labelled_metric, 'adding in_labelled_metrics_id_types to metrics_and_labels_dict', str(err_msg)])
if errors:
logger.error('error :: analyzer_labelled_metrics :: metric_name_labels_parser encountered %s errors, sample: %s' % (
str(len(errors)), str(errors[0])))
logger.info('analyzer_labelled_metrics :: memory usage after creating metrics_and_labels_dict - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
logger.info('analyzer_labelled_metrics :: memory usage before creating metric_type_redis_keys - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
metric_type_redis_keys = {}
metric_type_redis_keys_list = list(set(metric_type_redis_keys_list))
errors = []
for metric_type_redis_key in metric_type_redis_keys_list:
if metric_type_redis_key not in list(metric_type_redis_keys.keys()):
metric_type_redis_key_dict = None
try:
metric_type_redis_key_dict = self.redis_conn_decoded.hgetall(metric_type_redis_key)
except Exception as err:
err_msg = 'hgetall %s' % metric_type_redis_key
errors.append([metric_type_redis_key, err_msg, str(err)])
metric_type_redis_keys[metric_type_redis_key] = metric_type_redis_key_dict
if errors:
logger.error('error :: analyzer_labelled_metrics :: metric_name_labels_parser encountered %s errors, sample: %s' % (
str(len(errors)), str(errors[0])))
for base_name in list(metrics_and_labels_dict.keys()):
metric_type = None
try:
metric_type = metrics_and_labels_dict[base_name]['type']
except:
metric_type = None
if metric_type:
continue
try:
metric_type = metric_type_redis_keys[metric_type_redis_key][metrics_and_labels_dict[base_name]['metric_name']]
if metric_type:
metrics_and_labels_dict[base_name]['type'] = metric_type
except:
# Do not default to COUNTER
# metrics_and_labels_dict[base_name]['type'] = 'COUNTER'
metrics_and_labels_dict[base_name]['type'] = None
logger.info('analyzer_labelled_metrics :: memory usage after determining metric types from metric_type_redis_keys - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
fetch_new_metrics = False
no_id_metrics = []
# Main analysis loop - one iteration per analyzer run
while 1:
now = time()
logger.info('analyzer_labelled_metrics :: memory usage at beginning of run - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# Make sure Redis is up
try:
self.redis_conn.ping()
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: cannot ping Redis at socket path %s, reconnect will be attempted in 10 seconds - %s' % (settings.REDIS_SOCKET_PATH, err))
sleep(10)
try:
# Rebuild both the raw and decoded connections
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: cannot connect to get_redis_conn - %s' % err)
# Restart the run iteration whether or not the reconnect succeeded
continue
# Report app up
try:
# Report app AND Redis as up
redis_is_up = self.redis_conn.setex(skyline_app_thunder_key, 120, now)
if redis_is_up:
try:
self.redis_conn.setex('redis', 120, now)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: could not update the Redis redis key - %s' % (
err))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: could not update the Redis %s key - %s' % (
skyline_app, err))
# Discover unique labelled_metrics
try:
unique_labelled_metrics = list(self.redis_conn_decoded.smembers('labelled_metrics.unique_labelled_metrics'))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: could not get the unique_labelled_metrics list from labelled_metrics.unique_labelled_metrics Redis set - %s' % err)
sleep(10)
continue
unique_labelled_metrics_count = len(unique_labelled_metrics)
if unique_labelled_metrics_count == 0:
logger.info('analyzer_labelled_metrics :: no labelled_metrics in redis try adding some')
sleep(10)
continue
# If there are ANALYZER_SKIP metrics declare shuffle the metrics
# so that the load is distributed evenly between the processes
# rather than 1 process getting all the metrics to skip
if ANALYZER_SKIP and ANALYZER_LABELLED_METRICS_PROCESSES > 1:
logger.info('analyzer_labelled_metrics :: ANALYZER_SKIP set shuffling unique_labelled_metrics')
shuffle(unique_labelled_metrics)
logger.info('analyzer_labelled_metrics :: memory usage before creating metrics_last_timeseries_timestamp_dict - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# Map of metric id -> last analysed timeseries timestamp, used further down
# to populate each assigned metric's last_ts
metrics_last_timeseries_timestamp_dict = {}
metrics_last_timeseries_timestamp_hash_key = 'analyzer_labelled_metrics.last_timeseries_timestamp'
try:
metrics_last_timeseries_timestamp_dict = self.redis_conn_decoded.hgetall(metrics_last_timeseries_timestamp_hash_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to create metrics_last_timeseries_timestamp_dict from Redis hash key %s - %s' % (
metrics_last_timeseries_timestamp_hash_key, err))
logger.info('analyzer_labelled_metrics :: memory usage after creating metrics_last_timeseries_timestamp_dict - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# @added 20230517 - Bug #4766: analyzer_labelled_metrics - monotonicity checked incorrect classification
# If any metrics have changed monotonicity update the metric_type
update_labelled_metrics_id_type = False
try:
# exists returns an int count, truthy when the flag key is present
update_labelled_metrics_id_type = self.redis_conn_decoded.exists('skyline.labelled_metrics.id.type.changed')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: exists failed on Redis key skyline.labelled_metrics.id.type.changed - %s' % (
err))
if update_labelled_metrics_id_type:
logger.info('analyzer_labelled_metrics :: skyline.labelled_metrics.id.type.changed exists reloading skyline.labelled_metrics.id.type to use new metric_type from monotonicity changes')
try:
# Consume the change flag so the reload only happens once
self.redis_conn_decoded.delete('skyline.labelled_metrics.id.type.changed')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to delete skyline.labelled_metrics.id.type.changed key from Redis - %s' % (
err))
logger.info('analyzer_labelled_metrics :: memory usage before updating labelled_metrics_id_types because skyline.labelled_metrics.id.type.changed exists - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
try:
# NOTE(review): labelled_metrics_id_types is only assigned inside this
# try - if hgetall raises on the first occurrence the loop over it
# below would hit a NameError.  Consider initialising it to {} first.
labelled_metrics_id_types = self.redis_conn_decoded.hgetall('skyline.labelled_metrics.id.type')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall skyline.labelled_metrics.id.type Redis hash key - %s' % (
err))
logger.info('analyzer_labelled_metrics :: memory usage after creating labelled_metrics_id_types - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
errors = []
# Apply the reloaded id -> type mapping onto metrics_and_labels_dict.
# Encoding in the hash: '1' == COUNTER, '0' == GAUGE.
for metric_id_str in list(labelled_metrics_id_types.keys()):
base_name = None
try:
base_name = metric_ids_with_name[metric_id_str]
except:
# id not in the id -> name map, skip it
continue
metric_type = None
in_labelled_metrics_id_types = False
try:
metric_type_id_str = labelled_metrics_id_types[str(metric_id_str)]
if metric_type_id_str:
in_labelled_metrics_id_types = True
if metric_type_id_str == '1':
metric_type = 'COUNTER'
if metric_type_id_str == '0':
metric_type = 'GAUGE'
except:
metric_type = None
if metric_type:
try:
metrics_and_labels_dict[base_name]['type'] = metric_type
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
# FIXME(review): labelled_metric is not defined in this loop (the
# loop variable is metric_id_str) - this either raises NameError or
# records a stale value from an earlier loop.  Should presumably be
# base_name or metric_id_str.
errors.append([labelled_metric, 'adding metric type to metrics_and_labels_dict', str(err_msg)])
try:
metrics_and_labels_dict[base_name]['in_labelled_metrics_id_types'] = in_labelled_metrics_id_types
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
# FIXME(review): same stale labelled_metric reference as above
errors.append([labelled_metric, 'adding in_labelled_metrics_id_types to metrics_and_labels_dict', str(err_msg)])
# Refetch the name/id maps when new metrics have appeared or some metrics
# had no resolvable id last run
if len(list(metrics_and_labels_dict.keys())) < unique_labelled_metrics_count:
fetch_new_metrics = True
if no_id_metrics:
fetch_new_metrics = True
if fetch_new_metrics:
try:
metric_names_with_ids = self.redis_conn_decoded.hgetall(metric_names_with_ids_key)
except Exception as err:
logger.error(traceback.format_exc())
# FIXME(review): format string has two %s placeholders but only one
# argument (err) - this raises TypeError inside the except handler.
# Should be % (metric_names_with_ids_key, err)
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall aet.metrics_manager.metric_names_with_ids Redis hash key %s - %s' % (
err))
try:
metric_ids_with_name = self.redis_conn_decoded.hgetall(metric_ids_with_metric_key)
except Exception as err:
logger.error(traceback.format_exc())
# FIXME(review): same two-placeholder/one-argument mismatch as above
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall aet.metrics_manager.active_labelled_ids_with_metric Redis hash key %s - %s' % (
err))
if not metric_ids_with_name:
# Fall back to inverting the name -> id map
for base_name in list(metric_names_with_ids.keys()):
metric_id = metric_names_with_ids[base_name]
metric_ids_with_name[metric_id] = base_name
if fetch_new_metrics:
# Parse labels and determine types for any metrics not yet present in
# metrics_and_labels_dict, building the per tenant filter structures
# used for mrange as a side effect
new_metrics = []
metric_type_redis_keys_list = []
errors = []
for labelled_metric in unique_labelled_metrics:
# labelled_metrics.<id> -> <id>
metric_id = labelled_metric.replace('labelled_metrics.', '', 1)
try:
base_name = metric_ids_with_name[str(metric_id)]
except:
continue
# base_name = labelled_metric.replace('labelled_metrics.', '', 1)
metric_dict = {}
try:
metric_dict = metrics_and_labels_dict[base_name]
except:
metric_dict = {}
metric_type = None
try:
metric_type = metric_dict['type']
except:
metric_type = None
if not metric_type:
try:
# metric_dict = metric_name_labels_parser(skyline_app, base_name)
metric_dict = self.metric_name_labels_parser(base_name)
if metric_dict:
try:
metrics_and_labels_dict[base_name] = {}
metrics_and_labels_dict[base_name] = metric_dict
tenant_id = metric_dict['labels']['_tenant_id']
server_id = metric_dict['labels']['_server_id']
metric_type_redis_key = 'metrics_manager.prometheus.metrics_type.%s.%s' % (str(tenant_id), str(server_id))
metric_type_redis_keys_list.append(metric_type_redis_key)
new_metrics.append(base_name)
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
try:
tenant_ids_with_count[tenant_id] += 1
tenant_ids_base_names[tenant_id].append(base_name)
tenant_ids_labelled_metrics[tenant_id].append(labelled_metric)
except:
# First metric seen for this tenant
tenant_ids_with_count[tenant_id] = 1
tenant_ids_base_names[tenant_id] = [base_name]
tenant_ids_labelled_metrics[tenant_id] = [labelled_metric]
except:
pass
except Exception as err:
err_msg = '%s - %s' % (err, traceback.format_exc())
errors.append([labelled_metric, 'metric_name_labels_parser', str(err_msg)])
if errors:
logger.error('error :: analyzer_labelled_metrics :: metric_name_labels_parser encountered %s errors, sample: %s' % (
str(len(errors)), str(errors[0])))
# Fetch any not yet cached per tenant/server type hashes
metric_type_redis_keys_list = list(set(metric_type_redis_keys_list))
errors = []
for metric_type_redis_key in metric_type_redis_keys_list:
if metric_type_redis_key not in metric_type_redis_keys:
metric_type_redis_key_dict = None
try:
metric_type_redis_key_dict = self.redis_conn_decoded.hgetall(metric_type_redis_key)
except Exception as err:
err_msg = 'hgetall %s' % metric_type_redis_key
errors.append([metric_type_redis_key, err_msg, str(err)])
metric_type_redis_keys[metric_type_redis_key] = metric_type_redis_key_dict
if errors:
logger.error('error :: analyzer_labelled_metrics :: metric_name_labels_parser encountered %s errors, sample: %s' % (
str(len(errors)), str(errors[0])))
for base_name in new_metrics:
try:
# NOTE(review): metric_type_redis_key is the stale loop variable
# from the loop above, so only the last fetched type hash is
# consulted for every new metric - same pattern as the pre-loop
# resolution, presumably should use the metric's own tenant/server
# key.  Verify.
metric_type = metric_type_redis_keys[metric_type_redis_key][metrics_and_labels_dict[base_name]['metric_name']]
if metric_type:
metrics_and_labels_dict[base_name]['type'] = metric_type
except:
# metrics_and_labels_dict[base_name]['type'] = 'COUNTER'
metrics_and_labels_dict[base_name]['type'] = None
logger.info('analyzer_labelled_metrics :: memory usage before creating stationary_metrics_dict - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# Map of metric id -> 'True/False,<timestamp>' stationarity string
stationary_metrics_dict = {}
stationary_metrics_hash_key = 'analyzer_labelled_metrics.stationary_metrics'
try:
stationary_metrics_dict = self.redis_conn_decoded.hgetall(stationary_metrics_hash_key)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to hgetall %s Redis hash key - %s' % (
stationary_metrics_hash_key, err))
logger.info('analyzer_labelled_metrics :: memory usage after creating stationary_metrics_dict - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# Discover assigned metrics
keys_per_processor = int(ceil(float(unique_labelled_metrics_count) / float(ANALYZER_LABELLED_METRICS_PROCESSES)))
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
try:
# Split tenants across processes so each process can mrange filter on
# its own tenant set
filters_dict = get_tenant_id_mrange_split(self, ANALYZER_LABELLED_METRICS_PROCESSES, tenant_ids_with_count)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: get_tenant_id_mrange_split failed - %s' % err)
filters_dict = {}
logger.info('analyzer_labelled_metrics :: filters_dict: %s' % str(filters_dict))
no_id_metrics = []
# @added 20230123 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
# all_db_base_names = {}
# all_db_base_names_with_ids = {}
# all_db_ids_with_base_names = {}
# Spawn processes
pids = []
spawned_pids = []
pid_count = 0
for i_process in range(1, ANALYZER_LABELLED_METRICS_PROCESSES + 1):
if i_process > unique_labelled_metrics_count:
logger.warning('warning :: analyzer_labelled_metrics :: skyline is set for more cores than needed.')
logger.info('analyzer_labelled_metrics :: memory usage before starting process - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# The last process takes any remainder of the integer split
if i_process == ANALYZER_LABELLED_METRICS_PROCESSES:
assigned_max = unique_labelled_metrics_count
else:
assigned_max = min(unique_labelled_metrics_count, i_process * keys_per_processor)
# Fix analyzer worker metric assignment #94
# https://github.com/etsy/skyline/pull/94 @languitar:worker-fix
assigned_min = (i_process - 1) * keys_per_processor
assigned_keys = range(assigned_min, assigned_max)
# Compile assigned metrics
error_logged = False
assigned_metrics = [unique_labelled_metrics[index] for index in assigned_keys]
# @added 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
assigned_filter_metrics = []
filters_list = []
if filters_dict:
# filters_dict is keyed 0..(processes - 1)
filter_key = i_process - 1
try:
filters_list = filters_dict[filter_key]
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to determine filters_dict[%s] from filters_dict - %s' % (
str(filter_key), err))
filters_list = []
if filters_list:
for tenant_id in filters_list:
try:
assigned_filter_metrics = assigned_filter_metrics + tenant_ids_labelled_metrics[tenant_id]
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed metrics for tenant_id %s from tenant_ids_labelled_metrics - %s' % (
str(tenant_id), err))
# When tenant filters are in play they override the index based split
if assigned_filter_metrics:
assigned_metrics = list(set(assigned_filter_metrics))
del assigned_filter_metrics
del assigned_keys
# Build the per metric context handed to the spin process: id, last
# analysed timestamp, stationarity state and the parsed labels dict
assigned_metrics_dict = {}
for labelled_metric in assigned_metrics:
try:
# base_name = labelled_metric.replace('labelled_metrics.', '', 1)
# metric_id = None
metric_id = labelled_metric.replace('labelled_metrics.', '', 1)
base_name = None
no_metric_id = True
try:
base_name = metric_ids_with_name[metric_id]
no_metric_id = False
except:
# no_id_metrics.append(metric_id)
# continue
no_metric_id = True
# try:
# metric_id = metric_names_with_ids[base_name]
# no_metric_id = False
# except:
# no_id_metrics.append(metric_id)
# continue
# no_metric_id = True
if no_metric_id:
# Recorded so the next run sets fetch_new_metrics
no_id_metrics.append(metric_id)
continue
try:
last_timestamp = int(str(metrics_last_timeseries_timestamp_dict[metric_id]))
except:
last_timestamp = 0
try:
# Stored as 'True/False,<timestamp>' in the stationary hash
stationary_str = stationary_metrics_dict[str(metric_id)]
if stationary_str:
stationary_str_elements = stationary_str.split(',')
stationary_value = False
if str(stationary_str_elements[0]) == 'True':
stationary_value = True
stationary_dict = {'v': stationary_value, 'ts': int(stationary_str_elements[1])}
except:
# NOTE(review): fallback uses key 'last_ts' where the success path
# uses 'ts' - consumers will see different keys depending on which
# branch ran.  Verify which key labelled_metrics_spin_process reads.
stationary_dict = {'v': None, 'last_ts': 0}
metric_dict = {}
try:
metric_dict = metrics_and_labels_dict[base_name]
except:
metric_dict = {}
assigned_metrics_dict[base_name] = {
'id': metric_id,
'last_ts': last_timestamp,
'stationary': stationary_dict,
'metric_dict': metric_dict,
}
except Exception as err:
# Only log the first failure to avoid flooding the log
if not error_logged:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to add %s details to assigned_metrics_dict - %s' % (
labelled_metric, err))
error_logged = True
# Free the per process working data as soon as it has been handed over
del assigned_metrics
try:
# @modified 20230404 - Feature #4890: analyzer_labelled_metrics - use mrange
# Added filters
# p = Process(target=self.labelled_metrics_spin_process, args=(i_process, assigned_metrics_dict))
p = Process(target=self.labelled_metrics_spin_process, args=(i_process, assigned_metrics_dict, filters_list))
pids.append(p)
pid_count += 1
logger.info('analyzer_labelled_metrics :: starting %s of %s labelled_metrics_spin_process/es with filters_list: %s' % (
str(pid_count), str(ANALYZER_LABELLED_METRICS_PROCESSES),
str(filters_list)))
p.start()
started_pid = p.pid
spawned_pids.append(started_pid)
logger.info('analyzer_labelled_metrics :: started labelled_metrics_spin_process with pid %s' % (
str(started_pid)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to spawn process - %s' % err)
del assigned_metrics_dict
logger.info('analyzer_labelled_metrics :: memory usage after starting process - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
del unique_labelled_metrics
del metrics_last_timeseries_timestamp_dict
# del metric_names_with_ids
del stationary_metrics_dict
logger.info('analyzer_labelled_metrics :: memory usage after starting processes - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# Record ids with no known base_name in a short lived Redis set for
# debugging; their presence also triggers fetch_new_metrics next run
if no_id_metrics:
logger.info('analyzer_labelled_metrics :: no_id_metrics count: %s' % (
str(len(no_id_metrics))))
no_id_metrics_key = 'analyzer_labelled_metrics.no_id_metrics.%s' % str(now)
try:
self.redis_conn.sadd(no_id_metrics_key, *set(no_id_metrics))
self.redis_conn.expire(no_id_metrics_key, 300)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to sadd %s Redis set - %s' % (
no_id_metrics_key, err))
# Send wait signal to zombie processes
logger.info('analyzer_labelled_metrics :: started pids: %s' % (
str(spawned_pids)))
p_starts = time()
# Poll until all spin processes finish or the runtime cap is hit
while time() - p_starts <= MAX_ANALYZER_LABELLED_METRICS_PROCESS_RUNTIME:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('analyzer_labelled_metrics :: %s labelled_metrics_spin_process/es completed in %.2f seconds' % (
str(ANALYZER_LABELLED_METRICS_PROCESSES), time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('analyzer_labelled_metrics :: timed out, killing all labelled_metrics_spin_process processes')
for p in pids:
logger.info('analyzer_labelled_metrics :: killing labelled_metrics_spin_process process')
p.terminate()
# p.join()
logger.info('analyzer_labelled_metrics :: killed labelled_metrics_spin_process process')
# Escalate to SIGKILL for anything terminate did not stop
for p in pids:
if p.is_alive():
logger.info('analyzer_labelled_metrics :: sending SIGKILL to pid %s labelled_metrics_spin_process - %s' % (
str(p.pid), str(p.is_alive())))
try:
os.kill(p.pid, SIGKILL)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: SIGKILL failed - %s' % err)
# p.join()
# Log the last reported error by any algorithms that errored in the
# spawned processes from algorithms.py
for completed_pid in spawned_pids:
# NOTE(review): 'pin_process' in this message looks like a typo for
# 'spin_process' (runtime string, left as-is)
logger.info('analyzer_labelled_metrics :: labelled_metrics_pin_process with pid %s completed' % (str(completed_pid)))
logger.info('analyzer_labelled_metrics :: memory usage after completed processes - %s' % str(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss))
# Grab data from the queue and populate dictionaries
exceptions = {}
anomaly_breakdown = {}
# Drain the anomaly breakdown queue, summing counts per algorithm key
while 1:
try:
key, value = self.anomaly_breakdown_q.get_nowait()
if key not in anomaly_breakdown:
anomaly_breakdown[key] = value
else:
anomaly_breakdown[key] += value
except Empty:
break
# Drain the exceptions queue, summing counts per exception key
while 1:
try:
key, value = self.exceptions_q.get_nowait()
if key not in exceptions:
exceptions[key] = value
else:
exceptions[key] += value
except Empty:
break
exceptions_metrics = ['Boring', 'Stale', 'TooShort', 'Other', 'EmptyTimeseries', 'NoNewData']
for i_exception in exceptions_metrics:
if i_exception not in exceptions:
# No occurrences this run: zero the stat and remove the stale
# aet. set for the exception type
exceptions[i_exception] = 0
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
if i_exception == 'TooShort':
try:
self.redis_conn_decoded.delete('aet.analyzer_labelled_metrics.tooshort')
logger.info('deleted Redis set aet.analyzer_labelled_metrics.tooshort as no tooshort exceptions')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to delete Redis set aet.analyzer_labelled_metrics.tooshort - %s' % (
err))
if i_exception == 'Stale':
try:
self.redis_conn_decoded.delete('aet.analyzer_labelled_metrics.stale')
logger.info('deleted Redis set aet.analyzer_labelled_metrics.stale as no stale exceptions')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to delete Redis set aet.analyzer_labelled_metrics.stale - %s' % (
err))
if i_exception == 'Boring':
try:
self.redis_conn_decoded.delete('aet.analyzer_labelled_metrics.boring')
logger.info('deleted Redis set aet.analyzer_labelled_metrics.boring as no boring exceptions')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to delete Redis set aet.analyzer_labelled_metrics.boring - %s' % (
err))
else:
# Exception type occurred this run: rotate the working set into
# its aet. (analyzed end time) counterpart
# @added 20230309 - Feature #4864: analyzer_labelled_metrics - exceptions metrics set
if i_exception == 'TooShort':
try:
self.redis_conn_decoded.rename('analyzer_labelled_metrics.tooshort', 'aet.analyzer_labelled_metrics.tooshort')
logger.info('renamed Redis set analyzer_labelled_metrics.tooshort to aet.analyzer_labelled_metrics.tooshort')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to rename Redis set analyzer_labelled_metrics.tooshort to aet.analyzer_labelled_metrics.tooshort - %s' % (
err))
if i_exception == 'Stale':
try:
self.redis_conn_decoded.rename('analyzer_labelled_metrics.stale', 'aet.analyzer_labelled_metrics.stale')
logger.info('renamed Redis set analyzer_labelled_metrics.stale to aet.analyzer_labelled_metrics.stale')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to rename Redis set analyzer_labelled_metrics.stale to aet.analyzer_labelled_metrics.stale - %s' % (
err))
if i_exception == 'Boring':
try:
self.redis_conn_decoded.rename('analyzer_labelled_metrics.boring', 'aet.analyzer_labelled_metrics.boring')
logger.info('renamed Redis set analyzer_labelled_metrics.boring to aet.analyzer_labelled_metrics.boring')
except Exception as err:
logger.error('error :: labelled_metrics_spin_process :: failed to rename Redis set analyzer_labelled_metrics.boring to aet.analyzer_labelled_metrics.boring - %s' % (
err))
# @added 20200603 - Feature #3566: custom_algorithms
anomaly_breakdown_algorithms = list(settings.ALGORITHMS)
if CUSTOM_ALGORITHMS:
for custom_algorithm in settings.CUSTOM_ALGORITHMS:
anomaly_breakdown_algorithms.append(custom_algorithm)
stats = {}
# Drain the stats queue, summing counts per stat key
while 1:
try:
key, value = self.stats_q.get_nowait()
if key not in stats:
stats[key] = value
else:
stats[key] += value
except Empty:
break
# @modified 20200603 - Feature #3566: custom_algorithms
# for i_anomaly_breakdown in settings.ALGORITHMS:
for i_anomaly_breakdown in anomaly_breakdown_algorithms:
if i_anomaly_breakdown not in anomaly_breakdown:
anomaly_breakdown[i_anomaly_breakdown] = 0
# Set the anomaly_end_timestamp
# Reconcile the global current.anomalies set against this run's
# not_anomalous metrics: labelled metric anomalies whose metric has gone
# not anomalous get their anomaly_end_timestamp recorded, and entries
# older than FULL_DURATION are expired from the set.
not_anomalous_metric_ids = []
not_anomalous_metrics_data = []
redis_set = 'current.anomalies'
try:
current_anomalies = list(self.redis_conn_decoded.smembers(redis_set))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to get Redis set %s - %s' % (redis_set, err))
current_anomalies = []
if current_anomalies:
try:
redis_set = 'analyzer_labelled_metrics.not_anomalous_metrics'
not_anomalous_metrics = self.redis_conn_decoded.smembers(redis_set)
except:
not_anomalous_metrics = []
# Each not_anomalous item literal_evals to [metric_id, end_timestamp]
for item in not_anomalous_metrics:
try:
list_data = literal_eval(item)
not_anomalous_metrics_data.append(list_data)
not_anomalous_metric_ids.append(int(list_data[0]))
except:
pass
for item in current_anomalies:
try:
# current.anomalies items are
# [metric, anomaly_timestamp, anomaly_id, anomaly_end_timestamp]
list_data = literal_eval(str(item))
anomalous_metric = str(list_data[0])
if '_tenant_id="' not in anomalous_metric:
# Not a labelled metric, leave it to the other analyzer
continue
anomaly_timestamp = int(list_data[1])
# @added 20200608 - Feature #3306: Record anomaly_end_timestamp
# Remove entries from the current.anomalies set if the
# timestamp is older than FULL_DURATION
if anomaly_timestamp < (now - settings.FULL_DURATION):
redis_set = 'current.anomalies'
try:
self.redis_conn.srem(redis_set, str(list_data))
logger.info('analyzer_labelled_metrics :: removed %s from Redis set %s as the anomaly_timestamp is older than FULL_DURATION - %s' % (
anomalous_metric, redis_set, str(list_data)))
continue
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to remove %s for Redis set %s - %s' % (
str(list_data), redis_set, err))
try:
anomaly_id = int(list_data[2])
except:
anomaly_id = None
try:
anomaly_end_timestamp = int(list_data[3])
except:
anomaly_end_timestamp = None
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to determine current_anomalies item')
if anomaly_end_timestamp:
# End already recorded, nothing to do
continue
if not anomaly_id:
continue
anomalous_metric_id = 0
try:
anomalous_metric_id = get_metric_id_from_base_name(skyline_app, anomalous_metric)
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: get_metric_id_from_base_name failed for %s - %s' % (
str(anomalous_metric), err))
if int(anomalous_metric_id) in not_anomalous_metric_ids:
for metric_id, anomaly_end_timestamp in not_anomalous_metrics_data:
if int(anomalous_metric_id) == int(metric_id):
# Replace the set member with one carrying the
# anomaly_end_timestamp (srem old, sadd new)
update_item = False
redis_set = 'current.anomalies'
try:
self.redis_conn.srem(redis_set, str(list_data))
update_item = True
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to remove %s for Redis set %s' % (str(list_data), redis_set))
if update_item:
new_list_data = [anomalous_metric, anomaly_timestamp, anomaly_id, anomaly_end_timestamp]
try:
redis_set = 'current.anomalies'
self.redis_conn.sadd(redis_set, str(new_list_data))
logger.info('analyzer_labelled_metrics :: set anomaly_end_timestamp to %s for %s in Redis set %s' % (
str(anomaly_end_timestamp), anomalous_metric, redis_set))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_labelled_metrics :: failed to add %s to Redis set %s - %s' % (str(new_list_data), redis_set, err))
# Collect run counts from the working sets, then rotate each working set
# into its aet. counterpart for the next run
not_anomalous_count = 0
total_anomalies = 0
redis_set = 'analyzer_labelled_metrics.not_anomalous_metrics'
try:
not_anomalous_count = self.redis_conn_decoded.scard(redis_set)
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: failed to determine not_anomalous_count from %s - %s' % (redis_set, err))
not_anomalous_count = 0
try:
self.redis_conn.delete('aet.analyzer_labelled_metrics.not_anomalous_metrics')
except:
pass
try:
self.redis_conn.rename('analyzer_labelled_metrics.not_anomalous_metrics', 'aet.analyzer_labelled_metrics.not_anomalous_metrics')
except:
pass
redis_set = 'analyzer_labelled_metrics.anomalous_metrics'
try:
total_anomalies = self.redis_conn_decoded.scard(redis_set)
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: failed to determine anomalous_count from %s - %s' % (redis_set, err))
total_anomalies = 0
try:
self.redis_conn.delete('aet.analyzer_labelled_metrics.anomalous_metrics')
except:
pass
try:
self.redis_conn.rename('analyzer_labelled_metrics.anomalous_metrics', 'aet.analyzer_labelled_metrics.anomalous_metrics')
except:
pass
# @added 20230425 - Feature #4894: labelled_metrics - SKYLINE_FEEDBACK_NAMESPACES
# Create busy key to enable feedback_labelled_metrics
if total_anomalies >= 50:
try:
self.redis_conn_decoded.setex('analyzer_labelled_metrics.busy', 120, total_anomalies)
logger.info('analyzer_labelled_metrics :: created analyzer_labelled_metrics.busy key')
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: failed to setex analyzer_labelled_metrics.busy - %s' % err)
# FIXME(review): this re-zeroes not_anomalous_count AFTER it was read via
# scard above, so total_analyzed below always equals total_anomalies and
# the 'total analyzed' log line under-reports.  Looks like a leftover
# line that should be removed - verify intent.
not_anomalous_count = 0
total_errors = 0
# Errors are recorded per minute-aligned timestamp set by the workers
current_aligned_ts = int(int(p_starts) // 60 * 60)
error_set = 'analyzer_labelled_metrics.errors.%s' % str(current_aligned_ts)
try:
total_errors = self.redis_conn_decoded.scard(error_set)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: labelled_metrics_spin_process :: failed to scard %s Redis set - %s' % (
error_set, err))
total_analyzed = not_anomalous_count + total_anomalies
run_time = time() - p_starts
# Log progress
logger.info('analyzer_labelled_metrics :: seconds to run :: %.2f' % run_time)
logger.info('analyzer_labelled_metrics :: total metrics :: %s' % str(unique_labelled_metrics_count))
logger.info('analyzer_labelled_metrics :: total analyzed :: %s' % total_analyzed)
logger.info('analyzer_labelled_metrics :: total anomalies :: %s' % total_anomalies)
logger.info('analyzer_labelled_metrics :: total errors :: %s' % total_errors)
logger.info('analyzer_labelled_metrics :: exception stats :: %s' % exceptions)
logger.info('analyzer_labelled_metrics :: anomaly breakdown :: %s' % anomaly_breakdown)
logger.info('analyzer_labelled_metrics :: stats :: %s' % stats)
send_metric_name = '%s.labelled_metrics.run_time' % (skyline_app_graphite_namespace)
run_time_str = '%.2f' % run_time
try:
send_graphite_metric(self, skyline_app, send_metric_name, run_time_str)
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, run_time_str, err))
# @added 20230404 - Feature #4888: analyzer - load_shedding
# NOTE(review): cache_key is assigned but never used - the hash name is
# repeated literally in the hset calls below.  Either use cache_key or
# drop the assignment.
cache_key = 'analyzer_labelled_metrics.run_time'
try:
self.redis_conn_decoded.hset('analyzer_labelled_metrics.run_time', 'value', float(run_time))
self.redis_conn_decoded.hset('analyzer_labelled_metrics.run_time', 'timestamp', int(time()))
except Exception as err:
logger.error('error :: Analyzer could not update the Redis analyzer_labelled_metrics.run_time hash - %s' % (
err))
# Ship per-stat counts to Graphite, spaces in stat names become underscores
for stat in list(stats.keys()):
stat_name = stat.replace(' ', '_')
send_metric_name = '%s.labelled_metrics.%s' % (
skyline_app_graphite_namespace, stat_name)
try:
send_graphite_metric(self, skyline_app, send_metric_name, str(stats[stat]))
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(stats[stat]), err))
logger.info('analyzer_labelled_metrics :: sent Graphite %s stats metrics' % str(len(stats)))
# Ship per-exception-type counts to Graphite
for exception in list(exceptions.keys()):
send_metric_name = '%s.labelled_metrics.exceptions.%s' % (
skyline_app_graphite_namespace, exception)
try:
send_graphite_metric(self, skyline_app, send_metric_name, str(exceptions[exception]))
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(exceptions[exception]), err))
logger.info('analyzer_labelled_metrics :: sent Graphite %s exceptions metrics' % str(len(exceptions)))
unique_labelled_metrics = []
mem_usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
logger.info('analyzer_labelled_metrics :: memory usage after completed run - %s' % str(mem_usage))
send_metric_name = '%s.labelled_metrics.mem_usage' % skyline_app_graphite_namespace
try:
send_graphite_metric(self, skyline_app, send_metric_name, str(mem_usage))
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(mem_usage), err))
# Sleep out the remainder of the optimum run duration so runs are paced
process_runtime = time() - now
analyzer_optimum_run_duration = settings.ANALYZER_OPTIMUM_RUN_DURATION
if process_runtime < analyzer_optimum_run_duration:
sleep_for = (analyzer_optimum_run_duration - process_runtime)
logger.info('analyzer_labelled_metrics :: completed in %.2f seconds, sleeping for %.2f seconds due to low run time...' % (
(time() - now), sleep_for))
sleep(sleep_for)
try:
del sleep_for
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: failed to del sleep_for - %s' % err)
try:
del process_runtime
except Exception as err:
logger.error('error :: analyzer_labelled_metrics :: failed to del process_runtime - %s' % err)