"""
metrics_manager.py
"""
from __future__ import division
import logging
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
from os import kill, getpid
import traceback
from sys import version_info
from sys import exit as sys_exit
from ast import literal_eval
from timeit import default_timer as timer
# @added 20201209 - Feature #3870: metrics_manager - check_data_sparsity
from msgpack import Unpacker
# from collections import Counter
# @added 20201213 - Feature #3890: metrics_manager - sync_cluster_files
import requests
import settings
from skyline_functions import (
# @modified 20201209 - Feature #3870: metrics_manager - check_data_sparsity
# Added send_graphite_metric
get_redis_conn, get_redis_conn_decoded, send_graphite_metric,
# @added 20201213 - Feature #3890: metrics_manager - sync_cluster_files
mkdir_p,
# @added 20210624 - Feature #4150: metrics_manager - roomba batch processing metrics
sort_timeseries)
from matched_or_regexed_in_list import matched_or_regexed_in_list
# @added 20210430 - Task #4030: refactoring
# @modified 20220302 - Feature #4444: webapp - inactive_metrics
# Feature #3828: Add inactive columns to the metrics DB table
# from functions.database.queries.get_all_db_metric_names import get_all_db_metric_names
from functions.database.queries.get_all_active_db_metric_names import get_all_active_db_metric_names
# @added 20210519 - Feature #4076: CUSTOM_STALE_PERIOD
from functions.settings.get_custom_stale_period import custom_stale_period
from functions.thunder.stale_metrics import thunder_stale_metrics
# @added 20210601 - Feature #4000: EXTERNAL_SETTINGS
from functions.settings.manage_external_settings import manage_external_settings
# @added 20210602 - Feature #4076: CUSTOM_STALE_PERIOD
from functions.settings.get_external_settings import get_external_settings
from functions.thunder.manage_thunder_alerted_on_stale_metrics_hash_key import manage_thunder_alerted_on_stale_metrics_hash_key
from functions.redis.prune_metrics_timestamp_hash_key import prune_metrics_timestamp_hash_key
from functions.thunder.no_data import thunder_no_data
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Feature #3870: metrics_manager - check_data_sparsity
# Use common functions
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.timeseries.determine_data_sparsity import determine_data_sparsity
# @added 20220110 - Bug #4364: Prune old thunder.events
# Branch #1444: thunder
from functions.redis.update_set import update_redis_set
# @added 20220128 - Feature #4404: flux - external_settings - aggregation
from functions.settings.external_settings_aggregation import external_settings_aggregation
# @added 20220128 - Feature #4404: flux - external_settings - aggregation
# Feature #4324: flux - reload external_settings
# Feature #4376: webapp - update_external_settings
from functions.flux.reload_flux import reload_flux
# @added 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
from functions.metrics_manager.last_known_value_metrics import last_known_value_metrics
from functions.metrics_manager.zero_fill_metrics import zero_fill_metrics
# @added 20220410 - Task #4514: Integrate opentelemetry
# Feature #4516: flux - opentelemetry traces
from functions.metrics_manager.do_not_alert_on_stale_metrics import do_not_alert_on_stale_metrics
from functions.metrics_manager.non_derivative_metrics import non_derivative_metrics
# @added 20220419 - Feature #4528: metrics_manager - derivative_metric_check
# Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
from functions.metrics_manager.derivative_metrics_check import derivative_metrics_check
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
from functions.metrics_manager.filesafe_names import filesafe_names
# @added 20220420 - Feature #4530: namespace.analysed_events
from functions.metrics_manager.namespace_analysed_events import namespace_analysed_events
# @added 20220426 - Feature #4536: Handle Redis failure
from functions.memcache.get_memcache_key import get_memcache_key
from functions.memcache.set_memcache_key import set_memcache_key
# @added 20220427 - Feature #4536: Handle Redis failure
from functions.metrics_manager.get_flux_namespaces import get_flux_namespaces
skyline_app = 'analyzer'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
python_version = int(version_info[0])
this_host = str(os.uname()[1])
# Suffix appended to this app's Graphite namespace (see
# skyline_app_graphite_namespace).  SERVER_METRICS_NAME is optional: when it
# is not declared, or is empty/None, no suffix is used.  A None value used to
# yield the bogus suffix '.None'.
SERVER_METRIC_PATH = ''
try:
    if settings.SERVER_METRICS_NAME:
        SERVER_METRIC_PATH = '.%s' % str(settings.SERVER_METRICS_NAME)
except AttributeError:
    # Not declared in settings.py - deliberately keep the empty suffix
    pass
# @modified 20190524 - Feature #2882: Mirage - periodic_check
# Branch #3002: docker
# Moved the interpolation of the MIRAGE_PERIODIC_ variables to the top
# of the file out of spawn_process so they can be accessed in run too
# @added 20190408 - Feature #2882: Mirage - periodic_check
# Add Mirage periodic checks so that Mirage is analysing each metric at
# least once per hour.
try:
    # @modified 20200606 - Bug #3572: Apply list to settings imports
    # MIRAGE_PERIODIC_CHECK is treated as an on/off switch (its fallback
    # below is False), so it must not be wrapped in list(): list(True)
    # raises TypeError, which silently forced periodic checks off.
    MIRAGE_PERIODIC_CHECK = settings.MIRAGE_PERIODIC_CHECK
except AttributeError:
    MIRAGE_PERIODIC_CHECK = False
try:
    MIRAGE_PERIODIC_CHECK_INTERVAL = settings.MIRAGE_PERIODIC_CHECK_INTERVAL
except AttributeError:
    MIRAGE_PERIODIC_CHECK_INTERVAL = 3600
# @added 20200505 - Feature #2882: Mirage - periodic_check
# Surface this once
try:
    # @modified 20200606 - Bug #3572: Apply list to settings imports
    mirage_periodic_check_namespaces = list(settings.MIRAGE_PERIODIC_CHECK_NAMESPACES)
except (AttributeError, TypeError):
    mirage_periodic_check_namespaces = []
try:
    ANALYZER_ENABLED = settings.ANALYZER_ENABLED
except AttributeError:
    ANALYZER_ENABLED = True
    # Logged at warning level to match the 'warning ::' message prefix
    logger.warning('warning :: ANALYZER_ENABLED is not declared in settings.py, defaults to True')
# @added 20200528 - Feature #3560: External alert config
try:
    EXTERNAL_ALERTS = list(settings.EXTERNAL_ALERTS)
except (AttributeError, TypeError):
    # Same type as the success path above (a list), not a dict
    EXTERNAL_ALERTS = []
# @added 20200602 - Feature #3560: External alert config
# Only import the external alert configs machinery when it is configured
if EXTERNAL_ALERTS:
    from external_alert_configs import get_external_alert_configs
# @added 20200607 - Feature #3566: custom_algorithms
try:
    MIRAGE_ALWAYS_METRICS = list(settings.MIRAGE_ALWAYS_METRICS)
except (AttributeError, TypeError):
    MIRAGE_ALWAYS_METRICS = []
# @added 20200827 - Feature #3708: FLUX_ZERO_FILL_NAMESPACES
try:
    FLUX_ZERO_FILL_NAMESPACES = settings.FLUX_ZERO_FILL_NAMESPACES
except AttributeError:
    FLUX_ZERO_FILL_NAMESPACES = []
# @added 20210407 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
try:
    FLUX_LAST_KNOWN_VALUE_NAMESPACES = settings.FLUX_LAST_KNOWN_VALUE_NAMESPACES
except AttributeError:
    FLUX_LAST_KNOWN_VALUE_NAMESPACES = []
# @added 20201017 - Feature #3818: ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED
# This was implemented to allow a busy analyzer to offload low priority
# metrics to analyzer_batch, unsuccessfully.  It works, but it takes longer
# (ages, actually).  It is being left in as it may be workable with different
# logic.  Until it is refactored to work more efficiently (if possible) the
# setting is always disabled, so settings.ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED
# is deliberately not read.
ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED = False
# @added 20201030 - Feature #3812: ANALYZER_ANALYZE_LOW_PRIORITY_METRICS
try:
    ANALYZER_ANALYZE_LOW_PRIORITY_METRICS = settings.ANALYZER_ANALYZE_LOW_PRIORITY_METRICS
except AttributeError:
    ANALYZER_ANALYZE_LOW_PRIORITY_METRICS = True
# @added 20201030 - Feature #3808: ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS
try:
    ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS = settings.ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS
except AttributeError:
    ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS = False
# @added 20201018 - Feature #3810: ANALYZER_MAD_LOW_PRIORITY_METRICS
try:
    ANALYZER_MAD_LOW_PRIORITY_METRICS = settings.ANALYZER_MAD_LOW_PRIORITY_METRICS
except AttributeError:
    ANALYZER_MAD_LOW_PRIORITY_METRICS = 0
# @added 20201030 - Feature #3808: ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS
# Set the default ANALYZER_MAD_LOW_PRIORITY_METRICS to 10 if not set and
# ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS is set.
if ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS and not ANALYZER_MAD_LOW_PRIORITY_METRICS:
    ANALYZER_MAD_LOW_PRIORITY_METRICS = 10
# Analyzer is in a mode that handles low priority metrics differently when
# any one of these settings asks for it.
ANALYZER_MANAGE_LOW_PRIORITY_METRICS = bool(
    ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED or
    not ANALYZER_ANALYZE_LOW_PRIORITY_METRICS or
    ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS or
    ANALYZER_MAD_LOW_PRIORITY_METRICS)
low_priority_metrics_hash_key = 'analyzer.low_priority_metrics.last_analyzed_timestamp'
metrics_last_timestamp_hash_key = 'analyzer.metrics.last_analyzed_timestamp'
# @added 20201209 - Feature #3870: metrics_manager - check_data_sparsity
try:
    CHECK_DATA_SPARSITY = settings.CHECK_DATA_SPARSITY
except AttributeError:
    CHECK_DATA_SPARSITY = True
try:
    inactive_after = settings.FULL_DURATION - settings.METRICS_INACTIVE_AFTER
except (AttributeError, TypeError):
    inactive_after = settings.FULL_DURATION - 3600
# @added 20201210 - Feature #3870: metrics_manager - check_data_sparsity
try:
    SKIP_CHECK_DATA_SPARSITY_NAMESPACES = list(settings.SKIP_CHECK_DATA_SPARSITY_NAMESPACES)
except (AttributeError, TypeError):
    SKIP_CHECK_DATA_SPARSITY_NAMESPACES = []
# How often metrics_manager runs, in seconds.  This is declared here because
# it is also used for some Redis key TTLs.
RUN_EVERY = 300
# @added 20201212 - Feature #3884: ANALYZER_CHECK_LAST_TIMESTAMP
try:
    ANALYZER_CHECK_LAST_TIMESTAMP = settings.ANALYZER_CHECK_LAST_TIMESTAMP
except AttributeError:
    ANALYZER_CHECK_LAST_TIMESTAMP = False
# @added 20201213 - Feature #3890: metrics_manager - sync_cluster_files
try:
    REMOTE_SKYLINE_INSTANCES = list(settings.REMOTE_SKYLINE_INSTANCES)
except (AttributeError, TypeError):
    REMOTE_SKYLINE_INSTANCES = []
try:
    SYNC_CLUSTER_FILES = settings.SYNC_CLUSTER_FILES
except AttributeError:
    SYNC_CLUSTER_FILES = False
try:
    FAKE_CLUSTER_SYNC = settings.FAKE_CLUSTER_SYNC
except AttributeError:
    FAKE_CLUSTER_SYNC = False
# @added 20201214 - Feature #3890: metrics_manager - sync_cluster_files
# Feature #3820: HORIZON_SHARDS
try:
    HORIZON_SHARDS = settings.HORIZON_SHARDS.copy()
except AttributeError:
    HORIZON_SHARDS = {}
number_of_horizon_shards = 0
HORIZON_SHARD = 0
if HORIZON_SHARDS:
    # zlib is only needed for the adler32 based shard assignment
    import zlib
    number_of_horizon_shards = len(HORIZON_SHARDS)
    # NOTE(review): this raises KeyError at import time if this_host is not
    # a key in HORIZON_SHARDS - presumably a deliberate fail-fast on a
    # misconfigured cluster; confirm.
    HORIZON_SHARD = HORIZON_SHARDS[this_host]
# @added 20210202 - Feature #3934: ionosphere_performance
try:
    IONOSPHERE_PERFORMANCE_DATA_POPULATE_CACHE = settings.IONOSPHERE_PERFORMANCE_DATA_POPULATE_CACHE
except AttributeError:
    IONOSPHERE_PERFORMANCE_DATA_POPULATE_CACHE = False
try:
    IONOSPHERE_PERFORMANCE_DATA_POPULATE_CACHE_DEPTH = int(settings.IONOSPHERE_PERFORMANCE_DATA_POPULATE_CACHE_DEPTH)
except (AttributeError, TypeError, ValueError):
    IONOSPHERE_PERFORMANCE_DATA_POPULATE_CACHE_DEPTH = 0
# @added 20210406 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
try:
    FLUX_AGGREGATE_NAMESPACES = settings.FLUX_AGGREGATE_NAMESPACES.copy()
except AttributeError:
    FLUX_AGGREGATE_NAMESPACES = {}
try:
    FLUX_EXTERNAL_AGGREGATE_NAMESPACES = settings.FLUX_EXTERNAL_AGGREGATE_NAMESPACES
except AttributeError:
    FLUX_EXTERNAL_AGGREGATE_NAMESPACES = False
# @added 20210513 - Feature #4068: ANALYZER_SKIP
try:
    ANALYZER_SKIP = settings.ANALYZER_SKIP
except AttributeError:
    ANALYZER_SKIP = []
# @added 20210519 - Feature #4076: CUSTOM_STALE_PERIOD
try:
    CUSTOM_STALE_PERIOD = settings.CUSTOM_STALE_PERIOD.copy()
except AttributeError:
    CUSTOM_STALE_PERIOD = {}
# @added 20210601 - Feature #4000: EXTERNAL_SETTINGS
try:
    EXTERNAL_SETTINGS = settings.EXTERNAL_SETTINGS.copy()
except AttributeError:
    EXTERNAL_SETTINGS = {}
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
full_uniques = '%sunique_metrics' % settings.FULL_NAMESPACE
LOCAL_DEBUG = False
[docs]class Metrics_Manager(Thread):
"""
The Metrics_Manager class which controls the metrics_manager thread and
spawned processes.
All of this functionality was previously done in the Analyzer thread itself
however with 10s of 1000s of metrics, this process can take longer than a
minute to achieve, which would make Analyzer lag. All the original commits
and references from the Analyzer code have been maintained here, although
the logical order has been changed and the blocks ordered in a different,
but more appropriate and efficient manner than they were laid out in Analyzer.
Further some blocks from Analyzer were removed with the new consolidated
methods using sets, they were no longer required.
"""
def __init__(self, parent_pid):
    """
    Build the metrics_manager thread object.

    Two Redis connections are attempted, via get_redis_conn and
    get_redis_conn_decoded.  Any failure is logged (with traceback) and the
    matching attribute (self.redis_conn / self.redis_conn_decoded) is set to
    None instead of raising, so construction always completes.

    :param parent_pid: pid of the parent process, stored as self.parent_pid
        for check_if_parent_is_alive
    """
    super(Metrics_Manager, self).__init__()
    # (attribute to set, connection factory, failure log template)
    connectors = (
        ('redis_conn', get_redis_conn,
         'error :: metrics_manager :: get_redis_conn - failed - %s'),
        ('redis_conn_decoded', get_redis_conn_decoded,
         'error :: metrics_manager :: get_redis_conn_decoded - failed - %s'),
    )
    for attribute, connect, failure_template in connectors:
        connection = None
        try:
            connection = connect(skyline_app)
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error(failure_template % err)
        setattr(self, attribute, connection)
    # Run as a daemon thread (threading.Thread.daemon)
    self.daemon = True
    self.parent_pid = parent_pid
    self.current_pid = getpid()
def check_if_parent_is_alive(self):
    """
    Exit if this process or the parent process can no longer be signalled.

    ``kill(pid, 0)`` delivers no signal; it only checks that ``pid`` can be
    signalled.  If that check fails for self.current_pid or self.parent_pid,
    sys.exit(0) is called, which raises SystemExit(0) in the calling thread.

    :return: None when both checks succeed
    """
    try:
        kill(self.current_pid, 0)
        kill(self.parent_pid, 0)
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # raised during the probe are not swallowed.
        # NOTE(review): PermissionError (pid exists but cannot be signalled)
        # is also treated as "gone" here, as before - confirm intended.
        sys_exit(0)
# @added 20201214 - Feature #3890: metrics_manager - sync_cluster_files
# Feature #3820: HORIZON_SHARDS
def assigned_to_shard(self, metric_name):
    """
    Return the HORIZON_SHARDS host that a metric is assigned to.

    The shard number is zlib.adler32 of the metric name modulo
    number_of_horizon_shards.  The host whose HORIZON_SHARDS value equals that
    number is returned; should several hosts match, the last one iterated
    wins.  When HORIZON_SHARDS is empty, when no host matches, or when the
    calculation fails (the failure is logged), this_host is returned.

    :param metric_name: the metric name, coerced with str()
    :return: the assigned hostname
    """
    if not HORIZON_SHARDS:
        return this_host
    owner = this_host
    try:
        checksum = zlib.adler32(str(metric_name).encode())
        shard_number = checksum % number_of_horizon_shards
        owners = [
            hostname for hostname, shard in HORIZON_SHARDS.items()
            if shard == shard_number
        ]
        if owners:
            owner = owners[-1]
    except Exception as e:
        logger.error('error :: metrics_manager :: failed to determine what shard %s is assigned to via modulo and HORIZON_SHARDS: %s' % (str(metric_name), e))
    return owner
def get_remote_data(self, remote_skyline_instance, data_required, endpoint, save_file=False):
    """
    Query a Skyline webapp instance and return part of its response.

    :param remote_skyline_instance: a ``[url, user, password, hostname]``
        sequence as used in REMOTE_SKYLINE_INSTANCES.  HTTP basic auth is only
        sent when both user and password are present and not None.
    :param data_required: the key to return from the ``data`` object of the
        JSON response
    :param endpoint: path and query string appended to the instance url
    :param save_file: when set, the path the raw response body is written to
        instead of parsing JSON
    :return: when save_file is set, True if the file was written and False
        otherwise; else ``js['data'][data_required]`` from the JSON response,
        or an empty list when the request or parsing fails
    """
    try:
        connect_timeout = int(settings.GRAPHITE_CONNECT_TIMEOUT)
        read_timeout = int(settings.GRAPHITE_READ_TIMEOUT)
    except Exception:
        connect_timeout = 5
        read_timeout = 10
    use_timeout = (connect_timeout, read_timeout)
    data = []

    # Credentials are optional.  A missing element or a None value means no
    # auth; previously str(None) sent the literal credentials 'None':'None'.
    user = None
    password = None
    try:
        user = remote_skyline_instance[1]
        password = remote_skyline_instance[2]
    except (IndexError, TypeError):
        pass
    use_auth = user is not None and password is not None

    # Only the url is logged - never the whole instance, which includes the
    # password.
    remote_url = str(remote_skyline_instance[0])
    logger.info('metrics_manager :: get_remote_data - querying %s for %s on %s' % (
        remote_url, str(data_required), str(endpoint)))
    url = '%s/%s' % (remote_url, endpoint)
    r = None
    try:
        if use_auth:
            r = requests.get(url, timeout=use_timeout, auth=(str(user), str(password)))
        else:
            r = requests.get(url, timeout=use_timeout)
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: get_remote_data - failed to get %s from %s' % (
            str(endpoint), remote_url))

    # A requests.Response is falsy for 4xx/5xx statuses, so test for None
    # explicitly and handle the status code separately.
    if r is None:
        logger.warning('warning :: metrics_manager :: get_remote_data - no r from %s on %s' % (
            endpoint, remote_url))
        return False if save_file else data
    if r.status_code != 200:
        logger.error('error :: metrics_manager :: get_remote_data - %s from %s responded with status code %s and reason %s' % (
            endpoint, remote_url, str(r.status_code), str(r.reason)))
        return False if save_file else data

    if save_file:
        file_saved = False
        try:
            with open(save_file, 'wb') as fh:
                fh.write(r.content)
            if os.path.isfile(save_file):
                file_saved = True
            else:
                logger.error('error :: metrics_manager :: get_remote_data - failed to save_file %s from %s' % (
                    str(save_file), remote_url))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: get_remote_data - failed to get %s from %s' % (
                endpoint, remote_url))
        return file_saved

    js = None
    try:
        js = r.json()
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: get_remote_data - failed to get json from the response from %s on %s' % (
            endpoint, remote_url))
    if js:
        logger.info('metrics_manager :: get_remote_data - got response for %s from %s' % (
            str(data_required), remote_url))
        try:
            data = js['data'][data_required]
            logger.debug('metrics_manager :: debug :: get_remote_data - response for %s has %s items' % (
                str(data_required), str(len(data))))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: get_remote_data - failed to build remote_training_data from %s on %s' % (
                str(data_required), remote_url))
    return data
