import logging
import os
from ast import literal_eval
import traceback
import settings
import skyline_version
from skyline_functions import get_redis_conn_decoded
from matched_or_regexed_in_list import matched_or_regexed_in_list
from sqlalchemy.sql import select
from database import get_engine, cloudburst_table_meta
# @added 20221103 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
from functions.metrics.get_base_name_from_labelled_metrics_name import get_base_name_from_labelled_metrics_name
# Rebind the imported skyline_version module name to its version string.
skyline_version = skyline_version.__absolute_version__
skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
try:
    ENABLE_WEBAPP_DEBUG = settings.ENABLE_WEBAPP_DEBUG
# A setting that is not defined raises AttributeError, not EnvironmentError,
# so both are caught to make the False fallback reachable.
except (AttributeError, EnvironmentError) as outer_err:
    logger.error('error :: cannot determine ENABLE_WEBAPP_DEBUG from settings - %s' % outer_err)
    ENABLE_WEBAPP_DEBUG = False
this_host = str(os.uname()[1])
def get_filtered_metrics(redis_conn_decoded, namespaces):
    """
    Create a list of the metric base names that match the given namespaces.

    The candidate base names are the members of the Redis set
    analyzer.metrics_manager.db.metric_names.  Each one is tested against
    namespaces with matched_or_regexed_in_list and kept if it matches.  A
    base name for which the matching itself fails is logged and skipped.

    :param redis_conn_decoded: the redis_conn_decoded object
    :param namespaces: the namespaces to match
    :type redis_conn_decoded: object
    :type namespaces: list
    :return: list of metrics
    :rtype: list
    :raises Exception: any error raised while reading the Redis set is
        logged and re-raised

    """
    function_str = 'get_cloudbursts :: get_filtered_metrics'
    filter_by_metrics = []
    redis_key = 'analyzer.metrics_manager.db.metric_names'
    try:
        unique_base_names = list(redis_conn_decoded.smembers(redis_key))
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to get Redis key %s - %s' % (
            function_str, redis_key, err))
        raise
    if unique_base_names:
        logger.info('%s :: got %s unique_base_names' % (
            function_str, str(len(unique_base_names))))

    for base_name in unique_base_names:
        try:
            # Only the match flag is used, what it was matched by is not
            pattern_match, _ = matched_or_regexed_in_list(skyline_app, base_name, namespaces)
        except Exception as err:
            # Best effort, one bad name must not prevent the others from
            # being evaluated
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: matched_or_regexed_in_list failed for %s - %s' % (
                function_str, base_name, err))
            continue
        if pattern_match:
            filter_by_metrics.append(base_name)
    return filter_by_metrics
def get_metric_ids(redis_conn_decoded, filter_by_metrics):
    """
    Create a list of metric ids for the given metrics and return it together
    with the full dict of metric_names_with_ids.

    The name to id mapping is read from the Redis hash
    aet.metrics_manager.metric_names_with_ids.  A metric that cannot be
    resolved to an integer id is logged and left out of the list of ids.

    :param redis_conn_decoded: the redis_conn_decoded object
    :param filter_by_metrics: the metric base names to look up ids for
    :type redis_conn_decoded: object
    :type filter_by_metrics: list
    :return: (list of metric ids, dict of metric_names_with_ids)
    :rtype: (list, dict)
    :raises Exception: any error raised while reading the Redis hash is
        logged and re-raised

    """
    function_str = 'get_cloudbursts :: get_metric_ids'
    metric_ids = []
    try:
        metric_names_with_ids = redis_conn_decoded.hgetall('aet.metrics_manager.metric_names_with_ids')
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to get Redis hash aet.metrics_manager.metric_names_with_ids - %s' % (
            function_str, err))
        raise

    for base_name in filter_by_metrics:
        try:
            metric_ids.append(int(metric_names_with_ids[base_name]))
        except Exception as err:
            # Best effort, an unknown name or a non-integer id is reported
            # and the remaining metrics are still processed
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: failed to add metric id to metric_ids for %s from metric_names_with_ids - %s' % (
                function_str, base_name, err))
    return (metric_ids, metric_names_with_ids)