# @added 20201213 - Feature #3890: metrics_manager - sync_cluster_files
[docs] def sync_cluster_files(self, i):
"""
Fetch any missing training_data and features_profiles directories and
files from REMOTE_SKYLINE_INSTANCES
"""
spin_start = time()
logger.info('metrics_manager :: sync_cluster_files started')
local_skyline_instance = None
for remote_skyline_instance in REMOTE_SKYLINE_INSTANCES:
# @added 20201213 - Feature #3890: metrics_manager - sync_cluster_files
# if remote_skyline_instance == settings.SKYLINE_URL:
if remote_skyline_instance[3] == this_host:
local_skyline_instance = remote_skyline_instance
if FAKE_CLUSTER_SYNC:
logger.info('metrics_manager :: sync_cluster_files FAKE_CLUSTER_SYNC - True')
skyline_url = 'http://%s:%s' % (settings.WEBAPP_IP, str(settings.WEBAPP_PORT))
if settings.WEBAPP_AUTH_ENABLED:
local_skyline_instance = [
skyline_url, settings.WEBAPP_AUTH_USER,
settings.WEBAPP_AUTH_USER_PASSWORD, this_host
]
else:
local_skyline_instance = [skyline_url, None, None, this_host]
if not local_skyline_instance:
logger.info('metrics_manager :: sync_cluster_files cound not determine local_skyline_instance from REMOTE_SKYLINE_INSTANCES, created')
skyline_url = 'http://%s:%s' % (settings.WEBAPP_IP, str(settings.WEBAPP_PORT))
if settings.WEBAPP_AUTH_ENABLED:
local_skyline_instance = [
skyline_url, settings.WEBAPP_AUTH_USER,
settings.WEBAPP_AUTH_USER_PASSWORD, this_host
]
else:
local_skyline_instance = [skyline_url, None, None, this_host]
# Check training data on REMOTE_SKYLINE_INSTANCES and determine what
# training_data needs to be fetched
training_data_raw = []
try:
training_data_raw = list(self.redis_conn_decoded.smembers('ionosphere.training_data'))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to generate a list from ionosphere.training_data Redis set')
training_data_raw = None
training_data = []
if training_data_raw:
for training_data_str in training_data_raw:
try:
training_data_item = literal_eval(training_data_str)
training_metric = str(training_data_item[0])
training_timestamp = int(training_data_item[1])
training_data.append([training_metric, training_timestamp])
except Exception as e:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to interpolate training data: %s' % e)
training_data_raw = None
training_data_already_present = 0
training_data_to_fetch = 0
training_data_fetched = 0
# Rate limit this so that a new instance becomes eventually consistent
# add does not thunder against all the other Skyline instances to
# populate itself
max_training_data_to_fetch = 30
for remote_skyline_instance in REMOTE_SKYLINE_INSTANCES:
if training_data_fetched >= max_training_data_to_fetch:
logger.warning('warning :: metrics_manager :: fetched training data has reached the limit of %s, not continuing to fetch more this run' % str(max_training_data_to_fetch))
break
remote_training_data = []
data_required = 'metrics'
endpoint = 'api?training_data'
save_file = False
try:
remote_training_data = self.get_remote_data(remote_skyline_instance, data_required, endpoint, save_file)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
endpoint, str(remote_skyline_instance[0])))
remote_training_data_to_fetch = []
training_data_already_fetched = 0
if remote_training_data:
logger.info('metrics_manager :: sync_cluster_files - got %s %s from %s' % (
str(len(remote_training_data)), str(data_required), str(remote_skyline_instance[0])))
for remote_training_data_item in remote_training_data:
if remote_training_data_item in training_data:
training_data_already_present += 1
continue
training_data_to_fetch += 1
remote_training_data_to_fetch.append(remote_training_data_item)
logger.info('metrics_manager :: sync_cluster_files - %s training_data dirs from %s already present locally' % (
str(training_data_already_present), str(remote_skyline_instance[0])))
logger.info('metrics_manager :: sync_cluster_files - %s training_data dirs to fetch from %s' % (
str(len(remote_training_data_to_fetch)), str(remote_skyline_instance[0])))
if remote_training_data_to_fetch:
for fetch_item in remote_training_data_to_fetch:
cache_key_exists = False
try:
cache_key = 'analyzer.metrics_manager.training_data_fetched.%s.%s' % (
str(fetch_item[0]), str(fetch_item[1]))
cache_key_exists = self.redis_conn.get(cache_key)
except:
pass
if cache_key_exists:
training_data_already_fetched += 1
continue
remote_training_data_files = []
try:
data_required = 'files'
save_file = False
endpoint = 'ionosphere_files?source=training_data&metric=%s×tamp=%s' % (
str(fetch_item[0]), str(fetch_item[1]))
remote_training_data_files = self.get_remote_data(remote_skyline_instance, data_required, endpoint, save_file)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
endpoint, str(remote_skyline_instance[0])))
if not remote_training_data_files:
logger.debug('metrics_manager :: debug :: sync_cluster_files - no training_data files were returned from %s' % (
str(remote_skyline_instance[0])))
if remote_training_data_files:
files_to_fetch = len(remote_training_data_files)
files_fetched = 0
files_exist = 0
logger.info('metrics_manager :: sync_cluster_files - %s training_data files to fetch from %s' % (
str(files_to_fetch),
str(remote_skyline_instance[0])))
for remote_file in remote_training_data_files:
try:
data_file = remote_training_data_files[remote_file]
data_dir = os.path.dirname(data_file)
if not os.path.exists(data_dir):
mkdir_p(data_dir)
if not os.path.exists(data_dir):
logger.error('error :: metrics_manager :: sync_cluster_files - failed to create dir - %s' % data_dir)
continue
data_required = 'file_saved'
endpoint = 'ionosphere_file?file=%s' % str(data_file)
file_saved = None
if not os.path.isfile(data_file):
file_saved = self.get_remote_data(remote_skyline_instance, data_required, endpoint, data_file)
if not file_saved:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
data_file, str(remote_skyline_instance[0])))
continue
logger.info('metrics_manager :: sync_cluster_files - got %s from %s' % (
data_file, str(remote_skyline_instance[0])))
files_fetched += 1
else:
if LOCAL_DEBUG:
logger.info('metrics_manager :: sync_cluster_files - file exists locally nothing to do %s' % (
data_file))
files_exist += 1
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
endpoint, str(remote_skyline_instance[0])))
if files_fetched == files_to_fetch:
training_data_fetched += 1
try:
cache_key = 'analyzer.metrics_manager.training_data_fetched.%s.%s' % (
str(fetch_item[0]), str(fetch_item[1]))
self.redis_conn.setex(cache_key, 900, int(time()))
logger.info('created Redis key - %s' % (cache_key))
except:
pass
if files_exist == files_to_fetch:
logger.info('metrics_manager :: sync_cluster_files - all %s files to fetch exist locally, nothing to do' % (
str(files_to_fetch)))
try:
cache_key = 'analyzer.metrics_manager.training_data_fetched.%s.%s' % (
str(fetch_item[0]), str(fetch_item[1]))
self.redis_conn.setex(cache_key, 900, int(time()))
logger.info('created Redis key - %s' % (cache_key))
except:
pass
if (files_fetched + files_exist) != files_to_fetch:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get all %s training_data files for %s from %s' % (
str(files_to_fetch), str(fetch_item), str(remote_skyline_instance[0])))
logger.info('metrics_manager :: sync_cluster_files - %s training_data dirs already fetched and present locally' % (
str(training_data_already_fetched)))
# Check what features_profiles directories need to be retrieved from
# REMOTE_SKYLINE_INSTANCES and saved locally
logger.info('metrics_manager :: checking for expected_features_profiles_dir')
expected_features_profile_dirs = {}
try:
endpoint = 'api?expected_features_profiles_dir'
data_required = 'features_profile_dirs'
save_file = False
expected_features_profile_dirs = self.get_remote_data(local_skyline_instance, data_required, endpoint, save_file)
logger.info('metrics_manager :: %s entries in the expected_features_profiles_dir response' % str(len(expected_features_profile_dirs)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
endpoint, str(local_skyline_instance[0])))
expected_features_profile_dirs = {}
local_features_profile_dirs = {}
try:
local_features_profile_dirs = self.redis_conn_decoded.hgetall('analyzer.metrics_manager.local_features_profile_dirs')
logger.info('metrics_manager :: %s entries in the analyzer.metrics_manager.local_features_profile_dirs Redis hash key' % str(len(local_features_profile_dirs)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get Redis hash key analyzer.metrics_manager.local_features_profile_dirs')
local_features_profile_dirs = {}
get_features_profile_dirs = {}
if len(local_features_profile_dirs) == len(expected_features_profile_dirs):
logger.info('metrics_manager :: all %s expected features_profile_dirs are present locally' % str(len(expected_features_profile_dirs)))
else:
for expected_features_profile_dir in expected_features_profile_dirs:
dir_known = None
if local_features_profile_dirs:
try:
dir_known = local_features_profile_dirs[expected_features_profile_dir]
except:
dir_known = False
if dir_known:
continue
try:
expected_dir = expected_features_profile_dirs[expected_features_profile_dir]
if os.path.exists(expected_dir):
try:
self.redis_conn.hset(
'analyzer.metrics_manager.local_features_profile_dirs',
expected_features_profile_dir, expected_dir)
logger.info('metrics_manager :: %s exists locally, added to analyzer.metrics_manager.local_features_profile_dirs' % str(expected_dir))
continue
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.local_features_profile_dirs for - %s' % str(expected_dir))
except Exception as e:
logger.error('error :: metrics_manager :: failed to test expected_dir: %s' % e)
try:
get_features_profile_dirs[expected_features_profile_dir] = expected_features_profile_dirs[expected_features_profile_dir]
except Exception as e:
logger.error('error :: metrics_manager :: failed to add to get_features_profile_dirs: %s' % e)
logger.info('metrics_manager :: set to get %s features_profile_dirs not present locally' % str(len(get_features_profile_dirs)))
# Add any entires that need to be checked for an update
local_features_profile_dirs_to_update = {}
try:
local_features_profile_dirs_to_update = self.redis_conn_decoded.hgetall('analyzer.metrics_manager.local_features_profile_dirs.to_update')
logger.info('metrics_manager :: %s entries in the analyzer.metrics_manager.local_features_profile_dirs.to_update Redis hash key' % str(len(local_features_profile_dirs_to_update)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get Redis hash key analyzer.metrics_manager.local_features_profile_dirs.to_update')
local_features_profile_dirs_to_update = {}
if local_features_profile_dirs_to_update:
for local_features_profile_dir_to_update in local_features_profile_dirs_to_update:
try:
get_features_profile_dirs[local_features_profile_dir_to_update] = local_features_profile_dirs_to_update[local_features_profile_dir_to_update]
except Exception as e:
logger.error('error :: metrics_manager :: failed to add to get_features_profile_dirs: %s' % e)
# Rate limit this so that a new instance becomes eventually consistent
# add does not thunder against all the other Skyline instances to
# populate itself
max_fps_to_fetch = 30
fps_fetched = 0
if get_features_profile_dirs:
for fp_id in get_features_profile_dirs:
if fps_fetched >= max_fps_to_fetch:
logger.warning('warning :: metrics_manager :: get_features_profile_dirs has reached the limit of %s, not continuing to fetch more this run' % str(max_fps_to_fetch))
break
features_profile_dir = None
endpoint = None
try:
features_profile_dir = get_features_profile_dirs[fp_id]
features_profile_dir_elements = features_profile_dir.split('/')
timestamp = features_profile_dir_elements[-1]
metric_and_timestamp_path = features_profile_dir.replace(settings.IONOSPHERE_PROFILES_FOLDER, '')
metric_and_timestamp_path_elements = metric_and_timestamp_path.split('/')
metric = None
for element in metric_and_timestamp_path_elements:
if element != timestamp:
if metric:
metric = '%s.%s' % (metric, str(element))
else:
metric = str(element)
endpoint = 'ionosphere_files?source=features_profiles&metric=%s×tamp=%s' % (
str(metric), str(timestamp))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to determine endpoint related to fp_id - %s' % (
str(fp_id)))
features_profile_files = {}
remote_skyline_instance = False
USE_REMOTE_SKYLINE_INSTANCES = []
for remote_skyline_instance in REMOTE_SKYLINE_INSTANCES:
USE_REMOTE_SKYLINE_INSTANCES.append(remote_skyline_instance)
if not USE_REMOTE_SKYLINE_INSTANCES and FAKE_CLUSTER_SYNC:
USE_REMOTE_SKYLINE_INSTANCES.append(local_skyline_instance)
# @added 20201214 - Feature #3890: metrics_manager - sync_cluster_files
# Feature #3820: HORIZON_SHARDS
# Determine the authorative REMOTE_SKYLINE_INSTANCES which is
# assigned to metric and query it first.
authorative_host = None
if REMOTE_SKYLINE_INSTANCES:
try:
authorative_host = self.assigned_to_shard(metric)
except Exception as e:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to determine authorative_host for %s - %s' % (
str(metric), e))
if authorative_host:
USE_REMOTE_SKYLINE_INSTANCES = []
for remote_skyline_instance in REMOTE_SKYLINE_INSTANCES:
try:
if remote_skyline_instance[3] == authorative_host:
USE_REMOTE_SKYLINE_INSTANCES.append(remote_skyline_instance)
except Exception as e:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to the remote_skyline_instance list for the authorative_host - %s - %s' % (
str(authorative_host), e))
for remote_skyline_instance in REMOTE_SKYLINE_INSTANCES:
try:
if remote_skyline_instance[3] != authorative_host:
USE_REMOTE_SKYLINE_INSTANCES.append(remote_skyline_instance)
except Exception as e:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to the secondary remote_skyline_instance list for the authorative_host - %s - %s' % (
str(authorative_host), e))
for remote_skyline_instance in USE_REMOTE_SKYLINE_INSTANCES:
if endpoint:
data_required = 'files'
save_file = False
host_features_profile_files = {}
try:
host_features_profile_files = self.get_remote_data(remote_skyline_instance, data_required, endpoint, save_file)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
endpoint, str(remote_skyline_instance[0])))
host_features_profile_files = {}
if host_features_profile_files:
for features_profile_file in host_features_profile_files:
known_features_profile_file = False
try:
known_features_profile_file = features_profile_files[features_profile_file]
except:
known_features_profile_file = None
if not known_features_profile_file:
try:
features_profile_files[features_profile_file] = host_features_profile_files[features_profile_file]
except Exception as e:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to add features_profile_file %s to features_profile_files - %s' % (
str(features_profile_file), e))
if len(features_profile_files) > 0:
logger.info('%s features profile files found to download from %s to %s' % (
str(len(features_profile_files)), remote_skyline_instance[0],
features_profile_dir))
# break
else:
logger.info('%s features profile files found to download from %s to %s' % (
str(len(features_profile_files)), remote_skyline_instance[0],
features_profile_dir))
if features_profile_files and FAKE_CLUSTER_SYNC:
features_profile_files = {}
logger.info('FAKE_CLUSTER_SYNC reseting features_profile_files with %s items found' % str(len(features_profile_files)))
if FAKE_CLUSTER_SYNC and not features_profile_files:
logger.info('FAKE_CLUSTER_SYNC no features_profile_files found')
if FAKE_CLUSTER_SYNC:
try:
self.redis_conn.hset(
'analyzer.metrics_manager.local_features_profile_dirs',
# expected_features_profile_dir, expected_dir)
features_profile_dir, expected_dir)
logger.info('metrics_manager :: %s exists locally, added to analyzer.metrics_manager.local_features_profile_dirs' % expected_dir)
continue
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.local_features_profile_dirs for - %s' % str(expected_dir))
features_profile_files = {}
continue
if features_profile_files:
files_to_fetch = len(features_profile_files)
files_fetched = 0
files_present = 0
logger.info('metrics_manager :: sync_cluster_files - %s features_profile files to fetch for fp_id %s from %s' % (
str(files_to_fetch), str(fp_id),
str(remote_skyline_instance[0])))
data_required = 'file_saved'
for remote_file in features_profile_files:
try:
data_file = features_profile_files[remote_file]
data_dir = os.path.dirname(data_file)
if not os.path.exists(data_dir):
mkdir_p(data_dir)
if not os.path.exists(data_dir):
logger.error('error :: metrics_manager :: sync_cluster_files - failed to create dir - %s' % data_dir)
continue
endpoint = 'ionosphere_file?file=%s' % str(data_file)
# Only fetch it if it does not exist
if not os.path.isfile(data_file):
data_required = 'file_saved'
file_saved = False
file_saved = self.get_remote_data(remote_skyline_instance, data_required, endpoint, data_file)
if not file_saved:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
data_file, str(remote_skyline_instance[0])))
continue
logger.info('metrics_manager :: sync_cluster_files - got %s from %s' % (
data_file, str(remote_skyline_instance[0])))
files_fetched += 1
else:
files_present += 1
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get %s from %s' % (
endpoint, str(remote_skyline_instance[0])))
if (files_fetched + files_present) == files_to_fetch:
logger.info('metrics_manager :: sync_cluster_files - got all %s features_profile files that needed to be fetch (%s were already present) for %s from %s' % (
str(files_to_fetch), str(files_present), str(fp_id),
str(remote_skyline_instance[0])))
try:
self.redis_conn.hset(
'analyzer.metrics_manager.local_features_profile_dirs',
# expected_features_profile_dir, expected_dir)
features_profile_dir, expected_dir)
logger.info('metrics_manager :: %s features profile dir exists locally, added to analyzer.metrics_manager.local_features_profile_dirs' % str(expected_dir))
continue
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.local_features_profile_dirs for - %s' % str(expected_dir))
try:
self.redis_conn.hset(
'analyzer.metrics_manager.local_features_profile_dirs.to_update',
# expected_features_profile_dir, expected_dir)
features_profile_dir, expected_dir)
logger.info('metrics_manager :: %s features profile, added to analyzer.metrics_manager.local_features_profile_dirs.to_update' % str(expected_dir))
continue
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.local_features_profile_dirs for - %s' % str(expected_dir))
in_local_features_profile_dirs_to_update = False
try:
for local_features_profile_dir_to_update in local_features_profile_dirs_to_update:
# if local_features_profile_dir_to_update == expected_features_profile_dir:
if local_features_profile_dir_to_update == features_profile_dir:
in_local_features_profile_dirs_to_update = True
try:
self.redis_conn.hdel(
'analyzer.metrics_manager.local_features_profile_dirs.to_update',
# expected_features_profile_dir, expected_dir)
features_profile_dir, expected_dir)
logger.info('metrics_manager :: %s features profile, removed from analyzer.metrics_manager.local_features_profile_dirs.to_update' % str(expected_dir))
continue
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.local_features_profile_dirs for - %s' % str(expected_dir))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine if entry should be remove from in analyzer.metrics_manager.local_features_profile_dirs.to_update')
if not in_local_features_profile_dirs_to_update:
try:
self.redis_conn.hset(
'analyzer.metrics_manager.local_features_profile_dirs.to_update',
# expected_features_profile_dir, expected_dir)
features_profile_dir, expected_dir)
logger.info('metrics_manager :: %s features profile, added to analyzer.metrics_manager.local_features_profile_dirs.to_update' % str(expected_dir))
continue
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.local_features_profile_dirs for - %s' % str(expected_dir))
else:
logger.error('error :: metrics_manager :: sync_cluster_files - failed to get all %s features_profile files for %s from %s' % (
str(files_to_fetch), str(fp_id), str(remote_skyline_instance[0])))
if files_fetched > 0:
fps_fetched += 1
logger.info('metrics_manager :: sync_cluster_files:: features_profiles_dir done')
# @added 20220223 - Feature #4464: flux - quota - cluster_sync
# Feature #4400: flux - quota
# Sync flux.quota.namespace_metrics keys between cluster nodes. Added
# into sync_cluster_files process as it is cluster related.
# The ancillary of this function, to remove metrics from a
# flux.quota.namespace_metrics key is handled by
# Feature #4468: flux - remove_namespace_quota_metrics which is done
# using functions.flux.remove_namespace_quota_metrics via the webapp
# api endpoint.
file_sync_spin_end = time()
file_sync_duration = file_sync_spin_end - spin_start
logger.info('metrics_manager :: sync_cluster_files file sync took %.2f seconds' % file_sync_duration)
metrics_manager_flux_namespace_quotas = {}
try:
metrics_manager_flux_namespace_quotas = self.redis_conn_decoded.hgetall('metrics_manager.flux.namespace_quotas')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files :: failed to hgetall metrics_manager.flux.namespace_quotas - %s' % err)
skyline_url = 'http://%s:%s' % (settings.WEBAPP_IP, str(settings.WEBAPP_PORT))
if settings.WEBAPP_AUTH_ENABLED:
local_skyline_instance = [
skyline_url, settings.WEBAPP_AUTH_USER,
settings.WEBAPP_AUTH_USER_PASSWORD, this_host
]
else:
local_skyline_instance = [skyline_url, None, None, this_host]
# If there are flux quotas then sync the quota keys
if metrics_manager_flux_namespace_quotas:
logger.info('metrics_manager :: sync_cluster_files :: updating flux.quota.namespace_metrics Redis keys with cluster data')
for parent_namespace in list(metrics_manager_flux_namespace_quotas.keys()):
namespace_quota_key = 'flux.quota.namespace_metrics.%s' % parent_namespace
endpoint = 'redis_data&type=set&key=%s&cluster_data=true' % namespace_quota_key
save_file = False
data = []
try:
data = self.get_remote_data(local_skyline_instance, namespace_quota_key, endpoint, save_file)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files :: failed to get %s from %s - %s' % (
endpoint, str(local_skyline_instance[0]), err))
if data:
# Fetch the current Redis data to determine if any new metrics
# need to be added
original_key_data = []
try:
original_key_data = list(self.redis_conn.smembers(namespace_quota_key))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files :: failed to add multiple members to the %s Redis set - %s' % (
namespace_quota_key, err))
if original_key_data:
# Compare the current Redis data with the data from the
# other nodes and only add missing metrics to the set to reduce
# the sadd operation to the lowest N
try:
original_key_data_set = set(list(original_key_data))
data_set = set(list(data))
set_difference = data_set.difference(original_key_data_set)
data = list(set_difference)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files :: failed to determine metrics to remove from mirage.hash_key.metrics_resolutions')
if data:
try:
self.redis_conn.sadd(namespace_quota_key, *set(data))
logger.info('metrics_manager :: sync_cluster_files :: updated the %s Redis set with %s new metrics' % (
namespace_quota_key, str(len(data))))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files :: failed to add multiple members to the %s Redis set - %s' % (
namespace_quota_key, err))
if metrics_manager_flux_namespace_quotas:
quota_sync_end = time()
quota_sync_duration = quota_sync_end - file_sync_spin_end
logger.info('metrics_manager :: sync_cluster_files :: cluster quota sync of flux.quota.namespace_metrics Redis keys took %.2f seconds' % quota_sync_duration)
spin_end = time() - spin_start
logger.info('metrics_manager :: sync_cluster_files took %.2f seconds' % spin_end)
return
[docs] def metric_management_process(self, i):
"""
Create and manage the required lists and Redis sets
"""
spin_start = time()
logger.info('metrics_manager :: metric_management_process started')
if not self.redis_conn or not self.redis_conn_decoded:
logger.warning('metrics_manager :: no redis connection, returning')
return
last_run_timestamp = 0
try:
last_run_timestamp = self.redis_conn_decoded.get('analyzer.metrics_manager.last_run_timestamp')
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate a list from %s Redis set' % full_uniques)
last_run_timestamp = 0
if last_run_timestamp:
logger.info('metrics_manager :: analyzer.metrics_manager.last_run_timestamp Redis key has not expired, not running')
return
unique_metrics = []
try:
unique_metrics = list(self.redis_conn_decoded.smembers(full_uniques))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate a list from %s Redis set' % full_uniques)
unique_metrics = []
# Check if this process is unnecessary
if len(unique_metrics) == 0:
logger.error('error :: metrics_manager :: there are no metrics in %s Redis set' % full_uniques)
return
# @added 20220426 - Feature #4536: Handle Redis failure
if settings.MEMCACHE_ENABLED:
try:
success = set_memcache_key(skyline_app, full_uniques, unique_metrics)
if success:
logger.info('metrics_manager :: set memcache %s key' % full_uniques)
except Exception as err:
logger.error('error :: metrics_manager :: set_memcache_key failed to set %s - %s' % (
full_uniques, err))
####
# Check whether any alert settings or metrics have been changed, added
# or removed. If so do a full refresh.
####
refresh_redis_alert_sets = False
# @added 20220211 - Feature #4284: flux - telegraf
# Create a list of parent_namespaces
all_parent_namespaces = []
####
# Create a list of base_names from the unique_metrics
####
# @added 20200723 - Feature #3560: External alert config
# Speed this up only check alerters if not already in the set
unique_base_names = []
logger.info('metrics_manager :: creating unique_base_names list')
for metric_name in unique_metrics:
# @added 20191014 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
if python_version == 3:
metric_name = str(metric_name)
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
if metric_name.startswith(settings.FULL_NAMESPACE):
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric_name
# @added 20200723 - Feature #3560: External alert config
# Speed this up only check alerters if not already in the set
# metric_in_smtp_alerters_set = False
unique_base_names.append(base_name)
# @added 20220211 - Feature #4284: flux - telegraf
# Create a list of parent_namespaces
all_parent_namespaces.append(base_name.split('.')[0])
logger.info('metrics_manager :: created unique_base_names list of %s metrics' % str(len(unique_base_names)))
# @added 20220211 - Feature #4284: flux - telegraf
# Create a list of parent_namespaces
all_parent_namespaces = list(set(all_parent_namespaces))
logger.info('metrics_manager :: created all_parent_namespaces list of %s top level namespaces' % str(len(all_parent_namespaces)))
# @added 20210308 - Feature #3978: luminosity - classify_metrics
# Feature #3642: Anomaly type classification
# Make a Redis set of unique_base_names for other apps to use
if unique_base_names:
unique_base_names = list(set(unique_base_names))
logger.info('metrics_manager :: recreating the analyzer.unique_base_names set')
try:
self.redis_conn.sadd('new_analyzer.unique_base_names', *set(unique_base_names))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the new_analyzer.unique_base_names Redis set')
try:
self.redis_conn.delete('new_analyzer.unique_base_names.old')
except:
pass
try:
self.redis_conn.rename('analyzer.unique_base_names', 'analyzer.unique_base_names.old')
except:
pass
try:
self.redis_conn.rename('new_analyzer.unique_base_names', 'analyzer.unique_base_names')
except:
pass
try:
self.redis_conn.delete('unique_base_names.old')
except:
pass
try:
self.redis_conn.sunionstore('aet.analyzer.unique_base_names', 'analyzer.unique_base_names')
logger.info('metrics_manager :: copied Redis set analyzer.unique_base_names to aet.analyzer.unique_base_names via sunion')
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to copy Redis set analyzer.unique_base_names to aet.analyzer.unique_base_names via sunion')
manage_flux_aggregate_namespaces_redis_key = 'metrics_manager.manage_flux_aggregate_namespaces'
# @added 20220128 - Feature #4404: flux - external_settings - aggregation
# Feature #4324: flux - reload external_settings
# Feature #4376: webapp - update_external_settings
external_settings_updated = None
do_reload_flux = False
try:
external_settings_updated = self.redis_conn_decoded.get('skyline.external_settings.update.metrics_manager')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager ::: could not get skyline.external_settings.update.metrics_manager from Redis, err: %s' % err)
if external_settings_updated:
do_reload_flux = True
logger.info('metrics_manager :: set to reload_flux as skyline.external_settings.update.metrics_manager is present')
logger.info('metrics_manager :: removing Redis key metrics_manager.manage_flux_aggregate_namespaces to refresh')
try:
self.redis_conn.delete(manage_flux_aggregate_namespaces_redis_key)
except Exception as err:
if LOCAL_DEBUG:
logger.error('error :: metrics_manager :: could not delete Redis key metrics_manager.manage_flux_aggregate_namespaces: %s' % str(err))
#####
# Check whether any internal or external alert settings have been changed
# if so do a full refresh
####
# @added 20220410 - Feature #3560: External alert config
# This handles a metrics_manager key rather than an analyzer key to
# trigger refreshing on
metrics_manager_last_all_alerts = None
try:
metrics_manager_last_all_alerts_str = self.redis_conn_decoded.get('metrics_manager.last_all_alerts')
if metrics_manager_last_all_alerts_str:
metrics_manager_last_all_alerts = literal_eval(metrics_manager_last_all_alerts_str)
# A normal sorted nor set can be used as the list has dicts in it
metrics_manager_last_all_alerts = sorted(metrics_manager_last_all_alerts, key=lambda item: item[0])
logger.info('metrics_manager :: metrics_manager_last_all_alerts from metrics_manager.last_all_alerts Redis set has %s items' % (
str(len(metrics_manager_last_all_alerts))))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate a list from the metrics_manager.last_all_alerts Redis key')
metrics_manager_last_all_alerts = None
# @added 20200528 - Feature #3560: External alert config
external_alerts = {}
external_from_cache = None
internal_alerts = {}
internal_from_cache = None
all_alerts = list(settings.ALERTS)
all_from_cache = None
if EXTERNAL_ALERTS:
try:
external_alerts, external_from_cache, internal_alerts, internal_from_cache, all_alerts, all_from_cache = get_external_alert_configs(skyline_app)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: could not determine external alert configs')
logger.info('metrics_manager :: retrieved %s external_alerts configurations from_cache %s, %s internal_alerts from_cache %s and %s all_alerts from_cache %s' % (
str(len(external_alerts)), str(external_from_cache),
str(len(internal_alerts)), str(internal_from_cache),
str(len(all_alerts)), str(all_from_cache)))
if LOCAL_DEBUG:
logger.debug('debug :: metrics_manager :: all_alerts :: %s' % str(all_alerts))
if not all_alerts:
logger.error('error :: metrics_manager :: all_alerts is not set, so creating from settings.ALERTS')
all_alerts = list(settings.ALERTS)
# If there was a last known alerts configuration compare it to the
# current known alerts configuration if they are different do a full
# refresh
# @added 20201017 - Feature #3788: snab_flux_load_test
# Feature #3560: External alert config
last_all_alerts_set = None
try:
last_all_alerts_data = self.redis_conn_decoded.get('analyzer.last_all_alerts')
if last_all_alerts_data:
last_all_alerts = literal_eval(last_all_alerts_data)
# A normal sorted nor set can be used as the list has dicts in it
last_all_alerts_set = sorted(last_all_alerts, key=lambda item: item[0])
logger.info('metrics_manager :: last_all_alerts_set from analyzer.last_all_alerts Redis set has %s items' % str(len(last_all_alerts_set)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate a list from the analyzer.last_all_alerts Redis key')
last_all_alerts_set = None
all_alerts_set = None
if all_alerts:
try:
all_alerts_list = [list(row) for row in all_alerts]
# A normal sorted nor set can be used as the list has dicts in it
all_alerts_set = sorted(all_alerts_list, key=lambda item: item[0])
logger.info('metrics_manager :: all_alerts_set from all_alerts has %s items' % str(len(all_alerts_set)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to create a sorted list from all_alerts object of type %s' % str(type(all_alerts_list)))
# Set the last known alert configuration to the current configuration
try:
self.redis_conn.set('analyzer.last_all_alerts', str(all_alerts_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to set analyzer.last_all_alerts Redis key')
# Compare the last known with the current, if there was a last known
# configuration, if different do a full refresh
if last_all_alerts_set:
if str(all_alerts_set) != str(metrics_manager_last_all_alerts):
logger.info('metrics_manager :: metrics_manager alert settings have changed, sets will be refreshed')
refresh_redis_alert_sets = True
# @added 20220410 - Feature #3560: External alert config
# This handles a metrics_manager key rather than an analyzer key to
# trigger refreshing on
if all_alerts_set:
if str(all_alerts_set) != str(metrics_manager_last_all_alerts):
logger.info('metrics_manager :: alert settings have changed from the metrics_manager.last_all_alerts, alert sets will be refreshed')
refresh_redis_alert_sets = True
try:
self.redis_conn.set('metrics_manager.last_all_alerts', str(all_alerts_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to set metrics_manager.last_all_alerts Redis key')
# Compare the current unique_metrics to the last smtp_alerter_metrics +
# non_smtp_alerter_metrics, if they have changed do a full refresh
if not refresh_redis_alert_sets:
smtp_alerter_metrics = []
non_smtp_alerter_metrics = []
try:
smtp_alerter_metrics = list(self.redis_conn_decoded.smembers('analyzer.smtp_alerter_metrics'))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get list from analyzer.smtp_alerter_metrics Redis key')
refresh_redis_alert_sets = True
smtp_alerter_metrics = None
try:
non_smtp_alerter_metrics = list(self.redis_conn_decoded.smembers('analyzer.non_smtp_alerter_metrics'))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get list from analyzer.non_smtp_alerter_metrics Redis key')
non_smtp_alerter_metrics = None
known_alerter_metrics_set = None
if smtp_alerter_metrics or non_smtp_alerter_metrics:
try:
known_alerter_metrics = smtp_alerter_metrics + non_smtp_alerter_metrics
known_alerter_metrics_set = set(known_alerter_metrics)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get list from analyzer.non_smtp_alerter_metrics Redis key')
# Compare known metrics to current unique_base_names if they are
# different do a full refresh
if known_alerter_metrics_set:
changed_metrics = []
try:
unique_base_names_set = set(list(unique_base_names))
if unique_base_names_set == known_alerter_metrics_set:
logger.info('metrics_manager :: unique_base_names_set and known_alerter_metrics_set are the same')
else:
set_difference = unique_base_names_set.difference(known_alerter_metrics_set)
for metric in set_difference:
changed_metrics.append(metric)
logger.info('metrics_manager :: there are %s metrics that have changed, sets will be refreshed' % str(len(changed_metrics)))
refresh_redis_alert_sets = True
del set_difference
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine hether the unique_base_names_set and known_alerter_metrics_set are different')
smtp_alerter_metrics = []
non_smtp_alerter_metrics = []
mirage_metrics = []
refresh_redis_alert_sets = True
# @added 20201104 - Feature #3788: snab_flux_load_test
# Feature #3560: External alert config
if refresh_redis_alert_sets:
logger.info('metrics_manager :: sets being refreshed, determining smtp_alerter_metrics')
all_smtp_alerter_metrics = []
all_mirage_metrics = []
mirage_metrics_expiration_times = []
mirage_metrics_keys = []
start_refresh = timer()
for base_name in unique_base_names:
if base_name not in all_smtp_alerter_metrics:
# Use the all_alerts list which includes external alert configs
# for alert in settings.ALERTS:
for alert in all_alerts:
pattern_match = False
if str(alert[1]) == 'smtp':
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, [alert[0]])
if LOCAL_DEBUG and pattern_match:
logger.debug('debug :: metrics_manager :: %s matched alert - %s' % (base_name, alert[0]))
try:
del metric_matched_by
except:
pass
if pattern_match:
all_smtp_alerter_metrics.append(base_name)
# @added 20160922 - Branch #922: Ionosphere
# Add a Redis set of mirage.unique_metrics
if settings.ENABLE_MIRAGE:
mirage_metric = False
try:
SECOND_ORDER_RESOLUTION_FULL_DURATION = int(alert[3])
if SECOND_ORDER_RESOLUTION_FULL_DURATION > 24:
mirage_metric = True
except:
mirage_metric = False
if mirage_metric:
metric_name = '%s%s' % (settings.FULL_NAMESPACE, base_name)
all_mirage_metrics.append(metric_name)
# @added 20200805 - Task #3662: Change mirage.last_check keys to timestamp value
# Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Add the mirage metric and its EXPIRATION_TIME to
# the mirage.metrics_expiration_times so that Mirage
# can determine the metric EXPIRATION_TIME without
# having to create and iterate the all_alerts
# object in the Mirage analysis phase so that the
# reported anomaly timestamp can be used to determine
# whether the EXPIRATION_TIME should be applied to a
# batch metric in the alerting and Ionosphere contexts
# mirage_alert_expiration_data = [base_name, int(alert[2])]
mirage_alert_expiration_data = [base_name, int(alert[2])]
mirage_metrics_expiration_times.append(mirage_alert_expiration_data)
# @added 20200904 - Task #3730: Validate Mirage running multiple processes
# Also always add the mirage.metrics Redis key for the
# metric which contains its hours_to_resolve so
# that the spin_process can add the mirage check
# files immediately, rather than waiting to add
# the mirage checks all in the alerting phase.
# This is done to reduce the time it takes to
# get through the analysis pipeline.
mirage_metrics_keys.append([base_name, int(alert[2]), SECOND_ORDER_RESOLUTION_FULL_DURATION])
break
except Exception as err:
logger.error('error :: metrics_manager :: all_smtp_alerter_metrics and all_mirage_metrics - %s' % err)
pattern_match = False
end_classify = timer()
logger.info('metrics_manager :: classifying metrics took %.6f seconds' % (end_classify - start_refresh))
logger.info('metrics_manager :: %s all_smtp_alerter_metrics were determined' % str(len(all_smtp_alerter_metrics)))
if all_smtp_alerter_metrics:
smtp_alerter_metrics = list(set(list(all_smtp_alerter_metrics)))
logger.info('metrics_manager :: %s unique smtp_alerter_metrics determined' % str(len(smtp_alerter_metrics)))
# Recreate the Redis set the analyzer.smtp_alerter_metrics
if smtp_alerter_metrics:
logger.info('metrics_manager :: recreating the analyzer.smtp_alerter_metrics Redis set')
try:
self.redis_conn.sadd('new_analyzer.smtp_alerter_metrics', *set(smtp_alerter_metrics))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the new_analyzer.smtp_alerter_metrics Redis set')
try:
self.redis_conn.delete('analyzer.smtp_alerter_metrics.old')
except:
pass
try:
self.redis_conn.rename('analyzer.smtp_alerter_metrics', 'analyzer.smtp_alerter_metrics.old')
except:
pass
try:
# @added 20180423 - Feature #2360: CORRELATE_ALERTS_ONLY
# Branch #2270: luminosity
# Add a Redis set of smtp_alerter_metrics for Luminosity to only
# cross correlate on metrics with an alert setting
self.redis_conn.rename('new_analyzer.smtp_alerter_metrics', 'analyzer.smtp_alerter_metrics')
except:
pass
try:
self.redis_conn.delete('analyzer.smtp_alerter_metrics.old')
except:
pass
logger.info('metrics_manager :: recreated the analyzer.smtp_alerter_metrics Redis set')
logger.info('metrics_manager :: determing non_smtp_alerter_metrics')
try:
unique_base_names_set = set(list(unique_base_names))
smtp_alerter_metrics_set = set(list(smtp_alerter_metrics))
if unique_base_names_set == smtp_alerter_metrics_set:
logger.info('metrics_manager :: unique_base_names_set and smtp_alerter_metrics_set are the same, no non_smtp_alerter_metrics')
else:
set_difference = unique_base_names_set.difference(smtp_alerter_metrics_set)
for metric in set_difference:
non_smtp_alerter_metrics.append(metric)
logger.info('metrics_manager :: there are %s non_alerter_metrics' % str(len(non_smtp_alerter_metrics)))
del set_difference
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine non_smtp_alerter_metrics from sets')
# Recreate the Redis set the analyzer.non_smtp_alerter_metrics
if non_smtp_alerter_metrics:
logger.info('metrics_manager :: recreating the analyzer.non_smtp_alerter_metrics Redis set')
try:
self.redis_conn.sadd('new_analyzer.non_smtp_alerter_metrics', *set(non_smtp_alerter_metrics))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the new_analyzer.non_smtp_alerter_metrics Redis set')
try:
self.redis_conn.delete('analyzer.non_smtp_alerter_metrics.old')
except:
pass
try:
self.redis_conn.rename('analyzer.non_smtp_alerter_metrics', 'analyzer.non_smtp_alerter_metrics.old')
except:
pass
try:
self.redis_conn.rename('new_analyzer.non_smtp_alerter_metrics', 'analyzer.non_smtp_alerter_metrics')
except:
pass
try:
self.redis_conn.delete('analyzer.non_smtp_alerter_metrics.old')
except:
pass
logger.info('metrics_manager :: recreated the analyzer.non_smtp_alerter_metrics Redis set')
try:
self.redis_conn.sunionstore('aet.analyzer.smtp_alerter_metrics', 'analyzer.smtp_alerter_metrics')
logger.info('metrics_manager :: copied Redis set analyzer.smtp_alerter_metrics to aet.analyzer.smtp_alerter_metrics via sunion')
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to copy Redis set analyzer.smtp_alerter_metrics to aet.analyzer.smtp_alerter_metrics via sunion')
try:
self.redis_conn.sunionstore('aet.analyzer.non_smtp_alerter_metrics', 'analyzer.non_smtp_alerter_metrics')
logger.info('metrics_manager :: copied Redis set analyzer.non_smtp_alerter_metrics to aet.analyzer.non_smtp_alerter_metrics via sunion')
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to copy Redis set analyzer.non_smtp_alerter_metrics to aet.analyzer.non_smtp_alerter_metrics via sunion')
logger.info('metrics_manager :: %s mirage metrics determined' % str(len(all_mirage_metrics)))
if all_mirage_metrics:
mirage_metrics = list(set(list(all_mirage_metrics)))
logger.info('metrics_manager :: %s unique mirage_metrics determined' % str(len(mirage_metrics)))
# Recreate the Redis set the mirage.unique_metrics
if mirage_metrics:
# @modified 20220411 - use sunionstore
# logger.info('metrics_manager :: recreating the mirage.unique_metrics Redis set')
logger.info('metrics_manager :: updating the mirage.unique_metrics Redis set')
try:
self.redis_conn.sadd('new_mirage.unique_metrics', *set(mirage_metrics))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add multiple members to the new_mirage.unique_metrics Redis set')
# try:
# self.redis_conn.delete('mirage.unique_metrics.old')
# except:
# pass
try:
# @modified 20220411
# self.redis_conn.rename('mirage.unique_metrics', 'mirage.unique_metrics.old')
self.redis_conn_decoded.sunionstore('mirage.unique_metrics', 'new_mirage.unique_metrics')
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to sunionstore Redis set mirage.unique_metrics with new_mirage.unique_metricsd')
# try:
# self.redis_conn.rename('new_mirage.unique_metrics', 'mirage.unique_metrics')
# except:
# logger.error(traceback.format_exc())
# logger.error('error :: metrics_manager :: failed to rename Redis set new_mirage.unique_metrics to mirage.unique_metrics')
try:
# self.redis_conn.delete('mirage.unique_metrics.old')
self.redis_conn.delete('new_mirage.unique_metrics')
except:
pass
# logger.info('metrics_manager :: recreated the mirage.unique_metrics Redis set')
logger.info('metrics_manager :: updated the mirage.unique_metrics Redis set')
end_refresh = timer()
logger.info('metrics_manager :: refresh of smtp_alerter_metrics, non_smtp_alerter_metrics and mirage_metrics took %.6f seconds' % (end_refresh - start_refresh))
if mirage_metrics_expiration_times:
logger.info('metrics_manager :: managing mirage.hash_key.metrics_expiration_times Redis hash key')
updated_keys = 0
added_keys = 0
removed_keys = 0
mirage_metrics_expiration_times_errors = 0
last_metrics_expiration_times = []
try:
raw_last_metrics_expiration_times = self.redis_conn_decoded.hgetall('mirage.hash_key.metrics_expiration_times')
for base_name_bytes in raw_last_metrics_expiration_times:
base_name = str(base_name_bytes)
expiration_time = int(raw_last_metrics_expiration_times[base_name])
last_metrics_expiration_times.append([base_name, expiration_time])
logger.info('metrics_manager :: %s entries in mirage.hash_key.metrics_expiration_times Redis hash key' % str(len(last_metrics_expiration_times)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get Redis hash key mirage.hash_key.metrics_expiration_times')
last_metrics_expiration_times = []
# Add them all if there are none in the hash key
if not last_metrics_expiration_times:
logger.info('metrics_manager :: nothing found in Redis hash key, added all %s metrics from mirage_metrics_expiration_times' % (
str(len(mirage_metrics_expiration_times))))
error_logged = False
for item in mirage_metrics_expiration_times:
try:
self.redis_conn.hset(
'mirage.hash_key.metrics_expiration_times',
item[0], int(item[1]))
added_keys += 1
except:
mirage_metrics_expiration_times_errors += 1
if not error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in mirage.hash_key.metrics_expiration_times for - %s' % str(item))
error_logged = True
logger.info('metrics_manager :: added all %s metrics to mirage.hash_key.metrics_expiration_times Redis hash' % (
str(len(mirage_metrics_expiration_times))))
# Determine the base_names in the last_metrics_expiration_times
last_metrics_expiration_times_metrics = []
if last_metrics_expiration_times:
try:
last_metrics_expiration_times_metrics = [item[0] for item in last_metrics_expiration_times]
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate list of metric names from last_metrics_expiration_times')
if last_metrics_expiration_times_metrics:
logger.info('metrics_manager :: checking entries in mirage.hash_key.metrics_expiration_times Redis hash key are correct')
error_logged = False
for item in mirage_metrics_expiration_times:
try:
base_name = item[0]
if base_name in last_metrics_expiration_times_metrics:
last_expiration_time = int(raw_last_metrics_expiration_times[base_name])
if last_expiration_time != int(item[1]):
self.redis_conn.hset(
'mirage.hash_key.metrics_expiration_times',
base_name, int(item[1]))
updated_keys += 1
else:
self.redis_conn.hset(
'mirage.hash_key.metrics_expiration_times',
base_name, int(item[1]))
added_keys += 1
except:
mirage_metrics_expiration_times_errors += 1
if not error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to manage entry in mirage.hash_key.metrics_expiration_times for - %s' % str(item))
error_logged = True
logger.info('metrics_manager :: checked entries in mirage.hash_key.metrics_expiration_times Redis hash key, %s updated, %s added' % (
str(updated_keys), str(added_keys)))
# Remove any metrics that are no longer present
present_metrics_expiration_times_metrics = []
try:
present_metrics_expiration_times_metrics = [item[0] for item in mirage_metrics_expiration_times]
logger.info('metrics_manager :: %s current known metrics from mirage_metrics_expiration_times' % str(len(present_metrics_expiration_times_metrics)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate list of metric names from mirage_metrics_expiration_times')
present_metrics_expiration_times_metrics = None
if present_metrics_expiration_times_metrics:
logger.info('metrics_manager :: checking if any entries in mirage.hash_key.metrics_expiration_times Redis hash key need to be removed')
error_logged = False
for base_name in last_metrics_expiration_times_metrics:
try:
if base_name not in present_metrics_expiration_times_metrics:
self.redis_conn.hdel(
'mirage.hash_key.metrics_expiration_times',
base_name)
removed_keys += 1
except:
mirage_metrics_expiration_times_errors += 1
if not error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to remove entry from mirage.hash_key.metrics_expiration_times for - %s' % str(base_name))
error_logged = True
logger.info('metrics_manager :: removed %s entries in mirage.hash_key.metrics_expiration_times Redis hash key' % str(removed_keys))
logger.info('metrics_manager :: managed mirage.hash_key.metrics_expiration_times Redis hash key')
# @added 20200904 - Task #3730: Validate Mirage running multiple processes
# Also always add the mirage.metrics Redis key for the
# metric which contains its hours_to_resolve so
# that the spin_process can add the mirage check
# files immediately, rather than waiting to add
# the mirage checks all in the alerting phase.
# This is done to reduce the time it takes to
# get through the analysis pipeline.
# @modified 20201109 - Feature #3830: metrics_manager
# Changed to a single mirage.hash_key.metrics_resolutions hash key
# rather than individual mirage.metrics. Redis keys for each mirage
# metric
if mirage_metrics_keys:
logger.info('metrics_manager :: managing the mirage.hash_key.metrics_resolutions Redis hash key')
last_metrics_resolutions = {}
try:
raw_last_metrics_resolutions = self.redis_conn_decoded.hgetall('mirage.hash_key.metrics_resolutions')
for base_name_bytes in raw_last_metrics_resolutions:
base_name = str(base_name_bytes)
hours_to_resolve = int(raw_last_metrics_resolutions[base_name])
last_metrics_resolutions[base_name] = hours_to_resolve
logger.info('metrics_manager :: %s entries in mirage.hash_key.metrics_resolutions Redis hash key' % str(len(last_metrics_resolutions)))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get Redis hash key mirage.hash_key.metrics_resolutions')
last_metrics_resolutions = {}
logger.info('metrics_manager :: there are %s metrics in the mirage.hash_key.metrics_resolutions Redis hash key' % str(len(last_metrics_resolutions)))
logger.info('metrics_manager :: determining if any metrics need to be removed from the mirage.hash_key.metrics_resolutions Redis hash key, via set difference')
metrics_to_remove = []
current_metrics_resolutions = {}
for item in mirage_metrics_keys:
base_name = item[0]
current_metrics_resolutions[base_name] = item[2]
if current_metrics_resolutions:
try:
last_metrics_resolutions_set = set(list(last_metrics_resolutions))
current_metrics_resolutions_set = set(list(current_metrics_resolutions))
set_difference = last_metrics_resolutions_set.difference(current_metrics_resolutions_set)
metrics_to_remove = set_difference
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine metrics to remove from mirage.hash_key.metrics_resolutions')
metrics_to_remove = []
logger.info('metrics_manager :: there are %s metrics to remove from the mirage.hash_key.metrics_resolutions Redis hash key' % str(len(metrics_to_remove)))
if metrics_to_remove:
metrics_to_remove_error_logged = False
metrics_removed = 0
for base_name in metrics_to_remove:
try:
self.redis_conn.hdel(
'mirage.hash_key.metrics_resolutions', base_name)
metrics_removed += 1
except:
if not metrics_to_remove_error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine metrics to remove from mirage.hash_key.metrics_resolutions')
metrics_to_remove_error_logged = True
logger.info('metrics_manager :: removed %s metrics from the mirage.hash_key.metrics_resolutions Redis hash key' % str(metrics_removed))
logger.info('metrics_manager :: determining if there are any new metrics to add to the mirage.hash_key.metrics_resolutions Redis hash key, via set difference')
metrics_to_add = []
if current_metrics_resolutions:
try:
last_metrics_resolutions_set = set(list(last_metrics_resolutions))
current_metrics_resolutions_set = set(list(current_metrics_resolutions))
set_difference = last_metrics_resolutions_set.difference(last_metrics_resolutions_set)
metrics_to_add = set_difference
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine metrics to remove from mirage.hash_key.metrics_resolutions')
metrics_to_add = []
metrics_added = 0
logger.info('metrics_manager :: there are %s metrics to add to the mirage.hash_key.metrics_resolutions Redis hash key' % str(len(metrics_to_add)))
if metrics_to_add:
metrics_to_add_error_logged = False
for base_name in metrics_to_add:
try:
hours_to_resolve = current_metrics_resolutions[base_name]
self.redis_conn.hset(
'mirage.hash_key.metrics_resolutions', base_name,
hours_to_resolve)
metrics_added += 1
except Exception as e:
if not metrics_to_add_error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add %s to mirage.hash_key.metrics_resolutions - %s' % (base_name, e))
metrics_to_add_error_logged = True
logger.info('metrics_manager :: added %s metrics to the mirage.hash_key.metrics_resolutions Redis hash key' % str(metrics_added))
# Update any changed metric resolutions, this is a fast iterator
logger.info('metrics_manager :: checking if any metrics need their resolution updated in the mirage.hash_key.metrics_resolutions Redis hash key')
metrics_resolutions_updated = 0
metrics_updated_error_logged = False
for base_name in list(current_metrics_resolutions.keys()):
update_metric_resolution = False
try:
last_resolution = last_metrics_resolutions[base_name]
except:
last_resolution = 0
try:
current_resolution = current_metrics_resolutions[base_name]
except:
current_resolution = 0
if not last_resolution:
update_metric_resolution = True
last_resolution = current_resolution
if last_resolution != current_resolution:
update_metric_resolution = True
if update_metric_resolution:
try:
self.redis_conn.hset(
'mirage.hash_key.metrics_resolutions', base_name,
current_resolution)
metrics_resolutions_updated += 1
except:
if not metrics_updated_error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to update the resolution of %s to mirage.hash_key.metrics_resolutions' % base_name)
metrics_updated_error_logged = True
logger.info('metrics_manager :: updated the resolutions of %s metrics in the mirage.hash_key.metrics_resolutions Redis hash key' % str(metrics_resolutions_updated))
logger.info('metrics_manager :: smtp_alerter_metrics :: %s' % str(len(smtp_alerter_metrics)))
logger.info('metrics_manager :: non_smtp_alerter_metrics :: %s' % str(len(non_smtp_alerter_metrics)))
logger.info('metrics_manager :: mirage_metrics :: %s' % str(len(mirage_metrics)))
# @added 20210601 - Feature #4000: EXTERNAL_SETTINGS
external_settings = {}
external_from_cache = False
if EXTERNAL_SETTINGS:
logger.info('metrics_manager :: managing external_settings')
try:
external_settings, external_from_cache = manage_external_settings(skyline_app)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: fetch_external_settings failed - %s' % (
e))
if external_settings:
logger.info('metrics_manager :: %s external_settings from cache %s' % (
str(len(list(external_settings.keys()))), str(external_from_cache)))
# @added 20220329 - Feature #4446: Optimise horizon worker in_skip_list
# Update the horizon.do_not_skip_metrics Redis hash periodically to
# ensure that any new metrics that are added to DO_NOT_SKIP_LIST are
# added to the horizon.do_not_skip_metrics Redis hash and that they are
# removed from the horizon.skip_metrics Redis hash
do_not_skip_metrics_dict = {}
try:
do_not_skip_metrics_dict = self.redis_conn_decoded.hgetall('horizon.do_not_skip_metrics')
except Exception as err:
logger.error('error :: metrics_manager :: failed to hgetall horizon.do_not_skip_metrics: %s' % err)
logger.info('metrics_manager :: got %s metrics from horizon.do_not_skip_metrics Redis hash' % (
str(len(do_not_skip_metrics_dict))))
horizon_do_not_skip_metrics = list(do_not_skip_metrics_dict.keys())
skip_metrics_dict = {}
try:
skip_metrics_dict = self.redis_conn_decoded.hgetall('horizon.skip_metrics')
except Exception as err:
logger.error('error :: metrics_manager :: failed to hgetall horizon.skip_metrics: %s' % err)
logger.info('metrics_manager :: got %s metrics from horizon.skip_metrics Redis hash' % (
str(len(skip_metrics_dict))))
horizon_skip_metrics = list(skip_metrics_dict.keys())
# @added 20220426 - Feature #4536: Handle Redis failure
# Add flux required data to memcache as well
logger.info('metrics_manager :: creating metrics_manager_skip_list and metrics_manager_do_not_skip_list')
metrics_manager_skip_list = []
metrics_manager_do_not_skip_list = []
ALL_SKIP_LIST = []
try:
ALL_SKIP_LIST = list(settings.SKIP_LIST)
except AttributeError:
ALL_SKIP_LIST = []
except:
ALL_SKIP_LIST = []
ALL_DO_NOT_SKIP_LIST = []
try:
ALL_DO_NOT_SKIP_LIST = list(settings.DO_NOT_SKIP_LIST)
except AttributeError:
ALL_DO_NOT_SKIP_LIST = []
except:
ALL_DO_NOT_SKIP_LIST = []
if external_settings:
for settings_key in list(external_settings.keys()):
external_setting_keys = list(external_settings[settings_key].keys())
skip_metrics = []
external_namespace = None
try:
external_namespace = external_settings[settings_key]['namespace']
except KeyError:
external_namespace = None
except:
external_namespace = None
if 'skip_metrics' in external_setting_keys:
skip_metrics = external_settings[settings_key]['skip_metrics']
if skip_metrics:
if isinstance(skip_metrics, list):
if external_namespace:
for namespace in skip_metrics:
metric_namespaces = '%s.%s' % (external_namespace, namespace)
ALL_SKIP_LIST.append(metric_namespaces)
else:
ALL_SKIP_LIST = ALL_SKIP_LIST + skip_metrics
do_not_skip_metrics = []
if 'do_not_skip_metrics' in external_setting_keys:
do_not_skip_metrics = external_settings[settings_key]['do_not_skip_metrics']
if do_not_skip_metrics:
if isinstance(do_not_skip_metrics, list):
if external_namespace:
for namespace in do_not_skip_metrics:
metric_namespaces = '%s.%s' % (external_namespace, namespace)
ALL_DO_NOT_SKIP_LIST.append(metric_namespaces)
else:
ALL_DO_NOT_SKIP_LIST = ALL_DO_NOT_SKIP_LIST + do_not_skip_metrics
horizon_metrics_received = []
try:
horizon_metrics_received = list(self.redis_conn_decoded.smembers('aet.horizon.metrics_received'))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: smembers failed on aet.horizon.metrics_received - %s' % err)
horizon_metrics_received = []
all_unique_base_names = list(set(unique_base_names + horizon_metrics_received + horizon_skip_metrics + horizon_do_not_skip_metrics))
if ALL_SKIP_LIST:
for base_name in all_unique_base_names:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, ALL_SKIP_LIST)
del metric_matched_by
if pattern_match:
metrics_manager_skip_list.append(base_name)
logger.info('metrics_manager :: added %s base_names to metrics_manager_skip_list' % str(len(metrics_manager_skip_list)))
metrics_manager_removed_from_skip_list = []
if ALL_DO_NOT_SKIP_LIST:
for base_name in all_unique_base_names:
if base_name in metrics_manager_skip_list:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, ALL_DO_NOT_SKIP_LIST)
del metric_matched_by
if pattern_match:
metrics_manager_do_not_skip_list.append(base_name)
if base_name in metrics_manager_skip_list:
metrics_manager_skip_list.remove(base_name)
metrics_manager_removed_from_skip_list.append(base_name)
else:
metrics_manager_do_not_skip_list.append(base_name)
logger.info('metrics_manager :: added %s base_names to metrics_manager_do_not_skip_list' % str(len(metrics_manager_do_not_skip_list)))
logger.info('metrics_manager :: removed %s base_names from metrics_manager_skip_list that were found in metrics_manager_do_not_skip_list' % str(len(metrics_manager_removed_from_skip_list)))
if metrics_manager_skip_list:
try:
self.redis_conn.sadd('new.metrics_manager.skip_list', *set(metrics_manager_skip_list))
self.redis_conn.rename('new.metrics_manager.skip_list', 'metrics_manager.skip_list')
logger.info('metrics_manager :: created metrics_manager.skip_list Redis hash with %s metrics' % str(len(metrics_manager_skip_list)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to create Redis set new.metrics_manager.skip_list and rename to metrics_manager.skip_list')
if metrics_manager_do_not_skip_list:
try:
self.redis_conn.sadd('new.metrics_manager.do_not_skip_list', *set(metrics_manager_do_not_skip_list))
self.redis_conn.rename('new.metrics_manager.do_not_skip_list', 'metrics_manager.do_not_skip_list')
logger.info('metrics_manager :: created metrics_manager.do_not_skip_list Redis hash with %s metrics' % str(len(metrics_manager_do_not_skip_list)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to create Redis set new.metrics_manager.do_not_skip_list and rename to metrics_manager.do_not_skip_list')
# @added 20220329 - Feature #4446: Optimise horizon worker in_skip_list
# Update the horizon.do_not_skip_metrics Redis hash periodically to
# ensure that any new metrics that are added to DO_NOT_SKIP_LIST are
# added to the horizon.do_not_skip_metrics Redis hash and that they are
# removed from the horizon.skip_metrics Redis hash
if do_not_skip_metrics_dict and settings.DO_NOT_SKIP_LIST:
all_horizon_metrics = list(set(unique_base_names + list(skip_metrics_dict.keys())))
update_do_not_skip_metrics = {}
for base_name in all_horizon_metrics:
if base_name in horizon_do_not_skip_metrics:
continue
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, settings.DO_NOT_SKIP_LIST)
if pattern_match:
update_do_not_skip_metrics[base_name] = int(spin_start)
del metric_matched_by
new_do_not_skip_metrics = list(update_do_not_skip_metrics.keys())
logger.info('metrics_manager :: found %s new metrics to add to horizon.do_not_skip_metrics Redis hash' % (
str(len(new_do_not_skip_metrics))))
if update_do_not_skip_metrics:
try:
self.redis_conn_decoded.hset('horizon.do_not_skip_metrics', mapping=update_do_not_skip_metrics)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to update horizon.do_not_skip_metrics Redis hash - %s' % (
err))
remove_from_skip_metrics = []
for base_name in new_do_not_skip_metrics:
if base_name in list(skip_metrics_dict.keys()):
remove_from_skip_metrics.append(base_name)
if remove_from_skip_metrics:
logger.info('metrics_manager :: %s metrics to remove from horizon.skip_metrics Redis hash' % (
str(len(remove_from_skip_metrics))))
try:
self.redis_conn_decoded.hdel('horizon.skip_metrics', *set(remove_from_skip_metrics))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to hdel metrics from horizon.skip_metrics Redis hash - %s' % (
err))
del remove_from_skip_metrics
del update_do_not_skip_metrics
del all_horizon_metrics
del new_do_not_skip_metrics
# @added 20220427 - Feature #4446: Optimise horizon worker in_skip_list
# Feature #4536: Handle Redis failure
# Add flux required data to memcache as well
logger.info('metrics_manager :: updating horizon.do_not_skip_metrics with metrics_manager_do_not_skip_list')
horizon_do_not_skip_set = set(horizon_do_not_skip_metrics)
metrics_manager_do_not_skip_set = set(metrics_manager_do_not_skip_list)
missing_from_horizon_do_not_skip = list(metrics_manager_do_not_skip_set.difference(horizon_do_not_skip_set))
if missing_from_horizon_do_not_skip:
logger.info('metrics_manager :: adding %s metrics to horizon.do_not_skip_metrics Redis hash' % (
str(len(missing_from_horizon_do_not_skip))))
missing_from_horizon_do_not_skip_dict = {}
current_timestamp = int(time())
for base_name in missing_from_horizon_do_not_skip:
missing_from_horizon_do_not_skip_dict[base_name] = current_timestamp
try:
self.redis_conn_decoded.hset('horizon.do_not_skip_metrics', mapping=missing_from_horizon_do_not_skip_dict)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to hset metrics in horizon.do_not_skip_metrics Redis hash - %s' % (
err))
else:
logger.info('metrics_manager :: no metrics missing from horizon.do_not_skip_metrics Redis hash')
remove_from_horizon_do_not_skip = list(horizon_do_not_skip_set.difference(metrics_manager_do_not_skip_set))
if remove_from_horizon_do_not_skip:
logger.info('metrics_manager :: removing %s metrics to horizon.do_not_skip_metrics Redis hash' % (
str(len(remove_from_horizon_do_not_skip))))
try:
self.redis_conn_decoded.hdel('horizon.do_not_skip_metrics', *set(remove_from_horizon_do_not_skip))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to hset metrics in horizon.do_not_skip_metrics Redis hash - %s' % (
err))
try:
self.redis_conn_decoded.sadd('metrics_manager.removed_from.horizon.do_not_skip_metrics', *set(remove_from_horizon_do_not_skip))
self.redis_conn_decoded.expire('metrics_manager.removed_from.horizon.do_not_skip_metrics', 86400)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to sadd metrics to metrics_manager.removed_from.horizon.do_not_skip_metrics Redis set - %s' % (
err))
else:
logger.info('metrics_manager :: no metrics to remove from horizon.do_not_skip_metrics Redis hash')
# del do_not_skip_metrics_dict
# del horizon_do_not_skip_metrics
logger.info('metrics_manager :: updating horizon.skip_metrics with metrics_manager_skip_list')
horizon_skip_set = set(horizon_skip_metrics)
metrics_manager_skip_set = set(metrics_manager_skip_list)
missing_from_horizon_skip = list(metrics_manager_skip_set.difference(horizon_skip_set))
if missing_from_horizon_skip:
logger.info('metrics_manager :: adding %s metrics to horizon.skip_metrics Redis hash' % (
str(len(missing_from_horizon_skip))))
missing_from_horizon_skip_dict = {}
current_timestamp = int(time())
for base_name in missing_from_horizon_skip:
missing_from_horizon_skip_dict[base_name] = current_timestamp
try:
self.redis_conn_decoded.hset('horizon.skip_metrics', mapping=missing_from_horizon_skip_dict)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to hset metrics in horizon.skip_metrics Redis hash - %s' % (
err))
else:
logger.info('metrics_manager :: no metrics missing from horizon.skip_metrics Redis hash')
remove_from_horizon_skip = list(horizon_skip_set.difference(metrics_manager_skip_set))
if remove_from_horizon_skip:
logger.info('metrics_manager :: removing %s metrics to horizon.skip_metrics Redis hash' % (
str(len(remove_from_horizon_skip))))
try:
self.redis_conn_decoded.hdel('horizon.skip_metrics', *set(remove_from_horizon_skip))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to hset metrics in horizon.skip_metrics Redis hash - %s' % (
err))
try:
self.redis_conn_decoded.sadd('metrics_manager.removed_from.horizon.skip_metrics', *set(remove_from_horizon_skip))
self.redis_conn_decoded.expire('metrics_manager.removed_from.horizon.skip_metrics', 86400)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to sadd metrics to metrics_manager.removed_from.horizon.skip_metrics Redis set - %s' % (
err))
else:
logger.info('metrics_manager :: no metrics to remove from horizon.skip_metrics Redis hash')
# @added 20220126 - Feature #4400: flux - quota
namespaces_with_quotas = []
namespace_quotas_settings = {}
try:
namespace_quotas_settings = settings.FLUX_NAMESPACE_QUOTAS
except AttributeError:
namespace_quotas_settings = {}
except:
namespace_quotas_settings = {}
if namespace_quotas_settings:
for namespace in namespace_quotas_settings:
quota = 0
try:
quota = namespace_quotas_settings[namespace]
except KeyError:
continue
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: determining quota for %s, err: %s' % (
str(namespace), err))
if quota:
namespaces_with_quotas.append(namespace)
try:
self.redis_conn.hset('metrics_manager.flux.namespace_quotas', namespace, quota)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to set quota for %s to %s in metrics_manager.flux.namespace_quotas, err: %s' % (
namespace, str(quota), err))
del namespace_quotas_settings
if external_settings:
for external_setting in external_settings:
quota = 0
try:
namespace = external_settings[external_setting]['namespace']
quota = external_settings[external_setting]['metric_limit']
except KeyError:
continue
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: determining quota from external_setting - %s, err: %s' % (
str(external_setting), err))
if quota:
namespaces_with_quotas.append(namespace)
try:
self.redis_conn.hset('metrics_manager.flux.namespace_quotas', namespace, quota)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to set quota for %s to %s in metrics_manager.flux.namespace_quotas, err: %s' % (
namespace, str(quota), err))
namespace_quotas_dict = {}
if namespaces_with_quotas:
try:
namespace_quotas_dict = self.redis_conn_decoded.hgetall('metrics_manager.flux.namespace_quotas')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to hgetall metrics_manager.flux.namespace_quotas Redis hash key - %s' % (
err))
# @added 20220426 - Feature #4536: Handle Redis failure
# Add flux required data to memcache as well
if settings.MEMCACHE_ENABLED:
try:
success = set_memcache_key(skyline_app, 'metrics_manager.flux.namespace_quotas', namespace_quotas_dict)
if success:
logger.info('metrics_manager :: set memcache metrics_manager.flux.namespace_quotas key')
except Exception as err:
logger.error('error :: metrics_manager :: set_memcache_key failed to set metrics_manager.flux.namespace_quotas - %s' % (
err))
for parent_namespace in list(namespace_quotas_dict.keys()):
namespace_quota_key = 'flux.quota.namespace_metrics.%s' % parent_namespace
namespace_metrics = []
try:
namespace_metrics = list(self.redis_conn_decoded.smembers(namespace_quota_key))
except:
namespace_metrics = []
if namespace_metrics:
success = False
try:
success = set_memcache_key(skyline_app, namespace_quota_key, namespace_metrics)
except Exception as err:
logger.error('error :: metrics_manager :: set_memcache_key failed to set %s - %s' % (
namespace_quota_key, err))
if success:
logger.info('metrics_manager :: set memcache %s key' % namespace_quota_key)
flux_namespace_not_skipped_metrics = {}
flux_namespace_not_skipped_metrics_key = 'flux.not_skipped_metrics.%s' % parent_namespace
try:
flux_namespace_not_skipped_metrics = self.redis_conn_decoded.hgetall(flux_namespace_not_skipped_metrics_key)
except:
flux_namespace_not_skipped_metrics = {}
if flux_namespace_not_skipped_metrics:
success = False
try:
success = set_memcache_key(skyline_app, flux_namespace_not_skipped_metrics_key, flux_namespace_not_skipped_metrics)
except Exception as err:
logger.error('error :: metrics_manager :: set_memcache_key failed to set %s - %s' % (
flux_namespace_not_skipped_metrics_key, err))
if success:
logger.info('metrics_manager :: set memcache %s key' % flux_namespace_not_skipped_metrics_key)
flux_namespace_skipped_metrics = {}
flux_namespace_skipped_metrics_key = 'flux.skipped_metrics.%s' % parent_namespace
try:
flux_namespace_skipped_metrics = self.redis_conn_decoded.hgetall(flux_namespace_skipped_metrics_key)
except:
flux_namespace_skipped_metrics = {}
if flux_namespace_skipped_metrics:
success = False
try:
success = set_memcache_key(skyline_app, flux_namespace_skipped_metrics_key, flux_namespace_skipped_metrics)
except Exception as err:
logger.error('error :: metrics_manager :: set_memcache_key failed to set %s - %s' % (
flux_namespace_skipped_metrics_key, err))
if success:
logger.info('metrics_manager :: set memcache %s key' % flux_namespace_skipped_metrics_key)
# Remove entries which no longer have a quota
for namespace in namespace_quotas_dict:
if namespace not in namespaces_with_quotas:
logger.info('metrics_manager :: removing %s from metrics_manager.flux.namespace_quotas Redis hash' % (
namespace))
try:
self.redis_conn.hdel('metrics_manager.flux.namespace_quotas', namespace)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to set quota for %s to %s in metrics_manager.flux.namespace_quotas, err: %s' % (
namespace, str(quota), err))
del namespace_quotas_dict
del namespaces_with_quotas
# @added 20210406 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
# Analyzer determines what metrics flux should aggregate by creating the
# flux.aggregate_metrics Redis set, which flux/listen and flux/aggregate
# reference. This is done in analyzer/metrics_manager because it manages
# metric Redis sets as it always runs. It is only managed every 5 mins.
aggregate_namespaces = []
flux_aggregate_metrics = []
flux_aggregate_zerofill_metrics = []
flux_aggregate_lkv_metrics = []
# @added 20220128 - Feature #4404: flux - external_settings - aggregation
external_settings_aggregations = {}
# @modified 20220222 - Feature #4284: flux - telegraf
# Feature #4404: flux - external_settings - aggregation
# Added or external_settings
if FLUX_AGGREGATE_NAMESPACES or FLUX_EXTERNAL_AGGREGATE_NAMESPACES or external_settings:
if FLUX_AGGREGATE_NAMESPACES:
aggregate_namespaces = list(FLUX_AGGREGATE_NAMESPACES.keys())
if FLUX_EXTERNAL_AGGREGATE_NAMESPACES:
logger.info('metrics_manager :: TODO :: FLUX_EXTERNAL_AGGREGATE_NAMESPACES')
# TODO
# flux_external_aggregate_namespaces_dict = self.redis_conn_decoded.hgetall(...
# for i_metric in ...:
# aggregate_namespaces.append(i_metric)
# @added 20220128 - Feature #4404: flux - external_settings - aggregation
if external_settings:
external_settings_aggregations = external_settings_aggregation(skyline_app, external_settings, True)
if external_settings_aggregations:
for namespace_key in list(external_settings_aggregations.keys()):
aggregate_namespaces.append(namespace_key)
FLUX_AGGREGATE_NAMESPACES[namespace_key] = external_settings_aggregations[namespace_key]
aggregate_namespaces = list(set(aggregate_namespaces))
logger.info('metrics_manager :: aggregate_namespaces: %s' % str(aggregate_namespaces))
# Write FLUX_AGGREGATE_NAMESPACES (including any merged external_settings
# aggregations) to the metrics_manager.flux.aggregate_namespaces Redis hash,
# mapping each namespace to str() of its aggregation settings. The hash is
# built under metrics_manager.new.flux.aggregate_namespaces and then renamed
# over the live key, so namespaces that are no longer configured drop out.
# When memcache is enabled the resulting hash is mirrored into memcache
# (Feature #4536: Handle Redis failure).
if FLUX_AGGREGATE_NAMESPACES:
    for namespace_key in list(FLUX_AGGREGATE_NAMESPACES.keys()):
        try:
            self.redis_conn.hset('metrics_manager.new.flux.aggregate_namespaces', namespace_key, str(FLUX_AGGREGATE_NAMESPACES[namespace_key]))
        except Exception as err:
            if LOCAL_DEBUG:
                logger.error('error :: metrics_manager :: could not add to metrics_manager.new.flux.aggregate_namespaces Redis hash key: %s' % str(err))
    try:
        self.redis_conn.rename('metrics_manager.new.flux.aggregate_namespaces', 'metrics_manager.flux.aggregate_namespaces')
    except Exception as err:
        if LOCAL_DEBUG:
            logger.error('error :: metrics_manager :: failed to rename Redis hash metrics_manager.new.flux.aggregate_namespaces to metrics_manager.flux.aggregate_namespaces, %s' % str(err))
    # @added 20220426 - Feature #4536: Handle Redis failure
    # Add flux required data to memcache as well
    if settings.MEMCACHE_ENABLED:
        metrics_manager_flux_aggregate_namespaces = {}
        try:
            metrics_manager_flux_aggregate_namespaces = self.redis_conn_decoded.hgetall('metrics_manager.flux.aggregate_namespaces')
        except Exception as err:
            logger.error(traceback.format_exc())
            # %s, not %e: %e is a float conversion and raises TypeError when
            # given an exception object, which would escape this handler.
            logger.error('error :: metrics_manager :: hgetall failed for metrics_manager.flux.aggregate_namespaces - %s' % err)
        if metrics_manager_flux_aggregate_namespaces:
            try:
                success = set_memcache_key(skyline_app, 'metrics_manager.flux.aggregate_namespaces', metrics_manager_flux_aggregate_namespaces)
                if success:
                    logger.info('metrics_manager :: set memcache metrics_manager.flux.aggregate_namespaces key')
            except Exception as err:
                logger.error('error :: metrics_manager :: set_memcache_key failed to set metrics_manager.flux.aggregate_namespaces - %s' % (
                    err))
manage_flux_aggregate_namespaces = False
# The aggregate sets are only rebuilt when the
# manage_flux_aggregate_namespaces_redis_key key is absent. That key is set
# with a RUN_EVERY TTL at the end of a rebuild.
# @modified 20220128 - Feature #4404: flux - external_settings - aggregation
#                      Feature #4324: flux - reload external_settings
#                      Feature #4376: webapp - update_external_settings
# Moved variable definition to above
# manage_flux_aggregate_namespaces_redis_key = 'metrics_manager.manage_flux_aggregate_namespaces'
try:
    manage_flux_aggregate_namespaces = self.redis_conn.get(manage_flux_aggregate_namespaces_redis_key)
except Exception as e:
    if LOCAL_DEBUG:
        logger.error('error :: metrics_manager :: could not query Redis for metrics_manager.manage_flux_aggregate_namespaces key: %s' % str(e))
if not manage_flux_aggregate_namespaces:
    logger.info('metrics_manager :: managing FLUX_AGGREGATE_NAMESPACES Redis sets')
    # Start from empty sets; they are repopulated from unique_base_names below
    for redis_set in ('analyzer.flux_aggregate_metrics',
                      'metrics_manager.flux_zero_fill_aggregate_metrics',
                      'metrics_manager.flux_last_known_value_aggregate_metrics'):
        try:
            self.redis_conn.delete(redis_set)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to delete %s Redis set' % redis_set)
    for i_base_name in unique_base_names:
        pattern_match, metric_matched_by = matched_or_regexed_in_list('analyzer', i_base_name, aggregate_namespaces)
        if not pattern_match:
            continue
        flux_aggregate_metrics.append(i_base_name)
        # Reset for every metric so that a failed lookup below cannot fall
        # back to the settings of the previously matched metric.
        metric_aggregation_settings = {}
        try:
            matched_namespace = metric_matched_by['matched_namespace']
            metric_aggregation_settings = FLUX_AGGREGATE_NAMESPACES[matched_namespace]
            # Add the configuration to Redis
            self.redis_conn.hset('metrics_manager.flux.aggregate_namespaces.settings', i_base_name, str(metric_aggregation_settings))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to determine or set the aggregation settings for %s in the metrics_manager.flux.aggregate_namespaces.settings Redis hash' % i_base_name)
        # @added 20210407 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
        # Allow for zero_fill and last_known_value setting to be
        # defined in the FLUX_AGGREGATE_NAMESPACES setting
        try:
            add_to_zero_fill = metric_aggregation_settings['zero_fill']
        except Exception:
            add_to_zero_fill = False
        if add_to_zero_fill:
            flux_aggregate_zerofill_metrics.append(i_base_name)
        try:
            add_to_last_known_value = metric_aggregation_settings['last_known_value']
        except Exception:
            add_to_last_known_value = False
        if add_to_last_known_value:
            flux_aggregate_lkv_metrics.append(i_base_name)
    if flux_aggregate_zerofill_metrics:
        try:
            logger.info('metrics_manager :: adding %s aggregate metrics that have zerofill aggregation setting to metrics_manager.flux_zero_fill_aggregate_metrics' % str(len(flux_aggregate_zerofill_metrics)))
            self.redis_conn.sadd('metrics_manager.flux_zero_fill_aggregate_metrics', *set(flux_aggregate_zerofill_metrics))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to add multiple aggregation metric members to the metrics_manager.flux_zero_fill_aggregate_metrics Redis set')
    if flux_aggregate_lkv_metrics:
        try:
            logger.info('metrics_manager :: adding %s aggregate metrics that have last_known_value aggregation setting to metrics_manager.flux_last_known_value_aggregate_metrics' % str(len(flux_aggregate_lkv_metrics)))
            self.redis_conn.sadd('metrics_manager.flux_last_known_value_aggregate_metrics', *set(flux_aggregate_lkv_metrics))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to add multiple members to the metrics_manager.flux_last_known_value_aggregate_metrics Redis set')
    if flux_aggregate_metrics:
        logger.info('metrics_manager :: populating analyzer.flux_aggregate_metrics Redis set with %s metrics' % str(len(flux_aggregate_metrics)))
        try:
            self.redis_conn.sadd('analyzer.flux_aggregate_metrics', *set(flux_aggregate_metrics))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.flux_aggregate_metrics Redis set')
    # @added 20220429 - Feature #4536: Handle Redis failure
    # Mirror the per metric aggregation settings hash into memcache
    if settings.MEMCACHE_ENABLED:
        metrics_manager_flux_aggregate_namespaces_settings = {}
        try:
            metrics_manager_flux_aggregate_namespaces_settings = self.redis_conn_decoded.hgetall('metrics_manager.flux.aggregate_namespaces.settings')
        except Exception as err:
            logger.error('error :: metrics_manager :: failed to hgetall metrics_manager.flux.aggregate_namespaces.settings - %s' % err)
        if metrics_manager_flux_aggregate_namespaces_settings:
            logger.info('metrics_manager :: populating metrics_manager.flux.aggregate_namespaces.settings memcache key with %s namespace settings' % str(len(metrics_manager_flux_aggregate_namespaces_settings)))
            success = False
            try:
                success = set_memcache_key('analyzer', 'metrics_manager.flux.aggregate_namespaces.settings', metrics_manager_flux_aggregate_namespaces_settings)
            except Exception as err:
                logger.error('error :: metrics_manager :: failed to set memcache key metrics_manager.flux.aggregate_namespaces.settings - %s' % (str(err)))
            if success:
                logger.info('metrics_manager :: metrics_manager.flux.aggregate_namespaces.settings memcache key set')
    # Gate the next rebuild for RUN_EVERY seconds
    try:
        key_timestamp = int(time())
        self.redis_conn.setex(manage_flux_aggregate_namespaces_redis_key, RUN_EVERY, key_timestamp)
    except Exception as err:
        logger.error('error :: metrics_manager :: failed to set %s Redis key - %s' % (
            manage_flux_aggregate_namespaces_redis_key, err))