def get_cloudbursts(metric, namespaces, from_timestamp, until_timestamp):
    """
    Create a dict of the cloudbursts in the cloudburst table, optionally
    restricted to a metric, to namespaces and to a timestamp range.

    :param metric: the name of the metric, or 'all' to not restrict the
        result to a single metric.  A name starting with labelled_metrics.
        is first resolved with get_base_name_from_labelled_metrics_name
    :param namespaces: the namespaces to match
    :param from_timestamp: if greater than 0 only cloudbursts with a
        timestamp >= from_timestamp are returned
    :param until_timestamp: if greater than 0 only cloudbursts with a
        timestamp <= until_timestamp are returned
    :type metric: str
    :type namespaces: list
    :type from_timestamp: int
    :type until_timestamp: int
    :return: dict of cloudbursts
    :rtype: dict
    :raises ValueError: if a labelled_metrics name resolves to nothing
    :raises Exception: errors from Redis, from the metric lookup and from
        the database are logged and re-raised

    Returns a dict of cloudbursts keyed by cloudburst id.  The ids are in the
    reverse of the order in which the query returned the rows.  In each
    cloudburst the id and metric keys come first, followed by the remaining
    columns of the row.  The columns listed below are indicative only, the
    actual keys are whatever the cloudburst table row provides.

    {
        <id>: {
            'id': <int>,
            'metric': <str>,
            'metric_id': <int>,
            'timestamp': <int>,
            'end': <int>,
            'duration': <int>,
            'from_timestamp': <int>,
            'resolution': <int>,
            'full_duration': <int>,
            'anomaly_id': <int>,
            'match_id': <int>,
            'fp_id': <int>,
            'layer_id': <int>,
            'added_at': <int>,
        },
    }

    """
    function_str = 'get_cloudbursts'
    cloudbursts_dict = {}
    use_filter_by_metrics = False
    ids_with_metric_names = {}

    logger.info(
        'get_cloudbursts - metric: %s, namespaces: %s, from_timestamp: %s, until_timestamp: %s' % (
            str(metric), str(namespaces), str(from_timestamp),
            str(until_timestamp)))

    if metric != 'all':
        # @added 20221103 - Task #2732: Prometheus to Skyline
        #                   Branch #4300: prometheus
        use_metric = str(metric)
        if metric.startswith('labelled_metrics.'):
            try:
                metric = get_base_name_from_labelled_metrics_name(skyline_app, use_metric)
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: %s :: get_base_name_from_labelled_metrics_name failed with %s - %s' % (
                    function_str, metric, err))
                raise
            logger.info(
                'get_cloudbursts - looked up %s to metric: %s' % (
                    use_metric, str(metric)))
            if not metric:
                logger.error('error :: %s :: failed to look up metric' % function_str)
                raise ValueError('failed to look up metric')
        use_filter_by_metrics = True
        if not namespaces:
            namespaces = [metric]
    if namespaces == ['all']:
        namespaces = [metric]

    try:
        redis_conn_decoded = get_redis_conn_decoded(skyline_app)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: get_redis_conn_decoded failed - %s' % (
            function_str, err))
        raise

    filter_by_metrics = get_filtered_metrics(redis_conn_decoded, namespaces)
    if namespaces:
        use_filter_by_metrics = True

    if metric != 'all':
        try:
            # Only used to validate that the metric has an id, int() raises
            # if the hash has no (or a non-integer) value for the metric.
            # The ids that are queried are determined by get_metric_ids below.
            int(redis_conn_decoded.hget('aet.metrics_manager.metric_names_with_ids', metric))
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error('error :: %s :: failed to get %s from Redis hash aet.metrics_manager.metric_names_with_ids - %s' % (
                function_str, metric, err))
            raise

    metric_ids, metric_names_with_ids = get_metric_ids(redis_conn_decoded, filter_by_metrics)
    if len(filter_by_metrics) > 1:
        use_filter_by_metrics = True

    # Invert the mapping so that rows can be labelled with the metric name
    for base_name, base_name_id in metric_names_with_ids.items():
        ids_with_metric_names[int(base_name_id)] = base_name

    try:
        engine, log_msg, trace = get_engine(skyline_app)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to get engine - %s' % (
            function_str, err))
        raise

    try:
        cloudburst_table, log_msg, trace = cloudburst_table_meta(skyline_app, engine)
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to get cloudburst_table - %s' % (
            function_str, err))
        raise

    connection = None
    try:
        connection = engine.connect()
        stmt = select([cloudburst_table])
        if use_filter_by_metrics:
            stmt = stmt.where(cloudburst_table.c.metric_id.in_(metric_ids))
        # A timestamp bound is only applied when it is positive and the
        # other bound is not negative
        if from_timestamp > 0 and until_timestamp >= 0:
            stmt = stmt.where(cloudburst_table.c.timestamp >= from_timestamp)
        if until_timestamp > 0 and from_timestamp >= 0:
            stmt = stmt.where(cloudburst_table.c.timestamp <= until_timestamp)
        results = connection.execute(stmt)
        for row in results:
            # @modified 20230126
            # Wrapped in try except
            try:
                cloudburst_id = row['id']
                metric_id = row['metric_id']
                row_dict = dict(row)
                try:
                    metric_name = ids_with_metric_names[metric_id]
                except KeyError:
                    metric_name = 'labelled_metrics.%s' % str(metric_id)
                    logger.warning('warning :: %s :: failed to find metric name for metric id in ids_with_metric_names, using %s' % (
                        function_str, metric_name))
                # Order the keys for the page, id and metric first.  The
                # cloudburst is only stored once it is complete so that a
                # row that fails cannot leave a partial entry behind.
                cloudburst = {'id': cloudburst_id, 'metric': metric_name}
                for key, value in row_dict.items():
                    if key not in ('id', 'metric'):
                        cloudburst[key] = value
                cloudbursts_dict[cloudburst_id] = cloudburst
            except Exception as err:
                logger.error('error :: %s :: failed to iterate row from cloudburst_table - %s' % (
                    function_str, err))
    except Exception as err:
        logger.error(traceback.format_exc())
        logger.error('error :: %s :: failed to get cloudburst_table - %s' % (
            function_str, err))
        raise
    finally:
        # Always release the connection, also when the query fails
        if connection is not None:
            connection.close()

    # Reverse the order in which the cloudbursts were returned by the query
    desc_cloudbursts_dict = {}
    for c_id in reversed(list(cloudbursts_dict.keys())):
        desc_cloudbursts_dict[c_id] = cloudbursts_dict[c_id]
    cloudbursts_dict = desc_cloudbursts_dict

    logger.info('%s :: found %s cloudbursts' % (
        function_str, str(len(cloudbursts_dict))))
    return cloudbursts_dict