logger.info('metrics_manager :: checking if any metrics need to be removed from analyzer.flux_aggregate_metrics')
# Drop members of analyzer.flux_aggregate_metrics that are no longer in
# unique_base_names. Then copy the set to metrics_manager.flux.aggregate_metrics
# (mirrored to memcache when enabled) and timestamp every member in the
# metrics_manager.flux.aggregate_metrics.hash Redis hash.
flux_aggregate_metrics_list = []
try:
    flux_aggregate_metrics_list = list(self.redis_conn_decoded.smembers('analyzer.flux_aggregate_metrics'))
except Exception:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: failed to generate a list from analyzer.flux_aggregate_metrics Redis set')
# Set for O(1) membership tests instead of scanning unique_base_names
known_base_names = set(unique_base_names)
flux_aggregate_metrics_to_remove = [
    base_name for base_name in flux_aggregate_metrics_list
    if base_name not in known_base_names]
if flux_aggregate_metrics_to_remove:
    logger.info('metrics_manager :: removing %s metrics from analyzer.flux_aggregate_metrics' % str(len(flux_aggregate_metrics_to_remove)))
    try:
        self.redis_conn.srem('analyzer.flux_aggregate_metrics', *set(flux_aggregate_metrics_to_remove))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to remove multiple members from the analyzer.flux_aggregate_metrics Redis set')
    else:
        # Reload the new set
        try:
            flux_aggregate_metrics_list = list(self.redis_conn_decoded.smembers('analyzer.flux_aggregate_metrics'))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to generate a list from analyzer.flux_aggregate_metrics Redis set after removals')
else:
    logger.info('metrics_manager :: no metrics need to remove from analyzer.flux_aggregate_metrics')
if flux_aggregate_metrics_list:
    # Replace the existing flux.aggregate_metrics Redis set
    try:
        self.redis_conn.sunionstore('metrics_manager.flux.aggregate_metrics', 'analyzer.flux_aggregate_metrics')
        logger.info('metrics_manager :: replaced metrics_manager.flux.aggregate_metrics Redis set with the newly created analyzer.flux_aggregate_metrics set')
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to sunionstore metrics_manager.flux.aggregate_metrics from analyzer.flux_aggregate_metrics Redis sets')
    # @added 20220426 - Feature #4536: Handle Redis failure
    # Add flux required data to memcache as well
    if settings.MEMCACHE_ENABLED:
        metrics_manager_flux_aggregate_metrics = []
        try:
            metrics_manager_flux_aggregate_metrics = list(self.redis_conn_decoded.smembers('metrics_manager.flux.aggregate_metrics'))
        except Exception as err:
            logger.error(traceback.format_exc())
            # %s, not %e: %e is a float conversion and raises TypeError when
            # given an exception object, which would escape this handler.
            logger.error('error :: metrics_manager :: smembers failed for metrics_manager.flux.aggregate_metrics - %s' % err)
        if metrics_manager_flux_aggregate_metrics:
            try:
                success = set_memcache_key(skyline_app, 'metrics_manager.flux.aggregate_metrics', metrics_manager_flux_aggregate_metrics)
                if success:
                    logger.info('metrics_manager :: set memcache metrics_manager.flux.aggregate_metrics key')
            except Exception as err:
                logger.error('error :: metrics_manager :: set_memcache_key failed to set metrics_manager.flux.aggregate_metrics - %s' % (
                    err))
# Create a Redis hash for flux/listen. These entries are managed by
# flux/listen; per the original note, flux/listen removes an entry when its
# timestamp is older than 12 hours. All members are written in one call.
time_now = int(time())
if flux_aggregate_metrics_list:
    try:
        self.redis_conn.hset(
            'metrics_manager.flux.aggregate_metrics.hash',
            mapping={flux_aggregate_metric: time_now for flux_aggregate_metric in flux_aggregate_metrics_list})
    except Exception as err:
        logger.error('error :: metrics_manager :: failed to hset mapping for metrics_manager.flux.aggregate_metrics.hash Redis hash key - %s' % err)
# @added 20200827 - Feature #3708: FLUX_ZERO_FILL_NAMESPACES
# Analyzer determines what metrics flux should 0 fill by creating
# the flux.zero_fill_metrics Redis set, which flux references. This
# is done in Analyzer because it manages metric Redis sets as it
# always runs.
# @modified 20210407 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
# if FLUX_ZERO_FILL_NAMESPACES:
# The analyzer.flux_zero_fill_metrics working set is rebuilt at most once per
# RUN_EVERY seconds (gated by metrics_manager.manage_flux_zero_fill_namespaces).
# Metrics no longer in unique_base_names are pruned from it on each pass, and
# it is then copied to flux.zero_fill_metrics (and to memcache when enabled).
if FLUX_ZERO_FILL_NAMESPACES or flux_aggregate_zerofill_metrics:
    manage_flux_zero_fill_namespaces = False
    flux_zero_fill_metrics = []
    manage_flux_zero_fill_namespaces_redis_key = 'metrics_manager.manage_flux_zero_fill_namespaces'
    try:
        manage_flux_zero_fill_namespaces = self.redis_conn.get(manage_flux_zero_fill_namespaces_redis_key)
    except Exception as e:
        if LOCAL_DEBUG:
            logger.error('error :: metrics_manager :: could not query Redis for metrics_manager.manage_flux_zero_fill_namespaces key: %s' % str(e))
    if not manage_flux_zero_fill_namespaces:
        logger.info('metrics_manager :: managing FLUX_ZERO_FILL_NAMESPACES Redis sets')
        try:
            self.redis_conn.delete('analyzer.flux_zero_fill_metrics')
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to delete analyzer.flux_zero_fill_metrics Redis set')
        # @added 20210407 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
        # Aggregate metrics configured with zero_fill are always included
        if flux_aggregate_zerofill_metrics:
            flux_zero_fill_metrics.extend(flux_aggregate_zerofill_metrics)
            logger.info('metrics_manager :: added %s flux_aggregate_zerofill_metrics to add to Redis sets' % str(len(flux_aggregate_zerofill_metrics)))
        for i_base_name in unique_base_names:
            pattern_match, metric_matched_by = matched_or_regexed_in_list('analyzer', i_base_name, FLUX_ZERO_FILL_NAMESPACES)
            if pattern_match:
                flux_zero_fill_metrics.append(i_base_name)
        if flux_zero_fill_metrics:
            logger.info('metrics_manager :: populating analyzer.flux_zero_fill_metrics Redis set with %s metrics' % str(len(flux_zero_fill_metrics)))
            try:
                self.redis_conn.sadd('analyzer.flux_zero_fill_metrics', *set(flux_zero_fill_metrics))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.flux_zero_fill_metrics Redis set')
        # Gate the next rebuild for RUN_EVERY seconds
        try:
            key_timestamp = int(time())
            self.redis_conn.setex(manage_flux_zero_fill_namespaces_redis_key, RUN_EVERY, key_timestamp)
        except Exception as err:
            logger.error('error :: metrics_manager :: failed to set key %s - %s' % (
                manage_flux_zero_fill_namespaces_redis_key, err))
    logger.info('metrics_manager :: checking if any metrics need to be removed from analyzer.flux_zero_fill_metrics')
    flux_zero_fill_metrics_list = []
    try:
        flux_zero_fill_metrics_list = list(self.redis_conn_decoded.smembers('analyzer.flux_zero_fill_metrics'))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to generate a list from analyzer.flux_zero_fill_metrics Redis set')
    # Set for O(1) membership tests instead of scanning unique_base_names
    known_base_names = set(unique_base_names)
    flux_zero_fill_metrics_to_remove = [
        base_name for base_name in flux_zero_fill_metrics_list
        if base_name not in known_base_names]
    if flux_zero_fill_metrics_to_remove:
        logger.info('metrics_manager :: removing %s metrics from analyzer.flux_zero_fill_metrics' % str(len(flux_zero_fill_metrics_to_remove)))
        try:
            self.redis_conn.srem('analyzer.flux_zero_fill_metrics', *set(flux_zero_fill_metrics_to_remove))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to remove multiple members from the analyzer.flux_zero_fill_metrics Redis set')
        else:
            # Reload the new set
            try:
                flux_zero_fill_metrics_list = list(self.redis_conn_decoded.smembers('analyzer.flux_zero_fill_metrics'))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to generate a list from analyzer.flux_zero_fill_metrics Redis set after removals')
    else:
        logger.info('metrics_manager :: no metrics need to remove from analyzer.flux_zero_fill_metrics')
    if flux_zero_fill_metrics_list:
        # Replace the existing flux.zero_fill_metrics Redis set
        try:
            self.redis_conn.sunionstore('flux.zero_fill_metrics', 'analyzer.flux_zero_fill_metrics')
            logger.info('metrics_manager :: replaced flux.zero_fill_metrics Redis set with the newly created analyzer.flux_zero_fill_metrics set')
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to sunionstore flux.zero_fill_metrics from analyzer.flux_zero_fill_metrics Redis sets')
        # @added 20220428 - Feature #4536: Handle Redis failure
        if settings.MEMCACHE_ENABLED:
            try:
                # Read back the live set. This previously read
                # 'flux.zero_fill_metric' (no trailing s), a key not written by
                # this code, so memcache was always given an empty list.
                flux_zero_fill_metrics_list = list(self.redis_conn_decoded.smembers('flux.zero_fill_metrics'))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to generate a list from flux.zero_fill_metrics Redis set for memcache')
            try:
                success = set_memcache_key(skyline_app, 'flux.zero_fill_metrics', flux_zero_fill_metrics_list)
                if success:
                    logger.info('metrics_manager :: set memcache flux.zero_fill_metrics key')
            except Exception as err:
                logger.error('error :: metrics_manager :: set_memcache_key failed to set flux.zero_fill_metrics - %s' % (
                    err))
# @added 20210407 - Feature #4004: flux - aggregator.py and FLUX_AGGREGATE_NAMESPACES
# Analyzer determines what metrics flux sends the last known value for by
# creating the flux.last_known_value_metrics Redis set, which flux
# references. This is done in Analyzer because it manages metric Redis sets
# as it always runs.
# The analyzer.flux_last_known_value_metrics working set is rebuilt at most
# once per RUN_EVERY seconds (gated by
# metrics_manager.manage_flux_last_known_value_namespaces). Metrics no longer
# in unique_base_names are pruned from it on each pass, and it is then copied
# to flux.last_known_value_metrics (and to memcache when enabled).
if FLUX_LAST_KNOWN_VALUE_NAMESPACES or flux_aggregate_lkv_metrics:
    manage_flux_last_known_value_namespaces = False
    flux_last_known_value_metrics = []
    manage_flux_last_known_value_namespaces_redis_key = 'metrics_manager.manage_flux_last_known_value_namespaces'
    try:
        manage_flux_last_known_value_namespaces = self.redis_conn.get(manage_flux_last_known_value_namespaces_redis_key)
    except Exception as e:
        if LOCAL_DEBUG:
            logger.error('error :: metrics_manager :: could not query Redis for metrics_manager.manage_flux_last_known_value_namespaces key: %s' % str(e))
    if not manage_flux_last_known_value_namespaces:
        logger.info('metrics_manager :: managing FLUX_LAST_KNOWN_VALUE_NAMESPACES Redis sets')
        try:
            self.redis_conn.delete('analyzer.flux_last_known_value_metrics')
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to delete analyzer.flux_last_known_value_metrics Redis set')
        # Aggregate metrics configured with last_known_value are always included
        if flux_aggregate_lkv_metrics:
            flux_last_known_value_metrics.extend(flux_aggregate_lkv_metrics)
            logger.info('metrics_manager :: added %s flux_aggregate_lkv_metrics to add to Redis sets' % str(len(flux_aggregate_lkv_metrics)))
        for i_base_name in unique_base_names:
            pattern_match, metric_matched_by = matched_or_regexed_in_list('analyzer', i_base_name, FLUX_LAST_KNOWN_VALUE_NAMESPACES)
            if pattern_match:
                flux_last_known_value_metrics.append(i_base_name)
        if flux_last_known_value_metrics:
            logger.info('metrics_manager :: populating analyzer.flux_last_known_value_metrics Redis set with %s metrics' % str(len(flux_last_known_value_metrics)))
            try:
                self.redis_conn.sadd('analyzer.flux_last_known_value_metrics', *set(flux_last_known_value_metrics))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.flux_last_known_value_metrics Redis set')
        # Gate the next rebuild for RUN_EVERY seconds
        try:
            key_timestamp = int(time())
            self.redis_conn.setex(manage_flux_last_known_value_namespaces_redis_key, RUN_EVERY, key_timestamp)
        except Exception as err:
            logger.error('error :: metrics_manager :: failed to set key %s - %s' % (
                manage_flux_last_known_value_namespaces_redis_key, err))
    logger.info('metrics_manager :: checking if any metrics need to be removed from analyzer.flux_last_known_value_metrics')
    flux_last_known_value_metrics_list = []
    try:
        flux_last_known_value_metrics_list = list(self.redis_conn_decoded.smembers('analyzer.flux_last_known_value_metrics'))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to generate a list from analyzer.flux_last_known_value_metrics Redis set')
    # Set for O(1) membership tests instead of scanning unique_base_names
    known_base_names = set(unique_base_names)
    flux_last_known_value_metrics_to_remove = [
        base_name for base_name in flux_last_known_value_metrics_list
        if base_name not in known_base_names]
    if flux_last_known_value_metrics_to_remove:
        logger.info('metrics_manager :: removing %s metrics from analyzer.flux_last_known_value_metrics' % str(len(flux_last_known_value_metrics_to_remove)))
        try:
            self.redis_conn.srem('analyzer.flux_last_known_value_metrics', *set(flux_last_known_value_metrics_to_remove))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to remove multiple members from the analyzer.flux_last_known_value_metrics Redis set')
        else:
            # Reload the new set
            try:
                flux_last_known_value_metrics_list = list(self.redis_conn_decoded.smembers('analyzer.flux_last_known_value_metrics'))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to generate a list from analyzer.flux_last_known_value_metrics Redis set after removals')
    else:
        logger.info('metrics_manager :: no metrics need to remove from analyzer.flux_last_known_value_metrics')
    if flux_last_known_value_metrics_list:
        # Replace the existing flux.last_known_value_metrics Redis set
        try:
            self.redis_conn.sunionstore('flux.last_known_value_metrics', 'analyzer.flux_last_known_value_metrics')
            logger.info('metrics_manager :: replaced flux.last_known_value_metrics Redis set with the newly created analyzer.flux_last_known_value_metrics set')
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to sunionstore flux.last_known_value_metrics from analyzer.flux_last_known_value_metrics Redis sets - %s' % e)
        # @added 20220428 - Feature #4536: Handle Redis failure
        if settings.MEMCACHE_ENABLED:
            try:
                flux_last_known_value_metrics_list = list(self.redis_conn_decoded.smembers('flux.last_known_value_metrics'))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to generate a list from flux.last_known_value_metrics Redis set for memcache')
            try:
                success = set_memcache_key(skyline_app, 'flux.last_known_value_metrics', flux_last_known_value_metrics_list)
                if success:
                    logger.info('metrics_manager :: set memcache flux.last_known_value_metrics key')
            except Exception as err:
                logger.error('error :: metrics_manager :: set_memcache_key failed to set flux.last_known_value_metrics - %s' % (
                    err))
# @added 20220427 - Feature #4536: Handle Redis failure
# Add flux required data to memcache as well
flux_namespaces = []
metrics_manager_flux_namespaces_dict = {}
try:
    flux_namespaces = get_flux_namespaces(self)
except Exception as err:
    logger.error('error :: metrics_manager :: get_flux_namespaces failed - %s' % str(err))
# @added 20220210 - Feature #4284: flux - telegraf
# Manage the flux.skipped_metrics.<namespace> and
# flux.not_skipped_metrics.<namespace> Redis hashes (metric -> last updated
# timestamp) for every parent namespace:
# - entries whose timestamp is older than refresh_after seconds (relative to
#   spin_start), missing or unparseable, or contradicted by the
#   metrics_manager skip/do-not-skip sets are deleted so they get refreshed;
# - for namespaces returned by get_flux_namespaces(), metrics in the matching
#   metrics_manager set that are missing from the hash are added with the
#   current timestamp.
for redis_hash in ['flux.skipped_metrics', 'flux.not_skipped_metrics']:
    # contradicting_set: entries of this hash found in it are removed.
    # source_set: metrics that this hash should contain.
    if redis_hash == 'flux.skipped_metrics':
        refresh_after = 3900
        contradicting_set = metrics_manager_do_not_skip_set
        source_set = metrics_manager_skip_set
    else:
        refresh_after = 86700
        contradicting_set = metrics_manager_skip_set
        source_set = metrics_manager_do_not_skip_set
    # @modified 20220211 - Feature #4284: flux - telegraf
    # By parent_namespaces
    for namespace in all_parent_namespaces:
        flux_redis_metrics_dict = {}
        redis_key = '%s.%s' % (redis_hash, namespace)
        try:
            flux_redis_metrics_dict = self.redis_conn_decoded.hgetall(redis_key)
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to hgetall %s Redis hash key - %s' % (
                redis_key, err))
        if flux_redis_metrics_dict:
            removed_entries = 0
            for metric, last_updated_timestamp_str in flux_redis_metrics_dict.items():
                last_updated_timestamp = 0
                try:
                    if last_updated_timestamp_str:
                        last_updated_timestamp = int(float(last_updated_timestamp_str))
                except Exception as err:
                    logger.error('error :: metrics_manager :: failed to determine last_updated_timestamp for %s from %s - %s' % (
                        metric, redis_key, err))
                # @added 20220427 - Feature #4536: Handle Redis failure
                # Remove to refresh when there is no usable timestamp, when it
                # has expired, or when the new metrics_manager sets contradict
                # this entry.
                remove_entry = (
                    not last_updated_timestamp
                    or (int(spin_start) - last_updated_timestamp) >= refresh_after
                    or metric in contradicting_set)
                if remove_entry:
                    try:
                        self.redis_conn_decoded.hdel(redis_key, metric)
                        removed_entries += 1
                    except Exception as err:
                        logger.error(traceback.format_exc())
                        logger.error('error :: metrics_manager :: failed to hdel %s from %s Redis hash key - %s' % (
                            metric, redis_key, err))
            if removed_entries:
                logger.info('metrics_manager :: removing %s metrics from %s Redis hash to be refreshed' % (
                    str(removed_entries), redis_key))
        # @added 20220427 - Feature #4536: Handle Redis failure
        # Add flux required data to memcache as well, compare to
        # new metrics_manager sets. This runs whether or not the hash
        # currently has entries, so an empty hash is (re)populated. Entries
        # removed above are still in flux_redis_metrics_dict and therefore
        # are not re-added in this pass.
        if namespace in flux_namespaces:
            add_entries = [
                metric for metric in source_set
                if metric.startswith(namespace) and metric not in flux_redis_metrics_dict]
            if add_entries:
                current_timestamp = int(time())
                add_entries_dict = {metric: current_timestamp for metric in add_entries}
                try:
                    self.redis_conn_decoded.hset(redis_key, mapping=add_entries_dict)
                    logger.info('metrics_manager :: added %s metrics to %s' % (str(len(add_entries)), redis_key))
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: metrics_manager :: failed to hset mapping for %s Redis hash key - %s' % (
                        redis_key, err))
# @added 20210513 - Feature #4068: ANALYZER_SKIP
# Collect the unique_metrics that match an ANALYZER_SKIP pattern. They are
# added to the new.analyzer.metrics_manager.analyzer_skip staging set, which
# is then copied over analyzer.metrics_manager.analyzer_skip with SUNIONSTORE.
# NOTE(review): the staging set is not cleared in this span, so metrics that
# stop matching ANALYZER_SKIP may persist unless it is deleted elsewhere -
# verify.
analyzer_skip_metrics = []
if ANALYZER_SKIP:
    logger.info('metrics_manager :: determining ANALYZER_SKIP metrics')
    analyzer_skip_metrics = [
        candidate for candidate in unique_metrics
        if matched_or_regexed_in_list('analyzer', candidate, ANALYZER_SKIP)[0]]
if analyzer_skip_metrics:
    logger.info('metrics_manager :: adding %s metrics to analyzer.metrics_manager.analyzer_skip' % str(len(analyzer_skip_metrics)))
    staging_set = 'new.analyzer.metrics_manager.analyzer_skip'
    try:
        self.redis_conn.sadd(staging_set, *set(analyzer_skip_metrics))
        self.redis_conn.sunionstore('analyzer.metrics_manager.analyzer_skip', staging_set)
        logger.info('metrics_manager :: replaced analyzer.metrics_manager.analyzer_skip Redis set with the newly created new.analyzer.metrics_manager.analyzer_skip set')
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to sunionstore analyzer.metrics_manager.analyzer_skip from new.analyzer.metrics_manager.analyzer_skip Redis sets - %s' % e)
del analyzer_skip_metrics
# @added 20210519 - Feature #4076: CUSTOM_STALE_PERIOD
# Determine which unique_base_names have a stale period, from
# custom_stale_period() (which is also given external_settings), that differs
# from settings.STALE_PERIOD. Keep the
# analyzer.metrics_manager.custom_stale_periods Redis hash (metric -> stale
# period) in sync with them.
custom_stale_periods = []
custom_stale_period_metrics = []
default_metric_stale_period = int(settings.STALE_PERIOD)
if not CUSTOM_STALE_PERIOD:
    logger.info('metrics_manager :: no CUSTOM_STALE_PERIOD metrics defined')
else:
    logger.info('metrics_manager :: determining known CUSTOM_STALE_PERIOD metrics from analyzer.metrics_manager.custom_stale_period')
    custom_stale_metrics_hash_key = 'analyzer.metrics_manager.custom_stale_periods'
    known_custom_stale_metrics_dict = {}
    try:
        known_custom_stale_metrics_dict = self.redis_conn_decoded.hgetall(custom_stale_metrics_hash_key)
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to create known_custom_stale_metrics_dict from Redis hash key %s - %s' % (
            custom_stale_metrics_hash_key, e))
    known_custom_stale_metrics = list(known_custom_stale_metrics_dict.keys()) if known_custom_stale_metrics_dict else []
    logger.info('metrics_manager :: determined %s known CUSTOM_STALE_PERIOD metrics from analyzer.metrics_manager.custom_stale_period' % str(len(known_custom_stale_metrics)))
    logger.info('metrics_manager :: determining unique_base_names matching CUSTOM_STALE_PERIOD')
    # @added 20210601 - Feature #4000: EXTERNAL_SETTINGS
    # Pass the external_settings as a custom_stale_period argument
    if not external_settings:
        try:
            # debug
            # external_settings = get_external_settings(skyline_app)
            external_settings = get_external_settings(skyline_app, None, True)
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: get_external_settings failed - %s' % (
                e))
    # Set mirror of custom_stale_period_metrics for O(1) membership tests
    custom_stale_period_metrics_set = set()
    for i_metric in unique_base_names:
        if i_metric in custom_stale_period_metrics_set:
            continue
        metric_stale_period = int(default_metric_stale_period)
        try:
            # @modified 20210601 - Feature #4000: EXTERNAL_SETTINGS
            # Pass the external_settings as a custom_stale_period argument
            metric_stale_period = custom_stale_period(skyline_app, i_metric, external_settings, log=False)
        except Exception as e:
            logger.error('error :: failed running custom_stale_period - %s' % e)
            metric_stale_period = int(default_metric_stale_period)
        if int(float(metric_stale_period)) != default_metric_stale_period:
            custom_stale_periods.append([i_metric, int(float(metric_stale_period))])
            custom_stale_period_metrics.append(i_metric)
            custom_stale_period_metrics_set.add(i_metric)
    logger.info('metrics_manager :: determined %s custom_stale_period metrics' % (
        str(len(custom_stale_periods))))
    # The hash is only updated or pruned when at least one custom stale period
    # was determined, so an empty unique_base_names does not empty it.
    if custom_stale_periods:
        metrics_unchanged = 0
        metrics_added_to_hash_key = 0
        metrics_updated_in_hash_key = 0
        metrics_removed_from_hash_key = 0
        logger.info('metrics_manager :: managing %s metrics in analyzer.metrics_manager.custom_stale_period hash key' % (
            str(len(custom_stale_periods))))
        for i_metric, i_stale_period in custom_stale_periods:
            if i_metric in known_custom_stale_metrics_dict:
                if int(float(i_stale_period)) == int(float(known_custom_stale_metrics_dict[i_metric])):
                    metrics_unchanged += 1
                    continue
                to_add_to_hash_key = False
            else:
                to_add_to_hash_key = True
            try:
                self.redis_conn.hset(
                    custom_stale_metrics_hash_key,
                    i_metric, int(float(i_stale_period)))
                if to_add_to_hash_key:
                    metrics_added_to_hash_key += 1
                else:
                    metrics_updated_in_hash_key += 1
            except Exception as e:
                logger.error(traceback.format_exc())
                # Previously referenced the undefined name metric_name and the
                # wrong hash key (metrics_data_sparsity).
                logger.error('error :: metrics_manager :: failed to set %s in the Redis hash key %s - %s' % (
                    str(i_metric), custom_stale_metrics_hash_key, e))
        metrics_to_remove_from_custom_stale_period_hash = [
            known_metric for known_metric in known_custom_stale_metrics
            if known_metric not in custom_stale_period_metrics_set]
        if metrics_to_remove_from_custom_stale_period_hash:
            try:
                self.redis_conn.hdel(custom_stale_metrics_hash_key, *set(metrics_to_remove_from_custom_stale_period_hash))
                metrics_removed_from_hash_key = len(set(metrics_to_remove_from_custom_stale_period_hash))
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to remove %s metrics from the Redis hash key %s - %s' % (
                    str(len(metrics_to_remove_from_custom_stale_period_hash)),
                    custom_stale_metrics_hash_key, e))
        logger.info('metrics_manager :: managed analyzer.metrics_manager.custom_stale_period - unchanged: %s, added: %s, updated: %s, removed: %s' % (
            str(metrics_unchanged), str(metrics_added_to_hash_key),
            str(metrics_updated_in_hash_key),
            str(metrics_removed_from_hash_key)))
# @added 20210519 - Feature #4076: CUSTOM_STALE_PERIOD
#                   Branch #1444: thunder
# Prune entries older than FULL_DURATION (relative to now) out of the
# analyzer.metrics.last_timeseries_timestamp Redis hash key via
# prune_metrics_timestamp_hash_key(), and log how many were removed. On
# failure the count is logged as None.
prune_hash_key = 'analyzer.metrics.last_timeseries_timestamp'
logger.info('metrics_manager :: pruning analyzer.metrics.last_timeseries_timestamp Redis hash key')
older_than_timestamp = int(time()) - settings.FULL_DURATION
try:
    removed_from_hash = prune_metrics_timestamp_hash_key(
        skyline_app, prune_hash_key, older_than_timestamp, True)
except Exception as e:
    removed_from_hash = None
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: prune_metrics_timestamp_hash_key failed - %s' % e)
logger.info('metrics_manager :: removed %s entries from %s hash key' % (
    str(removed_from_hash), prune_hash_key))
# @added 20220419 - Feature #4528: metrics_manager - derivative_metric_check
#                   Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Run derivative_metrics_check() and log how many long-term non-derivative
# metrics it identified; the list itself is not kept (deleted straight away).
try:
    longterm_non_derivative_metrics = derivative_metrics_check(self)
except Exception as err:
    longterm_non_derivative_metrics = []
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: derivative_metrics_check failed - %s' % err)
logger.info(
    'metrics_manager :: identified %s longterm_non_derivative_metrics'
    % str(len(longterm_non_derivative_metrics)))
del longterm_non_derivative_metrics
# @added 20220410 - Task #4514: Integrate opentelemetry
#                   Feature #4516: flux - opentelemetry traces
# Run do_not_alert_on_stale_metrics() over unique_base_names with the current
# external_settings. Its return value is discarded (deleted straight away).
# Any error is logged and does not stop the run.
logger.info('metrics_manager :: determining do_not_alert_on_stale_metrics')
try:
    do_not_alert_on_stale_metrics_list = do_not_alert_on_stale_metrics(
        self, external_settings, unique_base_names)
except Exception as err:
    do_not_alert_on_stale_metrics_list = []
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: do_not_alert_on_stale_metrics failed - %s' % (
        err))
del do_not_alert_on_stale_metrics_list
# @added 20210518 - Branch #1444: thunder
# Determine stale metrics AND custom stale metrics per parent namespace.
# If any are present send them to thunder.
# thunder_stale_metrics() returns a (stale, recovered) pair of dicts
# (presumably keyed by parent namespace). Each non-empty one is stored as
# str(dict) in its Redis key. The thunder alerted-on-stale-metrics hash key
# is then managed.
namespace_stale_metrics_dict = {}
namespace_recovered_metrics_dict = {}
try:
    namespace_stale_metrics_dict, namespace_recovered_metrics_dict = thunder_stale_metrics(skyline_app, log=True)
except Exception as e:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: thunder_stale_metrics failed to get namespace_stale_metrics_dict - %s' % (
        e))
logger.info('metrics_manager :: %s namespaces checked for stale metrics discovered with thunder_stale_metrics' % (
    str(len(namespace_stale_metrics_dict))))
if namespace_stale_metrics_dict:
    try:
        self.redis_conn.set('analyzer.metrics_manager.namespaces.stale_metrics', str(namespace_stale_metrics_dict))
        logger.info('metrics_manager :: set analyzer.metrics_manager.namespaces.stale_metrics Redis key')
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to set analyzer.metrics_manager.namespaces.stale_metrics Redis key - %s' % e)
# Thunder stale_metrics alerts on recovered metrics
if namespace_recovered_metrics_dict:
    try:
        self.redis_conn.set('analyzer.metrics_manager.namespaces.recovered.stale_metrics', str(namespace_recovered_metrics_dict))
        logger.info('metrics_manager :: set analyzer.metrics_manager.namespaces.recovered.stale_metrics Redis key')
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to set analyzer.metrics_manager.namespaces.recovered.stale_metrics Redis key - %s' % e)
logger.info('metrics_manager :: managing thunder alerted on stale metrics hash key')
removed_from_hash = None
try:
    removed_from_hash = manage_thunder_alerted_on_stale_metrics_hash_key(skyline_app, True)
except Exception as e:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: manage_thunder_alerted_on_stale_metrics_hash_key failed - %s' % e)
logger.info('metrics_manager :: removed %s from thunder alerted on stale metrics hash key' % str(removed_from_hash))
# @added 20210518 - Branch #1444: thunder
# Determine no data per parent namespace and if any are found send them
# to thunder
namespaces_no_data_dict = {}
try:
namespaces_no_data_dict = thunder_no_data(skyline_app, log=True)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: thunder_no_data failed - %s' % (
e))
if namespaces_no_data_dict:
logger.info('metrics_manager :: %s namespaces discovered not receiving data with thunder_no_data' % (
str(len(namespaces_no_data_dict))))
else:
logger.info('metrics_manager :: thunder_no_data - all namespaces receiving data OK')
# @added 20220110 - Bug #4364: Prune old thunder.events
# Branch #1444: thunder
# Any thunder events that are not subsequently acted on remain in the
# thunder.events set.  Remove entries older than 3 days.
# Each set member is parsed with literal_eval and is expected to be a
# str()-serialised dict with 'timestamp' and 'expiry' keys.  A member is
# removed when any of these holds:
#   - no usable timestamp could be read from it;
#   - its timestamp is more than 3 days (86400 * 3 seconds) before spin_start;
#   - it has a non-zero expiry and spin_start is past timestamp + expiry.
# Members are removed one at a time via update_redis_set.
thunder_events_list = []
try:
thunder_events_list = list(self.redis_conn_decoded.smembers('thunder.events'))
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: smembers thunder.events falied - %s' % (
e))
if thunder_events_list:
pruned_events = 0
oldest_event_timestamp = (int(spin_start) - (86400 * 3))
thunder_events_list_len = len(thunder_events_list)
logger.info('metrics_manager :: checking %s thunder.events items for pruning' % (
str(thunder_events_list_len)))
for event_item in thunder_events_list:
remove_item = False
event_timestamp = 0
event_expiry = 0
# NOTE(review): an item without an 'expiry' key raises KeyError here after
# event_timestamp has already been set, so it logs an error (with traceback)
# and is then judged on its timestamp alone.
try:
event_data = literal_eval(event_item)
event_timestamp = int(float(event_data['timestamp']))
event_expiry = int(float(event_data['expiry']))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: thunder.events determining event.timestamp failed - %s' % (
err))
if not event_timestamp:
remove_item = True
if event_timestamp < oldest_event_timestamp:
remove_item = True
if event_expiry:
if int(spin_start) > event_timestamp + event_expiry:
remove_item = True
if remove_item:
# Delete the item from the Redis set
try:
update_redis_set(
skyline_app, 'thunder.events', event_item,
'remove', False)
pruned_events += 1
except Exception as err:
logger.error('error :: could not remove item from Redis set thunder.events - %s' % (
err))
logger.info('metrics_manager :: pruned %s old thunder.events items' % (
str(pruned_events)))
# @added 20210720 - Feature #4188: metrics_manager.boundary_metrics
# Mirror settings.BOUNDARY_METRICS into the metrics_manager.boundary_metrics
# Redis hash: one field per matching base_name, whose value is the str() of
# a dict keyed by algorithm.  Each BOUNDARY_METRICS tuple is read by
# position as:
#   (pattern, algorithm, expiry, min_average, min_average_seconds,
#    trigger_value, alert_threshold, alert_vias)
if settings.BOUNDARY_METRICS:
    # Build boundary metrics
    boundary_metrics_hash_key = 'metrics_manager.boundary_metrics'
    boundary_metrics = {}
    for base_name in unique_base_names:
        try:
            for metric in settings.BOUNDARY_METRICS:
                pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, [metric[0]])
                if pattern_match:
                    algorithm = metric[1]
                    # A metric can match more than one BOUNDARY_METRICS
                    # tuple (e.g. one tuple per algorithm).  Create its
                    # dict once and add one entry per algorithm.
                    # Previously the dict was reset on every match, so only
                    # the last matching algorithm survived.  The right-hand
                    # side is fully built before the assignment, so a
                    # malformed tuple raises without leaving a
                    # half-populated entry behind.
                    boundary_metrics.setdefault(base_name, {})[algorithm] = {
                        'expiry': metric[2],
                        'min_average': metric[3],
                        'min_average_seconds': metric[4],
                        'trigger_value': metric[5],
                        'alert_threshold': metric[6],
                        'alert_vias': metric[7],
                    }
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to determine boundary_metrics - %s' % e)
    boundary_metrics_redis_dict = {}
    boundary_metrics_base_names = []
    logger.info('metrics_manager :: %s boundary_metrics identified from the %s unique_base_names' % (
        str(len(boundary_metrics)), str(len(unique_base_names))))
    boundary_metrics_keys_updated = 0
    if boundary_metrics:
        boundary_metrics_base_names = list(boundary_metrics.keys())
        # Snapshot the current hash so that fields no longer matched by
        # BOUNDARY_METRICS can be removed below.
        try:
            boundary_metrics_redis_dict = self.redis_conn_decoded.hgetall(boundary_metrics_hash_key)
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: metrics_manager :: failed to get Redis hash key %s - %s' % (
                boundary_metrics_hash_key, e))
        for base_name in boundary_metrics_base_names:
            try:
                self.redis_conn.hset(
                    boundary_metrics_hash_key, base_name,
                    str(boundary_metrics[base_name]))
                boundary_metrics_keys_updated += 1
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to add entry to %s for - %s - %s' % (
                    boundary_metrics_hash_key, base_name, e))
    # Remove entries not defined in BOUNDARY_METRICS.  Membership is tested
    # against the boundary_metrics dict (same keys as
    # boundary_metrics_base_names) for O(1) lookups.
    # NOTE(review): when no base_name matches at all, boundary_metrics is
    # empty and any existing fields in the hash are left in place.
    boundary_metrics_keys_removed = 0
    if boundary_metrics and boundary_metrics_redis_dict:
        for base_name in list(boundary_metrics_redis_dict.keys()):
            if base_name not in boundary_metrics:
                try:
                    self.redis_conn.hdel(boundary_metrics_hash_key, base_name)
                    boundary_metrics_keys_removed += 1
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: metrics_manager :: failed to delete entry in %s for - %s - %s' % (
                        boundary_metrics_hash_key, base_name, e))
    if boundary_metrics:
        logger.info('metrics_manager :: updated: %s, removed: %s entries in the Redis hash key %s' % (
            str(boundary_metrics_keys_updated),
            str(boundary_metrics_keys_removed), boundary_metrics_hash_key))
# @added 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Create the metrics_manager.last_known_value_metrics Redis set
# The helper's return value is discarded immediately; the Redis set is
# presumably written inside last_known_value_metrics - confirm there.
last_known_value_metrics_list = []
logger.info('metrics_manager :: determining last_known_value_metrics')
try:
last_known_value_metrics_list = last_known_value_metrics(self, external_settings, unique_base_names)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: last_known_value_metrics failed - %s' % (
err))
del last_known_value_metrics_list
# @added 20220406 - Feature #4520: settings - ZERO_FILL_NAMESPACES
# Create the metrics_manager.zero_fill_metrics Redis set
# As above: only the helper's side effects matter; the list is discarded.
zero_fill_metrics_list = []
logger.info('metrics_manager :: determining zero_fill_metrics')
try:
zero_fill_metrics_list = zero_fill_metrics(self, external_settings, unique_base_names)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: zero_fill_metrics failed - %s' % (
err))
del zero_fill_metrics_list
# @added 20220414 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Call non_derivative_metrics; its result is discarded here.
non_derivative_metrics_list = []
try:
non_derivative_metrics_list = non_derivative_metrics(self, external_settings, unique_base_names)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: non_derivative_metrics failed - %s' % (
err))
del non_derivative_metrics_list
# @added 20220421 - Task #3800: Handle feedback metrics in Mirage and waterfall alerts
# Create a Redis hash of filesafe_name to base_name
# (built by the filesafe_names helper; its return value is discarded here)
filesafe_names_dict = {}
try:
filesafe_names_dict = filesafe_names(self, unique_base_names)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: filesafe_names failed - %s' % (
err))
del filesafe_names_dict
# @added 20220420 - Feature #4530: namespace.analysed_events
# namespace_analysed_events returns a status that is only logged.
namespace_analysed_events_managed = False
try:
namespace_analysed_events_managed = namespace_analysed_events(self)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: namespace_analysed_events failed - %s' % (
err))
logger.info('metrics_manager :: metrics_manager.namespace.analysed_events managed: %s' % (
str(namespace_analysed_events_managed)))
# unique_base_names is unbound in this function from here on.
del unique_base_names
# @added 20201030 - Feature #3808: ANALYZER_DYNAMICALLY_ANALYZE_LOW_PRIORITY_METRICS
# Remove any entries in the Redis low_priority_metrics_hash_key
# that are not in unique_metrics
# Steps: read the hash (field -> last analyzed value), take the set
# difference of its fields against unique_metrics, and hdel the leftovers
# in one call.  Edge cases visible below:
#   - if any hash value fails int() conversion, the whole list is reset to
#     [] and nothing is pruned this run;
#   - if unique_metrics is empty, unique_metrics_set is falsy and nothing is
#     pruned.
# The del statements drop intermediate collections once they are no longer
# needed.
if ANALYZER_MANAGE_LOW_PRIORITY_METRICS:
logger.info('metrics_manager :: managing the Redis hash key %s and removing any metrics not in unique_metrics' % (
low_priority_metrics_hash_key))
low_priority_metrics_last_analyzed = []
raw_low_priority_metrics_last_analyzed = {}
try:
raw_low_priority_metrics_last_analyzed = self.redis_conn_decoded.hgetall(low_priority_metrics_hash_key)
for base_name_bytes in raw_low_priority_metrics_last_analyzed:
base_name = str(base_name_bytes)
last_analyzed = int(raw_low_priority_metrics_last_analyzed[base_name])
low_priority_metrics_last_analyzed.append([base_name, last_analyzed])
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get Redis hash key %s' % (
low_priority_metrics_hash_key))
low_priority_metrics_last_analyzed = []
del raw_low_priority_metrics_last_analyzed
low_priority_analyzed_metrics = []
if low_priority_metrics_last_analyzed:
try:
low_priority_analyzed_metrics = [item[0] for item in low_priority_metrics_last_analyzed]
logger.info('metrics_manager :: there are %s metrics in the Redis hash key %s' % (
str(len(low_priority_analyzed_metrics)),
low_priority_metrics_hash_key))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate low_priority_metrics_last_analyzed')
low_priority_analyzed_metrics = []
try:
del low_priority_metrics_last_analyzed
except:
pass
if low_priority_analyzed_metrics:
low_priority_analyzed_metrics_set = None
try:
low_priority_analyzed_metrics_set = set(low_priority_analyzed_metrics)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate low_priority_analyzed_metrics_set')
try:
del low_priority_analyzed_metrics
except:
pass
unique_metrics_set = None
try:
unique_metrics_list = list(unique_metrics)
unique_metrics_set = set(unique_metrics_list)
del unique_metrics_list
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to generate unique_metrics_set')
# Fields present in the hash but no longer in unique_metrics are removed.
if low_priority_analyzed_metrics_set and unique_metrics_set:
low_priority_metrics_to_remove = []
try:
set_difference = low_priority_analyzed_metrics_set.difference(unique_metrics_set)
for metric in set_difference:
low_priority_metrics_to_remove.append(metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: determining difference between low_priority_analyzed_metrics_set and unique_metrics_set')
try:
del low_priority_analyzed_metrics_set
except:
pass
try:
del unique_metrics_set
except:
pass
# set_difference is unbound if the difference computation above failed,
# hence the guarded del.
try:
del set_difference
except:
pass
if low_priority_metrics_to_remove:
try:
logger.info('metrics_manager :: removing %s metrics from the Redis hash key %s' % (
str(len(low_priority_metrics_to_remove)),
low_priority_metrics_hash_key))
self.redis_conn.hdel(low_priority_metrics_hash_key, *set(low_priority_metrics_to_remove))
logger.info('metrics_manager :: removed %s metrics from the Redis hash key %s' % (
str(len(low_priority_metrics_to_remove)),
low_priority_metrics_hash_key))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to remove the low_priority_metrics_to_remove the Redis hash key %s' % (
low_priority_metrics_hash_key))
try:
del low_priority_metrics_to_remove
except:
pass
# NOTE(review): this else presumably pairs with
# `if low_priority_metrics_to_remove:` - original indentation is lost here;
# confirm.
else:
logger.info('metrics_manager :: no metrics need to be removed from the Redis hash key %s' % (
low_priority_metrics_hash_key))
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Surface the Redis hash key data
# Load the known per-metric resolutions (base_name -> resolution) so the
# sparsity loop below only writes entries that are new or have changed.
# A failed read falls back to {}.
added_metric_resolution_keys = 0
updated_metric_resolution_keys = 0
metrics_resolutions_hash_key = 'analyzer.metrics_manager.resolutions'
metrics_resolutions_dict = {}
try:
metrics_resolutions_dict = self.redis_conn_decoded.hgetall(metrics_resolutions_hash_key)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get %s Redis hash key - %s' % (
metrics_resolutions_hash_key, e))
metrics_resolutions_dict = {}
# Set check_metrics before CHECK_DATA_SPARSITY
# (so check_metrics is defined, as every unique metric, even when
# CHECK_DATA_SPARSITY is disabled)
check_metrics = list(unique_metrics)
# @added 20210621 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Collects base_names whose resolution could not be determined.
metrics_with_unknown_resolution = []
# @added 20201209 - Feature #3870: metrics_manager - check_data_sparsity
# Start of the data-sparsity check.  First split unique_metrics into
# check_metrics and do_not_check_metrics.  A metric goes into
# do_not_check_metrics, as a [metric_name, metric_matched_by] pair, when its
# base_name matches a namespace in SKIP_CHECK_DATA_SPARSITY_NAMESPACES.
# Only the first error of each kind is logged, so the log is not flooded.
if CHECK_DATA_SPARSITY:
check_data_sparsity_start = time()
logger.info('metrics_manager :: checking data sparsity')
# @added 20201210 - Feature #3870: metrics_manager - check_data_sparsity
# Allow SKIP_CHECK_DATA_SPARSITY_NAMESPACES
check_metrics = list(unique_metrics)
do_not_check_metrics = []
if SKIP_CHECK_DATA_SPARSITY_NAMESPACES:
logger.info('metrics_manager :: determing which metric to skip checking data sparsity on as SKIP_CHECK_DATA_SPARSITY_NAMESPACES has %s namespaces declared' % str(len(SKIP_CHECK_DATA_SPARSITY_NAMESPACES)))
check_metrics = []
skip_check_data_sparsity_error_logged = False
try:
for metric in unique_metrics:
try:
metric_name = str(metric)
if metric_name.startswith(settings.FULL_NAMESPACE):
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric_name
pattern_match = None
try:
pattern_match, metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, SKIP_CHECK_DATA_SPARSITY_NAMESPACES)
except Exception as e:
if not skip_check_data_sparsity_error_logged:
logger.error('error :: metrics_manager :: an error occurred while matched_or_regexed_in_list in SKIP_CHECK_DATA_SPARSITY_NAMESPACES - %s' % e)
skip_check_data_sparsity_error_logged = True
pattern_match = False
if pattern_match:
do_not_check_metrics.append([metric_name, metric_matched_by])
else:
check_metrics.append(metric_name)
except Exception as e:
# Only log one error as to not fill the log
if not skip_check_data_sparsity_error_logged:
logger.error('error :: metrics_manager :: an error occurred while evaluating %s for check_metrics in SKIP_CHECK_DATA_SPARSITY_NAMESPACES - %s' % (str(metric), e))
skip_check_data_sparsity_error_logged = True
else:
pass
except Exception as e:
logger.error('error :: metrics_manager :: an error occurred while determining check_metrics in SKIP_CHECK_DATA_SPARSITY_NAMESPACES - %s' % e)
# Record the skipped metrics, str()-serialised, in a Redis key.
if do_not_check_metrics:
try:
logger.info('metrics_manager :: excluding %s metrics from check_data_sparsity as they match a namespace in SKIP_CHECK_DATA_SPARSITY_NAMESPACES' % str(len(do_not_check_metrics)))
try:
self.redis_conn.set(
'analyzer.metrics_manager.metrics_sparsity.skip_check_data_sparsity_metrics', str(do_not_check_metrics))
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.metrics_sparsity.skip_check_data_sparsity_metrics: %s' % e)
except Exception as e:
logger.error('error :: metrics_manager :: an error occurred while setting Redis analyzer.metrics_manager.metrics_sparsity.skip_check_data_sparsity_metrics - %s' % e)
# Multi get series
# raw_assigned is index-aligned with check_metrics: the loop below reads
# raw_assigned[i] for check_metrics[i].
try:
# @modified 20201210 - Feature #3870: metrics_manager - check_data_sparsity
# Allow SKIP_CHECK_DATA_SPARSITY_NAMESPACES
# raw_assigned = self.redis_conn.mget(unique_metrics)
raw_assigned = self.redis_conn.mget(check_metrics)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get check_metrics from Redis')
raw_assigned = []
# NOTE(review): this logs an 'error ::' message at info level and says
# "returning", but nothing returns here - the else branch is simply skipped.
if not raw_assigned:
logger.info('error :: metrics_manager :: No raw_assigned set, returning')
else:
logger.info('metrics_manager :: checking data sparsity on %s metric timeseries from Redis' % str(len(raw_assigned)))
# Keep the previous run's per-metric sparsity for comparison, then delete
# the hash so that it is rebuilt from scratch during this run.
last_metrics_data_sparsity = {}
try:
last_metrics_data_sparsity = self.redis_conn_decoded.hgetall('analyzer.metrics_manager.hash_key.metrics_data_sparsity')
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get analyzer.metrics_manager.hash_key.metrics_data_sparsity Redis hash key')
last_metrics_data_sparsity = {}
try:
self.redis_conn.delete('analyzer.metrics_manager.hash_key.metrics_data_sparsity')
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to delete analyzer.metrics_manager.hash_key.metrics_data_sparsity')
# @added 20210617 - Feature #3870: metrics_manager - check_data_sparsity
# Branch ##1444: thunder
# Previously stored str(dict) of timestamp-resolution counts per metric,
# used to write only changed entries.
metrics_timestamp_resolutions_count_dict = {}
try:
metrics_timestamp_resolutions_count_dict = self.redis_conn_decoded.hgetall('analyzer.metrics_manager.hash_key.metrics_timestamp_resolutions')
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get analyzer.metrics_manager.hash_key.metrics_timestamp_resolutions Redis hash key')
metrics_timestamp_resolutions_count_dict = {}
# Per-run accumulators filled by the loop below.
check_sparsity_error_log = False
check_sparsity_error_count = 0
added_data_sparsity_keys = 0
metrics_sparsity = []
metrics_fully_populated = [] # 100% of datapoints
metrics_sparsity_decreasing = [] # becoming more densely populated (good)
metrics_sparsity_increasing = [] # becoming more sparsely populated (bad)
metrics_stale = []
metrics_inactive = []
sparsities = []
metrics_sparsity_use_namespace = skyline_app_graphite_namespace + '.metrics_sparsity'
# Distill timeseries strings into lists
# For every metric in check_metrics:
#   1. msgpack-decode its Redis series (a decode failure gives []);
#   2. derive its resolution and timestamp-resolution counts with
#      determine_data_frequency, and refresh the
#      metrics_timestamp_resolutions hash when the entry is missing or has
#      changed;
#   3. store the resolution in metrics_resolutions_hash_key when it is known
#      (>= 1) and differs from the stored value (counted as added or
#      updated); unknown resolutions go to metrics_with_unknown_resolution;
#   4. compute data_sparsity, store it, and classify the metric as fully
#      populated / sparsity decreasing / sparsity increasing compared with
#      the previous run's value.
# Errors: the first one is logged in detail (check_sparsity_error_log); the
# others are counted in check_sparsity_error_count where an increment is
# present.
# @modified 20201210 - Feature #3870: metrics_manager - check_data_sparsity
# Allow SKIP_CHECK_DATA_SPARSITY_NAMESPACES
# for i, metric_name in enumerate(unique_metrics):
for check_metric_index, metric_name in enumerate(check_metrics):
try:
try:
raw_series = raw_assigned[check_metric_index]
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
except:
timeseries = []
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Populate Redis hash key data with the base_name
metric_name = str(metric_name)
if metric_name.startswith(settings.FULL_NAMESPACE):
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric_name
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Use determine_data_frequency function
metric_resolution = 0
timestamp_resolutions_count = {}
try:
metric_resolution, timestamp_resolutions_count = determine_data_frequency(skyline_app, timeseries, False)
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: determine_data_frequency failed for %s - %s' % (
base_name, e))
check_sparsity_error_log = True
check_sparsity_error_count += 1
# Refresh the timestamp-resolutions entry when it is missing or its str()
# differs from the freshly computed counts.
update_metric_timestamp_resolutions_key = False
metric_timestamp_resolutions_count_str = None
try:
metric_timestamp_resolutions_count_str = metrics_timestamp_resolutions_count_dict[metric_name]
except KeyError:
metric_timestamp_resolutions_count_str = None
update_metric_timestamp_resolutions_key = True
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get entry from metrics_timestamp_resolutions_count_dict for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
metric_timestamp_resolutions_count_str = None
update_metric_timestamp_resolutions_key = True
if metric_timestamp_resolutions_count_str:
if metric_timestamp_resolutions_count_str != str(dict(timestamp_resolutions_count)):
update_metric_timestamp_resolutions_key = True
if update_metric_timestamp_resolutions_key:
try:
self.redis_conn.hset(
'analyzer.metrics_manager.hash_key.metrics_timestamp_resolutions',
metric_name, str(dict(timestamp_resolutions_count)))
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.hash_key.metrics_timestamp_resolutions for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Feature #4144: webapp - stale_metrics API endpoint
# Feature #4076: CUSTOM_STALE_PERIOD
# Decide whether the stored resolution for base_name must be written: skip
# when it equals the computed resolution, when the resolution is unknown (0)
# or when it is < 1.
update_metrics_resolutions_key = True
last_known_metric_resolution = None
if metric_resolution and metrics_resolutions_dict:
try:
last_known_metric_resolution = metrics_resolutions_dict[base_name]
except KeyError:
update_metrics_resolutions_key = True
last_known_metric_resolution = None
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: update_metrics_resolutions_key - failed to determine metric resolution for %s - %s' % (
base_name, e))
check_sparsity_error_log = True
check_sparsity_error_count += 1
last_known_metric_resolution = None
if last_known_metric_resolution:
if int(float(last_known_metric_resolution)) == metric_resolution:
update_metrics_resolutions_key = False
# @added 20210621 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
if not metric_resolution:
if update_metrics_resolutions_key:
update_metrics_resolutions_key = False
metrics_with_unknown_resolution.append(base_name)
# @added 20210702 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Never set the metric_resolution to a negative int
if update_metrics_resolutions_key:
if int(metric_resolution) < 1:
logger.info('metrics_manager :: update_metrics_resolutions_key - not updating %s resolution from %s to %s (invalid resolution)' % (
base_name, str(last_known_metric_resolution),
str(metric_resolution)))
update_metrics_resolutions_key = False
if update_metrics_resolutions_key:
if last_known_metric_resolution is not None:
logger.info('metrics_manager :: update_metrics_resolutions_key - updating %s resolution from %s to %s' % (
base_name, str(last_known_metric_resolution),
str(metric_resolution)))
updated_metric_resolution_keys += 1
else:
added_metric_resolution_keys += 1
# NOTE(review): this hset writes metrics_resolutions_hash_key, but the
# error message below names analyzer.metrics_manager.hash_key.metrics_data_sparsity
# (copy/paste) - the message should be corrected.
try:
self.redis_conn.hset(
metrics_resolutions_hash_key,
base_name, int(metric_resolution))
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.hash_key.metrics_data_sparsity for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
check_sparsity_error_count += 1
# @added 20210702 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Explicitly set data_sparsity
# data_sparsity stays None when the resolution is unknown.  It is compared
# against settings.FULLY_POPULATED_PERCENTAGE, i.e. treated as a percentage
# where higher means more densely populated.
data_sparsity = None
if metric_resolution:
try:
data_sparsity = determine_data_sparsity(skyline_app, timeseries, resolution=metric_resolution, log=False)
except Exception as e:
if not check_sparsity_error_log:
logger.error('error :: metrics_manager :: determine_data_sparsity failed during check_data_sparsity - %s' % str(e))
check_sparsity_error_log = True
check_sparsity_error_count += 1
# previous_sparsity defaults to 0 when the previous run had no entry.
previous_sparsity = 0
if last_metrics_data_sparsity:
try:
previous_sparsity = float(last_metrics_data_sparsity[metric_name])
except:
pass
if data_sparsity or data_sparsity == 0:
try:
self.redis_conn.hset(
'analyzer.metrics_manager.hash_key.metrics_data_sparsity',
metric_name, data_sparsity)
added_data_sparsity_keys += 1
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.hash_key.metrics_data_sparsity for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
check_sparsity_error_count += 1
# Classify against the previous run.  sparsity_change is
# previous - current, so it is negative when the metric became denser.
sparsity_change = 0
# if data_sparsity >= 100:
if data_sparsity >= settings.FULLY_POPULATED_PERCENTAGE:
metrics_fully_populated.append(metric_name)
else:
if previous_sparsity < data_sparsity:
metrics_sparsity_decreasing.append(metric_name)
sparsity_change = previous_sparsity - data_sparsity
if previous_sparsity > data_sparsity:
metrics_sparsity_increasing.append(metric_name)
sparsity_change = previous_sparsity - data_sparsity
metric_sparsity_dict = {
'metric': metric_name,
'timestamp': int(timeseries[-1][0]),
'resolution': metric_resolution,
'data_sparsity': data_sparsity,
'last_data_sparsity': previous_sparsity,
'change': sparsity_change,
}
metrics_sparsity.append(str(metric_sparsity_dict))
sparsities.append(data_sparsity)
except Exception as e:
if not check_sparsity_error_log:
logger.error('error :: metrics_manager :: an error occurred during check_data_sparsity - %s' % str(e))
check_sparsity_error_log = True
check_sparsity_error_count += 1
# Publish the three sparsity categories.  For each non-empty list:
#   - rename the previous Redis set to its aet.* key;
#   - sadd this run's members to the live set;
#   - store the member count in a metrics_sparsity.* key;
#   - send the count to Graphite.
# When a list is empty this run, none of its keys are touched, so the
# previous run's set and count remain.
# NOTE(review): if the rename fails (e.g. no previous set exists - confirm
# redis-py behaviour) it is only logged, and the sadd then adds to whatever
# is still at the live key.
logger.info('metrics_manager :: check_data_sparsity added %s metrics of %s total metrics to analyzer.metrics_manager.hash_key.metrics_data_sparsity Redis hash key' % (
str(added_data_sparsity_keys), str(len(unique_metrics))))
if metrics_fully_populated:
metrics_fully_populated_count = len(metrics_fully_populated)
try:
self.redis_conn.rename('analyzer.metrics_manager.metrics_fully_populated', 'aet.analyzer.metrics_manager.metrics_fully_populated')
logger.info('metrics_manager :: created the aet.analyzer.metrics_manager.metrics_fully_populated Redis set')
except:
logger.error('metrics_manager :: failed to created the aet.analyzer.metrics_manager.metrics_fully_populated Redis set')
try:
self.redis_conn.sadd('analyzer.metrics_manager.metrics_fully_populated', *set(metrics_fully_populated))
logger.info('metrics_manager :: created and added %s metrics to the analyzer.metrics_manager.metrics_fully_populated Redis set' % str(metrics_fully_populated_count))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.metrics_manager.metrics_fully_populated Redis set')
try:
self.redis_conn.set(
'analyzer.metrics_manager.metrics_sparsity.metrics_fully_populated', metrics_fully_populated_count)
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.metrics_sparsity.metrics_fully_populated: %s' % e)
send_metric_name = metrics_sparsity_use_namespace + '.metrics_fully_populated'
try:
send_graphite_metric(skyline_app, send_metric_name, str(metrics_fully_populated_count))
logger.info('metrics_manager :: sent Graphite metric - %s %s' % (send_metric_name, str(metrics_fully_populated_count)))
except Exception as e:
logger.error('error :: metrics_manager :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(metrics_fully_populated_count), e))
# Same sequence for metrics becoming more densely populated.
if metrics_sparsity_decreasing:
metrics_sparsity_decreasing_count = len(metrics_sparsity_decreasing)
try:
self.redis_conn.rename('analyzer.metrics_manager.metrics_sparsity_decreasing', 'aet.analyzer.metrics_manager.metrics_sparsity_decreasing')
logger.info('metrics_manager :: created the aet.analyzer.metrics_manager.metrics_sparsity_decreasing Redis set')
except:
logger.error('metrics_manager :: failed to created the aet.analyzer.metrics_manager.metrics_sparsity_decreasing Redis set')
try:
self.redis_conn.sadd('analyzer.metrics_manager.metrics_sparsity_decreasing', *set(metrics_sparsity_decreasing))
logger.info('metrics_manager :: created and added %s metrics to the analyzer.metrics_manager.metrics_sparsity_decreasing Redis set' % str(metrics_sparsity_decreasing_count))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.metrics_manager.metrics_sparsity_decreasing Redis set')
try:
self.redis_conn.set(
'analyzer.metrics_manager.metrics_sparsity.metrics_sparsity_decreasing', metrics_sparsity_decreasing_count)
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.metrics_sparsity.metrics_sparsity_decreasing: %s' % e)
send_metric_name = metrics_sparsity_use_namespace + '.metrics_sparsity_decreasing'
try:
send_graphite_metric(skyline_app, send_metric_name, str(metrics_sparsity_decreasing_count))
logger.info('metrics_manager :: sent Graphite metric - %s %s' % (send_metric_name, str(metrics_sparsity_decreasing_count)))
except Exception as e:
logger.error('error :: metrics_manager :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(metrics_sparsity_decreasing_count), e))
# Same sequence for metrics becoming more sparsely populated.
if metrics_sparsity_increasing:
metrics_sparsity_increasing_count = len(metrics_sparsity_increasing)
try:
self.redis_conn.rename('analyzer.metrics_manager.metrics_sparsity_increasing', 'aet.analyzer.metrics_manager.metrics_sparsity_increasing')
logger.info('metrics_manager :: created the aet.analyzer.metrics_manager.metrics_sparsity_increasing Redis set')
except:
logger.error('metrics_manager :: failed to created the aet.analyzer.metrics_manager.metrics_sparsity_increasing Redis set')
try:
self.redis_conn.sadd('analyzer.metrics_manager.metrics_sparsity_increasing', *set(metrics_sparsity_increasing))
logger.info('metrics_manager :: created and added %s metrics to the analyzer.metrics_manager.metrics_sparsity_increasing Redis set' % str(metrics_sparsity_increasing_count))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.metrics_manager.metrics_sparsity_increasing Redis set')
try:
self.redis_conn.set(
'analyzer.metrics_manager.metrics_sparsity.metrics_sparsity_increasing', metrics_sparsity_increasing_count)
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.metrics_sparsity.metrics_sparsity_increasing: %s' % e)
send_metric_name = metrics_sparsity_use_namespace + '.metrics_sparsity_increasing'
try:
send_graphite_metric(skyline_app, send_metric_name, str(metrics_sparsity_increasing_count))
logger.info('metrics_manager :: sent Graphite metric - %s %s' % (send_metric_name, str(metrics_sparsity_increasing_count)))
except Exception as e:
logger.error('error :: metrics_manager :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(metrics_sparsity_increasing_count), e))
# Replace the analyzer.metrics_manager.metrics_sparsity Redis set with this
# run's per-metric sparsity records (str() of each metric_sparsity_dict).
# The previous set is first renamed to its aet.* key; a failed rename is
# only logged.
if metrics_sparsity:
    try:
        self.redis_conn.rename('analyzer.metrics_manager.metrics_sparsity', 'aet.analyzer.metrics_manager.metrics_sparsity')
        logger.info('metrics_manager :: created the aet.analyzer.metrics_manager.metrics_sparsity Redis set')
    except Exception:
        logger.error('metrics_manager :: failed to created the aet.analyzer.metrics_manager.metrics_sparsity Redis set')
    try:
        self.redis_conn.sadd('analyzer.metrics_manager.metrics_sparsity', *set(metrics_sparsity))
        logger.info('metrics_manager :: created and added %s metrics to the analyzer.metrics_manager.metrics_sparsity Redis set' % str(len(metrics_sparsity)))
    except Exception:
        logger.error(traceback.format_exc())
        # The set being written is metrics_sparsity.  The message previously
        # named metrics_sparsity_increasing (copied from the block above),
        # which pointed at the wrong Redis key.
        logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.metrics_manager.metrics_sparsity Redis set')
# Average the numeric sparsity values. Entries that cannot be converted to
# float are skipped AND excluded from the denominator; previously the sum of
# the parsed values was divided by len(sparsities), which dragged the average
# down whenever any entry was unparseable.
avg_sparsity = None
if sparsities:
    float_sparsities = []
    for item in sparsities:
        try:
            float_sparsities.append(float(item))
        except (TypeError, ValueError):
            # Non-numeric sparsity value - leave it out of the average.
            pass
    if float_sparsities:
        avg_sparsity = sum(float_sparsities) / len(float_sparsities)
    else:
        # Nothing numeric to average - avg_sparsity stays None.
        logger.error('error :: metrics_manager :: failed to calculate avg_sparsity')
# Store avg_sparsity in Redis and send it to Graphite.
# NOTE(review): this is a truthiness test, so an average of exactly 0.0 is
# never stored or sent.
if avg_sparsity:
try:
self.redis_conn.set(
'analyzer.metrics_manager.metrics_sparsity.avg_sparsity', float(avg_sparsity))
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.metrics_sparsity.avg_sparsity: %s' % e)
send_metric_name = metrics_sparsity_use_namespace + '.avg_sparsity'
try:
send_graphite_metric(skyline_app, send_metric_name, str(avg_sparsity))
logger.info('metrics_manager :: sent Graphite metric - %s %s' % (send_metric_name, str(avg_sparsity)))
except Exception as e:
logger.error('error :: metrics_manager :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(avg_sparsity), e))
# Marker key with a 60 second TTL holding the epoch time the sparsity
# metrics were sent.
try:
self.redis_conn.setex(
'analyzer.metrics_manager.sent.metrics_sparsity.metrics', 60, int(time()))
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.sent.metrics_sparsity.metrics: %s' % e)
# Summary of this check_data_sparsity pass. The metric lists and
# check_data_sparsity_start are populated outside this view.
logger.info('metrics_manager :: check_data_sparsity - of the %s unique_metrics, %s metrics are fully populated' % (
str(len(unique_metrics)), str(len(metrics_fully_populated))))
logger.info('metrics_manager :: check_data_sparsity - of the %s unique_metrics, %s metrics are increasing in sparsity (this is could be bad)' % (
str(len(unique_metrics)), str(len(metrics_sparsity_increasing))))
logger.info('metrics_manager :: check_data_sparsity - of the %s unique_metrics, %s metrics are decreasing in sparsity (this is good)' % (
str(len(unique_metrics)), str(len(metrics_sparsity_decreasing))))
logger.info('metrics_manager :: check_data_sparsity - of the %s unique_metrics, %s metrics are stale' % (
str(len(unique_metrics)), str(len(metrics_stale))))
logger.info('metrics_manager :: check_data_sparsity - of the %s unique_metrics, %s metrics are inactive' % (
str(len(unique_metrics)), str(len(metrics_inactive))))
check_data_sparsity_time = time() - check_data_sparsity_start
logger.info('metrics_manager :: check data sparsity took %.2f seconds' % check_data_sparsity_time)
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# Also determine resolution of metrics that were not checked for data
# sparsity
# The skipped metrics are those in unique_metrics that are not in check_metrics.
unique_metrics_set = set(list(unique_metrics))
check_metrics_set = set(check_metrics)
set_difference = unique_metrics_set.difference(check_metrics_set)
unchecked_resolution_metrics = list(set_difference)
logger.info('metrics_manager :: update_metrics_resolutions_key - determining resolution of %s metric which were skipped from sparsity check' % (
str(len(unchecked_resolution_metrics))))
# Fetch all skipped metrics' raw series in one MGET. On failure an empty
# list is used, so every index lookup below falls back to an empty timeseries.
unchecked_raw_assigned = None
if unchecked_resolution_metrics:
try:
unchecked_raw_assigned = self.redis_conn.mget(unchecked_resolution_metrics)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to get unchecked_resolution_metrics from Redis - %s' % e)
unchecked_raw_assigned = []
for i, metric_name in enumerate(unchecked_resolution_metrics):
try:
# Decode the msgpack-packed series. Any failure (missing index,
# presumably a None value for a missing key, undecodable data) yields
# an empty timeseries.
try:
raw_series = unchecked_raw_assigned[i]
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
except:
timeseries = []
# base_name is the metric name without the settings.FULL_NAMESPACE prefix.
metric_name = str(metric_name)
if metric_name.startswith(settings.FULL_NAMESPACE):
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric_name
metric_resolution = 0
timestamp_resolutions_count = {}
# determine_data_frequency is defined outside this view; it is unpacked
# here into a resolution and a mapping of timestamp resolutions to counts.
# NOTE(review): confirm the exact shape of that second value.
# check_sparsity_error_log limits traceback logging to the first error.
try:
metric_resolution, timestamp_resolutions_count = determine_data_frequency(skyline_app, timeseries, False)
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: unchecked_resolution_metrics - determine_data_frequency failed for %s - %s' % (
base_name, e))
check_sparsity_error_log = True
# Rewrite the metrics_timestamp_resolutions hash field only when it is
# missing or its stored string differs from the new counts.
update_metric_timestamp_resolutions_key = False
metric_timestamp_resolutions_count_str = None
try:
metric_timestamp_resolutions_count_str = metrics_timestamp_resolutions_count_dict[metric_name]
except KeyError:
metric_timestamp_resolutions_count_str = None
update_metric_timestamp_resolutions_key = True
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: unchecked_resolution_metrics - failed to get entry from metrics_timestamp_resolutions_count_dict for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
metric_timestamp_resolutions_count_str = None
update_metric_timestamp_resolutions_key = True
if metric_timestamp_resolutions_count_str:
if metric_timestamp_resolutions_count_str != str(dict(timestamp_resolutions_count)):
update_metric_timestamp_resolutions_key = True
if update_metric_timestamp_resolutions_key:
try:
self.redis_conn.hset(
'analyzer.metrics_manager.hash_key.metrics_timestamp_resolutions',
metric_name, str(dict(timestamp_resolutions_count)))
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.hash_key.metrics_timestamp_resolutions for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
# The resolutions hash is updated by default. The update is turned off
# when the last known resolution already equals the determined one.
update_metrics_resolutions_key = True
last_known_metric_resolution = None
if metric_resolution and metrics_resolutions_dict:
try:
last_known_metric_resolution = metrics_resolutions_dict[base_name]
except KeyError:
update_metrics_resolutions_key = True
last_known_metric_resolution = None
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: update_metrics_resolutions_key - failed to determine metric resolution for %s - %s' % (
base_name, e))
check_sparsity_error_log = True
last_known_metric_resolution = None
if last_known_metric_resolution:
if int(float(last_known_metric_resolution)) == metric_resolution:
update_metrics_resolutions_key = False
# @added 20210621 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
# A falsy (0) resolution is never written. If an update was pending, the
# metric is recorded in metrics_with_unknown_resolution instead.
if not metric_resolution:
if update_metrics_resolutions_key:
update_metrics_resolutions_key = False
metrics_with_unknown_resolution.append(base_name)
if update_metrics_resolutions_key:
if last_known_metric_resolution is not None:
logger.info('metrics_manager :: update_metrics_resolutions_key - updating %s resolution from %s to %s' % (
base_name, str(last_known_metric_resolution),
str(metric_resolution)))
updated_metric_resolution_keys += 1
else:
added_metric_resolution_keys += 1
# NOTE(review): the error message below names
# analyzer.metrics_manager.hash_key.metrics_data_sparsity, but the write
# goes to metrics_resolutions_hash_key.
try:
self.redis_conn.hset(
metrics_resolutions_hash_key,
base_name, int(metric_resolution))
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add entry in analyzer.metrics_manager.hash_key.metrics_data_sparsity for - %s - %s' % (
str(metric_name), e))
check_sparsity_error_log = True
except Exception as e:
if not check_sparsity_error_log:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed unchecked_resolution_metrics - %s' % e)
check_sparsity_error_log = True
# Summary counters; they are initialised outside this view.
logger.info('metrics_manager :: update_metrics_resolutions_key - added %s metric resolutions to %s Redis hash key' % (
str(added_metric_resolution_keys), metrics_resolutions_hash_key))
logger.info('metrics_manager :: update_metrics_resolutions_key - updated %s metric resolution in %s Redis hash key' % (
str(updated_metric_resolution_keys), metrics_resolutions_hash_key))
# @added 20210621 - Feature #4148: analyzer.metrics_manager.resolutions
# Bug #4146: check_data_sparsity - incorrect on low fidelity and inconsistent metrics
logger.info('metrics_manager :: update_metrics_resolutions_key - %s metrics have unknown resolution' % (
str(len(metrics_with_unknown_resolution))))
# @added 20201212 - Feature #3884: ANALYZER_CHECK_LAST_TIMESTAMP
# Prune the last-analysed-timestamp hash. A metric whose recorded timestamp
# is older than spin_start - settings.FULL_DURATION is deleted, with one HDEL
# per metric.
if ANALYZER_CHECK_LAST_TIMESTAMP:
try:
metrics_last_timestamp_dict = self.redis_conn_decoded.hgetall(metrics_last_timestamp_hash_key)
if metrics_last_timestamp_dict:
logger.info('metrics_manager :: ANALYZER_CHECK_LAST_TIMESTAMP - got %s metrics and last analysed timestamps from %s Redis hash key to manage' % (
str(len(metrics_last_timestamp_dict)),
metrics_last_timestamp_hash_key))
else:
logger.warning('warning :: ANALYZER_CHECK_LAST_TIMESTAMP enabled but got no data from the %s Redis hash key' % (
metrics_last_timestamp_hash_key))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get Redis hash key %s' % (
metrics_last_timestamp_hash_key))
metrics_last_timestamp_dict = {}
removed_old_keys = 0
if metrics_last_timestamp_dict:
for metric in metrics_last_timestamp_dict:
# A value that int() cannot parse becomes None, and that entry is
# left in the hash.
try:
last_analyzed_timestamp = int(metrics_last_timestamp_dict[metric])
except:
last_analyzed_timestamp = None
if last_analyzed_timestamp:
try:
if last_analyzed_timestamp < (int(spin_start) - settings.FULL_DURATION):
self.redis_conn.hdel(metrics_last_timestamp_hash_key, metric)
removed_old_keys += 1
except Exception as e:
logger.error('error :: metrics_manager :: ANALYZER_CHECK_LAST_TIMESTAMP could not remove %s from Redis %s: %s' % (
str(metric), metrics_last_timestamp_hash_key, e))
logger.info('metrics_manager :: ANALYZER_CHECK_LAST_TIMESTAMP - removed %s inactive metrics from the %s Redis hash key' % (
str(removed_old_keys), metrics_last_timestamp_hash_key))
# @added 20210330 - Feature #3994: Panorama - mirage not anomalous
# The mirage.panorama.not_anomalous_metrics is managed here and entries
# older than 7 days are removed.
# The hash fields are parsed as float timestamps. Every field older than
# 7 days (86400 * 7 seconds) is deleted with a single HDEL.
redis_hash = 'mirage.panorama.not_anomalous_metrics'
mirage_panorama_not_anomalous = {}
try:
    mirage_panorama_not_anomalous = self.redis_conn_decoded.hgetall(redis_hash)
    logger.info('metrics_manager :: %s entries to check in the %s Redis hash key' % (
        str(len(mirage_panorama_not_anomalous)), redis_hash))
except Exception:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: failed to get Redis hash key %s' % redis_hash)
    mirage_panorama_not_anomalous = {}
timestamp_floats = []
if mirage_panorama_not_anomalous:
    timestamp_floats = list(mirage_panorama_not_anomalous.keys())
timestamp_floats_to_remove = []
if timestamp_floats:
    # Bug fix: this was int(time()) - (86400 - 7), i.e. just under one day,
    # so entries were pruned after ~24 hours instead of after 7 days.
    timestamp_week_ago = int(time()) - (86400 * 7)
    for timestamp_float in timestamp_floats:
        # Parse each field on its own so a single malformed field does not
        # stop the remaining entries from being pruned.
        try:
            if int(float(timestamp_float)) < timestamp_week_ago:
                timestamp_floats_to_remove.append(timestamp_float)
        except (TypeError, ValueError, OverflowError) as err:
            logger.error('error :: metrics_manager :: failed to determine entries to remove from Redis hash key %s - %s - %s' % (
                redis_hash, str(timestamp_float), err))
if timestamp_floats_to_remove:
    try:
        self.redis_conn.hdel(redis_hash, *set(timestamp_floats_to_remove))
        logger.info('metrics_manager :: %s entries were removed from Redis hash %s' % (
            str(len(set(timestamp_floats_to_remove))), redis_hash))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to %s remove entries from Redis hash key %s' % (
            str(len(timestamp_floats_to_remove)), redis_hash))
else:
    logger.info('metrics_manager :: there are no entries that need to be removed from Redis hash %s' % (
        redis_hash))
# @added 20210429 - Feature #3994: Panorama - mirage not anomalous
# The ionosphere.panorama.not_anomalous_metrics is managed here and entries
# older than 7 days are removed.
# The hash fields are parsed as float timestamps. Every field older than
# 7 days (86400 * 7 seconds) is deleted with a single HDEL.
redis_hash = 'ionosphere.panorama.not_anomalous_metrics'
ionosphere_panorama_not_anomalous = {}
try:
    ionosphere_panorama_not_anomalous = self.redis_conn_decoded.hgetall(redis_hash)
    logger.info('metrics_manager :: %s entries to check in the %s Redis hash key' % (
        str(len(ionosphere_panorama_not_anomalous)), redis_hash))
except Exception:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: failed to get Redis hash key %s' % redis_hash)
    ionosphere_panorama_not_anomalous = {}
ionosphere_timestamp_floats = []
if ionosphere_panorama_not_anomalous:
    ionosphere_timestamp_floats = list(ionosphere_panorama_not_anomalous.keys())
ionosphere_timestamp_floats_to_remove = []
if ionosphere_timestamp_floats:
    # Bug fix: this was int(time()) - (86400 - 7), i.e. just under one day,
    # so entries were pruned after ~24 hours instead of after 7 days.
    timestamp_week_ago = int(time()) - (86400 * 7)
    for timestamp_float in ionosphere_timestamp_floats:
        # Parse each field on its own so a single malformed field does not
        # stop the remaining entries from being pruned.
        try:
            if int(float(timestamp_float)) < timestamp_week_ago:
                ionosphere_timestamp_floats_to_remove.append(timestamp_float)
        except (TypeError, ValueError, OverflowError) as err:
            logger.error('error :: metrics_manager :: failed to determine entries to remove from Redis hash key %s - %s - %s' % (
                redis_hash, str(timestamp_float), err))
if ionosphere_timestamp_floats_to_remove:
    try:
        self.redis_conn.hdel(redis_hash, *set(ionosphere_timestamp_floats_to_remove))
        logger.info('metrics_manager :: %s entries were removed from Redis hash %s' % (
            str(len(set(ionosphere_timestamp_floats_to_remove))), redis_hash))
    except Exception:
        logger.error(traceback.format_exc())
        # Bug fix: the count previously came from the mirage list
        # (timestamp_floats_to_remove), not the list being removed here.
        logger.error('error :: metrics_manager :: failed to %s remove entries from Redis hash key %s' % (
            str(len(ionosphere_timestamp_floats_to_remove)), redis_hash))
else:
    logger.info('metrics_manager :: there are no entries that need to be removed from Redis hash %s' % (
        redis_hash))
# @added 20210330 - Feature #3994: Panorama - mirage not anomalous
# The panorama.not_anomalous_plots are managed here and entries
# older than 7 days are removed and files deleted
# The hash fields are parsed as float timestamps, and each value is the path
# of a plot file. For every field older than 7 days (86400 * 7 seconds), the
# file is deleted and then the fields are removed with a single HDEL.
redis_hash = 'panorama.not_anomalous_plots'
panorama_not_anomalous_plots = {}
try:
    panorama_not_anomalous_plots = self.redis_conn_decoded.hgetall(redis_hash)
    logger.info('metrics_manager :: %s entries to check in the %s Redis hash key' % (
        str(len(panorama_not_anomalous_plots)), redis_hash))
except Exception:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: failed to get Redis hash key %s' % redis_hash)
    panorama_not_anomalous_plots = {}
timestamp_floats = []
if panorama_not_anomalous_plots:
    timestamp_floats = list(panorama_not_anomalous_plots.keys())
timestamp_floats_to_remove = []
if timestamp_floats:
    # Bug fix: this was int(time()) - (86400 - 7), i.e. just under one day,
    # so plots were deleted after ~24 hours instead of after 7 days.
    timestamp_week_ago = int(time()) - (86400 * 7)
    for timestamp_float in timestamp_floats:
        # Parse each field on its own so a single malformed field does not
        # stop the remaining entries from being pruned.
        try:
            if int(float(timestamp_float)) < timestamp_week_ago:
                timestamp_floats_to_remove.append(timestamp_float)
        except (TypeError, ValueError, OverflowError) as err:
            logger.error('error :: metrics_manager :: failed to determine entries to remove from Redis hash key %s - %s - %s' % (
                redis_hash, str(timestamp_float), err))
if timestamp_floats_to_remove:
    # Remove plot files. The paths come from the values already fetched by
    # HGETALL rather than from one extra HGET round trip per entry.
    for timestamp_float in timestamp_floats_to_remove:
        file_to_remove = panorama_not_anomalous_plots.get(timestamp_float)
        if file_to_remove:
            try:
                if os.path.exists(file_to_remove):
                    os.remove(file_to_remove)
            except OSError as err:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to remove file %s for entry %s in Redis hash key %s - %s' % (
                    str(file_to_remove), str(timestamp_float), redis_hash, err))
    # Remove entries from the hash
    try:
        self.redis_conn.hdel(redis_hash, *set(timestamp_floats_to_remove))
        logger.info('metrics_manager :: %s entries were removed from Redis hash %s' % (
            str(len(set(timestamp_floats_to_remove))), redis_hash))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to %s remove entries from Redis hash key %s' % (
            str(len(timestamp_floats_to_remove)), redis_hash))
else:
    logger.info('metrics_manager :: there are no entries that need to be removed from Redis hash %s' % (
        redis_hash))
# @added 20210803 - Feature #4164: luminosity - cloudbursts
# Manage the luminosity.cloudbursts.anomalies_processed Redis hash and
# remove keys that are older than 24 hours
# Each hash value is treated as an epoch timestamp. Fields whose timestamp is
# before remove_before (now - 86400) are deleted with a single HDEL.
redis_hash = 'luminosity.cloudbursts.anomalies_processed'
remove_before = int(time()) - 86400
luminosity_cloudburst_anomalies_processed = {}
try:
    luminosity_cloudburst_anomalies_processed = self.redis_conn_decoded.hgetall(redis_hash)
    logger.info('metrics_manager :: %s entries to check in the %s Redis hash key' % (
        str(len(luminosity_cloudburst_anomalies_processed)), redis_hash))
except Exception:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: failed to get Redis hash key %s' % redis_hash)
    luminosity_cloudburst_anomalies_processed = {}
remove_keys = []
for key, value in luminosity_cloudburst_anomalies_processed.items():
    try:
        # Bug fix: values read through the decoded connection are strings.
        # Comparing a str with an int raises TypeError on Python 3, so
        # previously every entry errored here and nothing was ever removed.
        if int(float(value)) < remove_before:
            remove_keys.append(key)
    except (TypeError, ValueError, OverflowError) as e:
        logger.error('error :: metrics_manager :: failed to determine timestamp from %s entry from Redis hash key %s - %s' % (
            key, redis_hash, e))
if remove_keys:
    # Remove entries from the hash
    try:
        self.redis_conn.hdel(redis_hash, *set(remove_keys))
        logger.info('metrics_manager :: %s entries were removed from Redis hash %s' % (
            str(len(set(remove_keys))), redis_hash))
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error('error :: metrics_manager :: failed to %s remove entries from Redis hash key %s - %s' % (
            str(len(remove_keys)), redis_hash, e))
else:
    logger.info('metrics_manager :: there are no entries or files that need to be removed from Redis hash %s' % (
        redis_hash))
# @added 20210825 - Feature #4164: luminosity - cloudbursts
metric_names_with_ids = {}
# @added 20211004 - Feature #4264: luminosity - cross_correlation_relationships
ids_with_metric_names = {}
# @added 20210430 - Task #4030: refactoring
metric_names = []
try:
    # @modified 20220302 - Feature #4444: webapp - inactive_metrics
    # Feature #3828: Add inactive columns to the metrics DB table
    # Feature #4468: flux - remove_namespace_quota_metrics
    # Only active metrics are returned: metrics that have been set to
    # inactive in the DB are excluded. Both the names and a name -> id
    # mapping are requested (with_ids=True).
    with_ids = True
    metric_names, metric_names_with_ids = get_all_active_db_metric_names(skyline_app, with_ids)
except Exception as err:
    logger.error(traceback.format_exc())
    logger.error('error :: metrics_manager :: failed to get_all_active_db_metric_names - %s' % (
        err))
if metric_names:
    # Keep the previous set under the aet.* key, then rebuild the live set.
    try:
        self.redis_conn.rename('analyzer.metrics_manager.db.metric_names', 'aet.analyzer.metrics_manager.db.metric_names')
        logger.info('metrics_manager :: created the aet.analyzer.metrics_manager.db.metric_names Redis set')
    except Exception as e:
        # Bug fix: message previously named db.metrics_fully_populated.
        logger.error('metrics_manager :: failed to created the aet.analyzer.metrics_manager.db.metric_names Redis set - %s' % e)
    try:
        self.redis_conn.sadd('analyzer.metrics_manager.db.metric_names', *set(metric_names))
        logger.info('metrics_manager :: created and added %s metrics to the analyzer.metrics_manager.db.metric_names Redis set' % str(len(metric_names)))
    except Exception as e:
        logger.error(traceback.format_exc())
        # Bug fix: message previously named db.metrics_fully_populated.
        logger.error('error :: metrics_manager :: failed to add multiple members to the analyzer.metrics_manager.db.metric_names Redis set - %s' % e)
# @added 20211011 - Feature #4278: luminosity - related_namespaces
# Create and manage the metrics_manager.namespaces Redis hash but only
# update it if new data
# The hash maps each parent namespace (a metric name minus its last
# element) to the number of current metric names that contain it. It is
# rebuilt when no namespaces are stored yet, or when the set of metric names
# differs from the previous run's snapshot in
# aet.metrics_manager.metric_names_with_ids.
if not metric_names_with_ids:
    logger.error('error :: metrics_manager :: metric_names_with_ids is empty')
if metric_names_with_ids:
    update_namespaces = False
    logger.info('metrics_manager :: %s metric_names_with_ids retrieved from DB' % str(len(metric_names_with_ids)))
    aet_metric_names_with_ids = {}
    try:
        aet_metric_names_with_ids = self.redis_conn_decoded.hgetall('aet.metrics_manager.metric_names_with_ids')
        logger.info('metrics_manager :: got %s last aet.metrics_manager.metric_names_with_ids from Redis to check namespaces' % str(len(aet_metric_names_with_ids)))
    except Exception as err:
        logger.error('metrics_manager :: failed to hgetall aet.metrics_manager.metric_names_with_ids to check namespace - %s' % err)
    if len(aet_metric_names_with_ids) == 0 and metric_names_with_ids:
        logger.info('metrics_manager :: aet.metrics_manager.metric_names_with_ids from Redis is empty so using metric_names_with_ids to check namespaces')
        aet_metric_names_with_ids = dict(metric_names_with_ids)
    namespaces = {}
    try:
        namespaces = self.redis_conn_decoded.hgetall('metrics_manager.namespaces')
        logger.info('metrics_manager :: got %s metrics_manager.namespaces from Redis to check namespaces' % str(len(namespaces)))
    except Exception as err:
        logger.error('metrics_manager :: failed to hgetall metrics_manager.namespaces to check namespaces - %s' % err)
    if not namespaces:
        update_namespaces = True
        logger.info('metrics_manager :: no known namespaces updating namespaces')
    # Bug fix: the current names were previously also taken from the aet
    # (last run) hash, so "current != last" was always False and metric
    # changes never triggered an update. current_metric_names could also be
    # left undefined when update_namespaces was True, causing a NameError.
    # The names are now compared as sets, so ordering is irrelevant.
    current_metric_names = list(set(metric_names_with_ids.keys()))
    if aet_metric_names_with_ids:
        last_metric_names = list(set(aet_metric_names_with_ids.keys()))
        if set(current_metric_names) != set(last_metric_names):
            update_namespaces = True
            logger.info('metrics_manager :: metrics changes updating namespaces')
    if not update_namespaces:
        logger.info('metrics_manager :: no changes to metrics not updating namespaces')
    if update_namespaces:
        logger.info('metrics_manager :: updating namespaces')
        namespaces_list = []
        for base_name in current_metric_names:
            namespace_elements = base_name.split('.')
            namespaces_list.append('.'.join(namespace_elements[0:-1]))
        namespaces = {}
        # namespaces_list holds one entry per metric, so iterate over the
        # unique namespaces. Each is counted once instead of once per
        # metric (O(M^2)).
        for namespace in set(namespaces_list):
            # Metrics with no '.' have an empty parent namespace.
            if namespace == '':
                continue
            # NOTE(review): this is a substring match (not a prefix match),
            # kept as-is to preserve the existing counts.
            namespace_element_count = sum(
                1 for r_base_name in current_metric_names if namespace in r_base_name)
            if namespace_element_count:
                namespaces[namespace] = namespace_element_count
        sorted_namespaces = {}
        for namespace in sorted(namespaces):
            sorted_namespaces[namespace] = namespaces[namespace]
        if sorted_namespaces:
            # NOTE(review): HSET only adds/overwrites fields, so namespaces
            # that no longer exist stay in the hash.
            try:
                self.redis_conn.hset('metrics_manager.namespaces', mapping=sorted_namespaces)
                logger.info('metrics_manager :: set %s namespaces in metrics_manager.namespaces Redis hash' % str(len(list(sorted_namespaces.keys()))))
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: metrics_manager :: failed to create metrics_manager.namespaces Redis hash - %s' % str(err))
# Publish the DB name -> id mapping and its inverse (id -> name) as Redis
# hashes. The previous run's hashes are kept under aet.* keys via RENAME
# before the live hashes are rewritten.
# NOTE(review): the log messages say "Redis set" although these keys are
# written with HSET as hashes.
if metric_names_with_ids:
try:
self.redis_conn.rename('metrics_manager.metric_names_with_ids', 'aet.metrics_manager.metric_names_with_ids')
logger.info('metrics_manager :: created the aet.metrics_manager.metric_names_with_ids Redis set')
except Exception as err:
logger.error('metrics_manager :: failed to created the aet.metrics_manager.metric_names_with_ids Redis set - %s' % err)
try:
self.redis_conn.hset('metrics_manager.metric_names_with_ids', mapping=metric_names_with_ids)
logger.info('metrics_manager :: created and added %s metrics to the metrics_manager.metric_names_with_ids Redis hash' % str(len(metric_names_with_ids)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to create metrics_manager.metric_names_with_ids Redis hash - %s' % err)
# @added 20211004 - Feature #4264: luminosity - cross_correlation_relationships
# Invert the mapping to id -> metric name.
# NOTE(review): int(str(...)) runs outside any try, so a non-integer id
# would raise here.
ids_with_metric_names = {}
for c_metric_name in list(metric_names_with_ids.keys()):
c_metric_id = int(str(metric_names_with_ids[c_metric_name]))
ids_with_metric_names[c_metric_id] = c_metric_name
try:
self.redis_conn.rename('metrics_manager.ids_with_metric_names', 'aet.metrics_manager.ids_with_metric_names')
logger.info('metrics_manager :: created the aet.metrics_manager.ids_with_metric_names Redis set')
except Exception as err:
logger.error('metrics_manager :: failed to created the aet.metrics_manager.ids_with_metric_names Redis set - %s' % err)
try:
self.redis_conn.hset('metrics_manager.ids_with_metric_names', mapping=ids_with_metric_names)
logger.info('metrics_manager :: created and added %s metrics to the metrics_manager.ids_with_metric_names Redis hash' % str(len(ids_with_metric_names)))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to create metrics_manager.ids_with_metric_names Redis hash - %s' % err)
# @added 20211012 - Feature #4280: aet.metrics_manager.derivative_metrics Redis hash
# Members of the derivative_metrics Redis set (empty list on failure).
current_derivative_metrics = []
try:
current_derivative_metrics = list(self.redis_conn_decoded.smembers('derivative_metrics'))
except Exception as err:
logger.error('error :: metrics_manager :: failed to get derivative_metrics Redis set - %s' % str(err))
# @added 20220323 - Feature #4502: settings - MONOTONIC_METRIC_NAMESPACES
# Build settings_monotonic_metric_namespaces from two sources:
# settings.MONOTONIC_METRIC_NAMESPACES (empty if not declared), and each
# external settings config's 'monotonic_namespaces', each prefixed with that
# config's 'namespace'.
logger.info('metrics_manager :: determining what metrics match metric namespaces declared as monotonic')
settings_monotonic_metric_namespaces = []
try:
settings_monotonic_metric_namespaces = list(settings.MONOTONIC_METRIC_NAMESPACES)
except AttributeError:
settings_monotonic_metric_namespaces = []
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to determine settings.MONOTONIC_METRIC_NAMESPACES - %s' % str(err))
settings_monotonic_metric_namespaces = []
if external_settings:
for config_id in list(external_settings):
external_settings_monotonic_namespaces = []
# Configs without 'namespace' or 'monotonic_namespaces' are skipped.
# Any other error leaves the list empty, so nothing is added for them.
try:
namespace = external_settings[config_id]['namespace']
external_settings_monotonic_namespaces = list(external_settings[config_id]['monotonic_namespaces'])
except KeyError:
continue
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: determining monotonic_metric_namespaces from external_setting - %s, err: %s' % (
str(config_id), err))
if external_settings_monotonic_namespaces:
for external_settings_monotonic_namespace in external_settings_monotonic_namespaces:
try:
monotonic_namespace = '%s.%s' % (namespace, external_settings_monotonic_namespace)
settings_monotonic_metric_namespaces.append(monotonic_namespace)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add %s.%s to settings_monotonic_metric_namespaces - %s' % (
str(namespace), str(external_settings_monotonic_namespace), err))
always_derivative_metrics = []
try:
always_derivative_metrics = list(self.redis_conn_decoded.smembers('metrics_manager.always_derivative_metrics'))
except Exception as err:
logger.error('metrics_manager :: failed to get metrics_manager.always_derivative_metrics Redis set - %s' % str(err))
try:
non_derivative_monotonic_metrics = list(settings.NON_DERIVATIVE_MONOTONIC_METRICS)
except:
non_derivative_monotonic_metrics = []
# If a metric is prune out of unique_metrics by roomba remove to from
# the always_derivative_metrics set as well
remove_from_always_derivative_metrics = []
for metric in always_derivative_metrics:
if metric not in unique_metrics:
remove_from_always_derivative_metrics.append(metric)
if metric in non_derivative_monotonic_metrics:
remove_from_always_derivative_metrics.append(metric)
if remove_from_always_derivative_metrics:
try:
self.redis_conn_decoded.srem('metrics_manager.always_derivative_metrics', *set(remove_from_always_derivative_metrics))
except Exception as err:
logger.error('metrics_manager :: failed to remove metrics from metrics_manager.always_derivative_metrics Redis set - %s' % str(err))
for metric in remove_from_always_derivative_metrics:
try:
always_derivative_metrics.remove(metric)
except:
pass
for metric in unique_metrics:
if metric in non_derivative_monotonic_metrics:
if metric in current_derivative_metrics:
current_derivative_metrics.remove(metric)
continue
if metric in current_derivative_metrics:
always_derivative_metrics.append(metric)
continue
if metric in always_derivative_metrics:
current_derivative_metrics.append(metric)
continue
pattern_match, metric_matched_by = matched_or_regexed_in_list('analyzer', metric, settings_monotonic_metric_namespaces)
if pattern_match:
current_derivative_metrics.append(metric)
always_derivative_metrics.append(metric)
always_derivative_metrics = list(set(always_derivative_metrics))
try:
self.redis_conn_decoded.sadd('metrics_manager.always_derivative_metrics', *set(always_derivative_metrics))
except Exception as err:
logger.error('metrics_manager :: failed to set metrics_manager.always_derivative_metrics Redis set - %s' % str(err))
current_derivative_metrics = list(set(current_derivative_metrics))
logger.info('metrics_manager :: determined %s metrics which match namespaces declared as monotonic' % str(len(always_derivative_metrics)))
add_derivative_metrics = {}
derivative_metric_timestamp = int(time())
for derivative_metric in current_derivative_metrics:
add_derivative_metrics[derivative_metric] = derivative_metric_timestamp
if add_derivative_metrics:
try:
self.redis_conn.hset('aet.metrics_manager.derivative_metrics.timestamps', mapping=add_derivative_metrics)
logger.info('metrics_manager :: set %s namespaces in aet.metrics_manager.derivative_metrics.timestamps Redis hash' % str(len(list(add_derivative_metrics.keys()))))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to create aet.metrics_manager.derivative_metrics.timestamps Redis hash - %s' % str(err))
aet_metrics_manager_derivative_metrics_timestamps = {}
try:
aet_metrics_manager_derivative_metrics_timestamps = self.redis_conn_decoded.hgetall('aet.metrics_manager.derivative_metrics.timestamps')
logger.info('metrics_manager :: got aet.metrics_manager.derivative_metrics.timestamps from Redis to check derivative_metric timestamps')
except Exception as err:
logger.error('metrics_manager :: failed to hgetall aet.metrics_manager.derivative_metrics.timestamps to check derivative_metric timestamps - %s' % str(err))
derivative_metrics_to_remove = []
new_current_derivative_metrics = []
remove_derivative_metric_older_than = derivative_metric_timestamp - 3600
if aet_metrics_manager_derivative_metrics_timestamps:
for derivative_metric in list(aet_metrics_manager_derivative_metrics_timestamps.keys()):
# @added 20220225 - Feature #4468: flux - remove_namespace_quota_metrics
# Feature #4464: flux - quota - cluster_sync
if 'skyline_set_as_of_' in derivative_metric:
derivative_metrics_to_remove.append(derivative_metric)
continue
try:
derivative_metric_last_updated = int(str(aet_metrics_manager_derivative_metrics_timestamps[derivative_metric]))
if derivative_metric_last_updated < remove_derivative_metric_older_than:
derivative_metrics_to_remove.append(derivative_metric)
else:
new_current_derivative_metrics.append(derivative_metric)
except Exception as err:
logger.error('metrics_manager :: failed to determine derivative_metric_last_updated - %s' % str(err))
if not derivative_metrics_to_remove:
logger.info('metrics_manager :: not removing any metrics from aet.metrics_manager.derivative_metrics.timestamps Redis hash as all have been updated in last hour')
if derivative_metrics_to_remove:
logger.info('metrics_manager :: removing %s metrics from aet.metrics_manager.derivative_metrics.timestamps Redis hash as older than 1 hour' % str(len(derivative_metrics_to_remove)))
try:
self.redis_conn.hdel('aet.metrics_manager.derivative_metrics.timestamps', *set(derivative_metrics_to_remove))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to remove %s metrics from the Redis hash key aet.metrics_manager.derivative_metrics.timestamps - %s' % (
str(len(derivative_metrics_to_remove)), str(err)))
logger.info('metrics_manager :: removing %s metrics from aet.metrics_manager.derivative_metrics Redis set as older than 1 hour' % str(len(derivative_metrics_to_remove)))
try:
self.redis_conn.srem('aet.metrics_manager.derivative_metrics', *set(derivative_metrics_to_remove))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to remove %s metrics from the Redis hash key aet.metrics_manager.derivative_metrics - %s' % (
str(len(derivative_metrics_to_remove)), str(err)))
if new_current_derivative_metrics:
logger.info('metrics_manager :: adding %s metrics to aet.metrics_manager.derivative_metrics Redis set' % str(len(new_current_derivative_metrics)))
try:
self.redis_conn.sadd('aet.metrics_manager.derivative_metrics', *set(new_current_derivative_metrics))
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to add multiple members to the aet.metrics_manager.derivative_metrics Redis set')
# @added 20210624 - Feature #4150: metrics_manager - roomba batch processing metrics
# Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
# Feature #3480: batch_processing
# When there are batch processing metrics and ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
# is enabled analyzer_batch only roombas metrics when they are received.
# If batch metrics stop, they never get roomba'ed again and will remain
# in Redis and never get purged.
# Once an hour make metrics_manager just remove any batch processing
# metrics that are older than FULL_DURATION + (FULL_DURATION / 2)
if settings.BATCH_PROCESSING:
inactive_batch_metrics_timestamp = int(time()) - (settings.FULL_DURATION + (settings.FULL_DURATION / 2))
roomba_cleaned = False
roomba_cache_key = 'analyzer.metrics_manager.roomba.batch_metrics'
try:
roomba_cleaned = self.redis_conn.get(roomba_cache_key)
if roomba_cleaned:
logger.info('metrics_manager :: roomba batch_processing_metrics Redis key %s exists, nothing to do' % (
roomba_cache_key))
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: roomba batch_processing_metrics failed to get Redis key %s - %s' % (
roomba_cache_key, e))
batch_processing_base_names = []
if not roomba_cleaned:
try:
batch_processing_base_names = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: roomba batch_processing_metrics failed to get Redis set aet.analyzer.batch_processing_metrics - %s' % (
e))
batch_processing_metrics = []
if batch_processing_base_names:
for base_name in batch_processing_base_names:
metric_name = '%s%s' % (settings.FULL_NAMESPACE, base_name)
batch_processing_metrics.append(metric_name)
raw_assigned = None
if batch_processing_metrics:
logger.info('metrics_manager :: roomba checking %s batch_processing_metrics' % (
str(len(batch_processing_metrics))))
try:
raw_assigned = self.redis_conn.mget(batch_processing_metrics)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: roomba batch_processing_metrics failed to get raw_assigned from Redis - %s' % e)
roomba_removed = []
if raw_assigned:
# Distill timeseries strings into lists
for i, metric_name in enumerate(batch_processing_metrics):
timeseries = []
try:
raw_series = raw_assigned[i]
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
except Exception as e:
logger.error('error :: metrics_manager :: roomba batch_processing_metrics failed to unpack %s timeseries - %s' % (
str(metric_name), e))
timeseries = []
# To ensure that there are no unordered timestamps in the time
# series which are artefacts of the collector or carbon-relay, sort
# all time series by timestamp before analysis.
original_timeseries = timeseries
if original_timeseries:
try:
timeseries = sort_timeseries(original_timeseries)
del original_timeseries
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: roomba batch_processing_metrics failed to determine whther to remove %s timeseries - %s' % (
metric_name, e))
remove_timeseries = False
if timeseries:
try:
last_timeseries_timestamp = int(timeseries[-1][0])
if last_timeseries_timestamp < inactive_batch_metrics_timestamp:
remove_timeseries = True
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: roomba batch_processing_metrics failed to determine whther to remove %s timeseries - %s' % (
metric_name, e))
if remove_timeseries:
try:
self.redis_conn.delete(metric_name)
logger.info('metrics_manager :: roomba removed batch processing metric %s' % metric_name)
roomba_removed.append(metric_name)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: roomba falied to remove batch processing metric %s - %s' % (
metric_name, e))
# TODO - set inactive
if not roomba_cleaned:
try:
self.redis_conn.setex(roomba_cache_key, 3600, str(roomba_removed))
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to set Redis key %s - %s' % (
roomba_cache_key, e))
# @added 20220128 - Feature #4404: flux - external_settings - aggregation
# Feature #4324: flux - reload external_settings
# Feature #4376: webapp - update_external_settings
if do_reload_flux:
try:
self.redis_conn.delete('skyline.external_settings.update.metrics_manager')
logger.info('metrics_manager ::: deleted skyline.external_settings.update.metrics_manager from Redis')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager ::: failed to delete skyline.external_settings.update.metrics_manager from Redis, err: %s' % err)
flux_pid_file = '%s/flux.pid' % settings.PID_PATH
if os.path.isfile(flux_pid_file):
logger.info('metrics_manager :: initiating reload_flux')
try:
flux_pids = reload_flux(skyline_app, flux_pid_file)
if flux_pids:
logger.info('metrics_manager :: reload_flux reports %s flux pids' % str(len(flux_pids)))
except Exception as err:
logger.error('error :: metrics_manager :: reload_flux error - %s' % err)
try:
self.redis_conn.delete('skyline.external_settings.update.flux')
logger.info('metrics_manager ::: deleted skyline.external_settings.update.flux from Redis')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager ::: failed to delete skyline.external_settings.update.flux from Redis, err: %s' % err)
spin_end = time() - spin_start
# @added 20210619 - Feature #4148: analyzer.metrics_manager.resolutions
try:
self.redis_conn.set(
'analyzer.metrics_manager.run_time', spin_end)
except Exception as e:
logger.error('error :: metrics_manager :: could not set Redis analyzer.metrics_manager.run_time: %s' % e)
send_metric_name = '%s.metrics_manager_run_time' % skyline_app_graphite_namespace
try:
send_graphite_metric(skyline_app, send_metric_name, str(spin_end))
logger.info('metrics_manager :: sent Graphite metric - %s %s' % (send_metric_name, str(spin_end)))
except Exception as e:
logger.error('error :: metrics_manager :: could not send send_graphite_metric %s %s: %s' % (
send_metric_name, str(spin_end), e))
logger.info('metrics_manager :: metric_management_process took %.2f seconds' % spin_end)
return
[docs] def run(self):
"""
Main loop of metrics_manager.

- Called when the process initializes.
- Waits up to 5 seconds for the ``skyline_app_loglock`` file to be
  removed, so that bin/<skyline_app>.d can manage the log without it
  being overwritten.
- Resolves SERVER_METRIC_PATH and ANALYZER_ENABLED from settings (falling
  back to defaults) and logs them.
- Then loops forever. Each iteration:

  - pings Redis; on failure sleeps 10 seconds and recreates both the raw
    and the decoded Redis connections
  - sets the ``analyzer.metrics_manager`` Redis key (120 second TTL) to
    report that metrics_manager is up
  - sleeps 10 seconds and skips the iteration if the ``full_uniques``
    Redis set is empty or its size cannot be fetched
  - spawns a Process running ``metric_management_process`` and terminates
    it if it is still alive after RUN_EVERY seconds
  - when run_sync_cluster_files is enabled (derived from
    REMOTE_SKYLINE_INSTANCES, SYNC_CLUSTER_FILES and FAKE_CLUSTER_SYNC),
    spawns ``sync_cluster_files`` roughly once a minute, each run capped
    at 60 seconds, using the time left after metric_management_process
  - sleeps for whatever remains of RUN_EVERY, polling once a second and
    waking early if the ``skyline.external_settings.update.metrics_manager``
    Redis key is set

- The metrics_manager_run_time Graphite metric is sent by
  ``metric_management_process`` itself, not by this method.
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
# Poll every 0.1 seconds, for at most 5 seconds, while the log lock file
# exists. Pushing now past log_wait_for ends the wait as soon as the
# file is gone.
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('metrics_manager :: starting %s metrics_manager' % skyline_app)
# @added 20190417 - Feature #2950: Report defaulted settings to log
# Added all the globally declared settings to enable reporting in the
# log the state of each setting.
# NOTE(review): SERVER_METRIC_PATH and ANALYZER_ENABLED are local names
# that are not referenced again anywhere else in this method.
try:
SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
except Exception as e:
SERVER_METRIC_PATH = ''
logger.warning('warning :: metrics_manager :: settings.SERVER_METRICS_NAME is not declared in settings.py, defaults to \'\' - %s' % e)
try:
ANALYZER_ENABLED = settings.ANALYZER_ENABLED
logger.info('metrics_manager :: ANALYZER_ENABLED is set to %s' % str(ANALYZER_ENABLED))
except Exception as e:
ANALYZER_ENABLED = True
# NOTE(review): this 'warning ::' message is emitted at INFO level. The
# SERVER_METRICS_NAME fallback above uses logger.warning instead.
logger.info('warning :: metrics_manager :: ANALYZER_ENABLED is not declared in settings.py, defaults to True - %s' % e)
# Loop forever. No break leaves this outer loop; the break statements
# below only exit the inner wait loops.
while 1:
# now marks the start of this iteration. It is used later
# (process_runtime) to decide how long to sleep so that each iteration
# takes about RUN_EVERY seconds.
now = time()
# Make sure Redis is up
# On a failed ping, wait 10 seconds and then recreate both the raw
# (self.redis_conn) and the decoded (self.redis_conn_decoded) connections.
try:
self.redis_conn.ping()
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager cannot connect to redis at socket path %s - %s' % (
settings.REDIS_SOCKET_PATH, e))
sleep(10)
try:
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager cannot connect to get_redis_conn - %s' % e)
continue
# Report app up
# The key is set with a 120 second TTL, so it expires unless this loop
# keeps refreshing it.
try:
self.redis_conn.setex('analyzer.metrics_manager', 120, now)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: could not update the Redis analyzer.metrics_manager key')
# Discover unique metrics
# full_uniques is not defined in this method. It is presumably the
# module-level name of the unique metrics Redis set; confirm elsewhere
# in the file. If the count cannot be fetched, or it is 0, wait 10
# seconds and start the next iteration.
unique_metrics_count = 0
try:
raw_unique_metrics_count = self.redis_conn_decoded.scard(full_uniques)
unique_metrics_count = int(raw_unique_metrics_count)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager ::: could not get the count of %s from Redis' % full_uniques)
sleep(10)
continue
if unique_metrics_count == 0:
logger.info('metrics_manager :: no metrics in redis. try adding some - see README')
sleep(10)
continue
# Spawn processes
# metric_management_process runs in a separate multiprocessing Process.
# pid_count and spawned_pids are recorded but are not read again in
# this method.
pids = []
spawned_pids = []
pid_count = 0
try:
p = Process(target=self.metric_management_process, args=(0,))
pids.append(p)
pid_count += 1
logger.info('metrics_manager :: starting metric_management_process')
p.start()
spawned_pids.append(p.pid)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to spawn process')
# Send wait signal to zombie processes
# for p in pids:
# p.join()
# Self monitor processes and terminate if any metric_management_process has run
# for longer than 180 seconds - 20160512 @earthgecko
# NOTE(review): the limit enforced below is now RUN_EVERY seconds, not 180.
# The process status is polled every 0.1 seconds. The while/else branch
# only runs when the time limit expires without a break.
p_starts = time()
# TESTING p.join removal
# while time() - p_starts <= 1:
while time() - p_starts <= RUN_EVERY:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('metrics_manager :: metric_management_process completed in %.2f seconds' % (time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('metrics_manager :: timed out, killing metric_management_process process')
for p in pids:
logger.info('metrics_manager :: killing metric_management_process process')
p.terminate()
# p.join()
logger.info('metrics_manager :: killed metric_management_process process')
# Terminate any spawned process that is still alive.
for p in pids:
if p.is_alive():
try:
logger.info('metrics_manager :: stopping metric_management_process - %s' % (str(p.is_alive())))
# p.join()
p.terminate()
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to stop spawn process')
# process_runtime is the number of seconds this iteration has taken so
# far. sleep_for is only assigned here, when time remains in the
# RUN_EVERY window.
process_runtime = time() - now
if process_runtime < RUN_EVERY:
sleep_for = (RUN_EVERY - process_runtime)
# @added 20201213 - Feature #3890: metrics_manager - sync_cluster_files
# If this is a clustered Skyline instance and SYNC_CLUSTER_FILES
# is enabled run the sync_cluster_files process every minute
run_sync_cluster_files = False
if REMOTE_SKYLINE_INSTANCES:
if SYNC_CLUSTER_FILES:
run_sync_cluster_files = True
if FAKE_CLUSTER_SYNC:
run_sync_cluster_files = True
logger.info('metrics_manager :: run_sync_cluster_files as FAKE_CLUSTER_SYNC')
logger.info('metrics_manager :: run_sync_cluster_files is set to %s' % str(run_sync_cluster_files))
if run_sync_cluster_files:
logger.info('metrics_manager :: running sync_cluster_files')
try:
# Keep starting sync runs until the sync_until deadline, which is
# derived from sleep_for. Each run is capped at 60 seconds below,
# and consecutive runs are spaced about 60 seconds apart.
sync_until = now + (sleep_for - 60)
sync_time = time()
last_sync_cluster_time = None
while sync_time < sync_until:
if last_sync_cluster_time:
check_sync_time = time()
sync_sleep_for = 60 - (check_sync_time - last_sync_cluster_time)
# @ modified 20220224 - Feature #4464: flux - quota - cluster_sync
# ValueError: sleep length must be non-negative
# if sync_sleep_for:
if sync_sleep_for > 0:
logger.info('metrics_manager :: sleeping for %.2f seconds between sync_cluster_files runs' % sync_sleep_for)
sleep(sync_sleep_for)
last_sync_cluster_time = time()
# Spawn sync_cluster_files
pids = []
spawned_pids = []
pid_count = 0
try:
p = Process(target=self.sync_cluster_files, args=(0,))
pids.append(p)
pid_count += 1
logger.info('metrics_manager :: starting sync_cluster_files')
p.start()
spawned_pids.append(p.pid)
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to spawn sync_cluster_files process')
p_starts = time()
while time() - p_starts <= 60:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('metrics_manager :: sync_cluster_files completed in %.2f seconds' % (time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('metrics_manager :: timed out, killing sync_cluster_files process')
for p in pids:
logger.info('metrics_manager :: killing sync_cluster_files process')
p.terminate()
# p.join()
logger.info('metrics_manager :: killed sync_cluster_files process')
for p in pids:
if p.is_alive():
try:
logger.info('metrics_manager :: stopping sync_cluster_files - %s' % (str(p.is_alive())))
# p.join()
p.terminate()
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: failed to stop sync_cluster_files process')
sync_time = time()
except:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager :: sync_cluster_files spawning failed')
# Recompute the remaining sleep now that any syncing is done. This can
# be zero or negative, in which case the wait loop below does not sleep.
process_runtime_now = time() - now
sleep_for = (RUN_EVERY - process_runtime_now)
logger.info('metrics_manager :: sleeping for %.2f seconds due to low run time...' % sleep_for)
# @modified 20220128 - Feature #4404: flux - external_settings - aggregation
# Feature #4324: flux - reload external_settings
# Feature #4376: webapp - update_external_settings
# sleep(sleep_for)
# Sleep in 1 second steps so that setting the
# skyline.external_settings.update.metrics_manager Redis key can cut the
# sleep short. A Redis error while polling is logged only once per sleep.
sleep_start = int(time())
update_error_logged = False
while int(time()) < (sleep_start + sleep_for):
external_settings_updated = None
try:
external_settings_updated = self.redis_conn_decoded.get('skyline.external_settings.update.metrics_manager')
except Exception as err:
if not update_error_logged:
logger.error(traceback.format_exc())
logger.error('error :: metrics_manager ::: could not get skyline.external_settings.update.metrics_manager from Redis, err: %s' % err)
external_settings_updated = False
update_error_logged = True
if external_settings_updated:
logger.info('metrics_manager :: breaking sleep as skyline.external_settings.update.metrics_manager Redis was found')
break
sleep(1)
# NOTE(review): sleep_for is only assigned when process_runtime < RUN_EVERY.
# If this del is reached on a path where it was never assigned,
# `del sleep_for` raises NameError. The bare except then turns that
# NameError into an error log line.
try:
del sleep_for
except:
logger.error('error :: metrics_manager :: failed to del sleep_for')
try:
del process_runtime
except:
logger.error('error :: metrics_manager :: failed to del process_runtime')