import logging
import os
import traceback
import pandas as pd
from sqlalchemy.sql import select
from adtk.visualization import plot
import settings
from functions.database.queries.get_cloudburst_row import get_cloudburst_row
from functions.graphite.get_metrics_timeseries import get_metrics_timeseries
from database import get_engine, engine_disposal, cloudburst_table_meta
# @added 20221103 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
from functions.victoriametrics.get_victoriametrics_metric import get_victoriametrics_metric
skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
[docs]def get_cloudburst_plot(cloudburst_id, base_name, shift, all_in_period=False):
"""
Create a plot of the cloudburst and return the path and filename
:param cloudburst_id: the cloudburt id
:param base_name: the name of the metric
:param shift: the number of indice to shift the plot
:type cloudburst_id: int
:type base_name: str
:type shift: int
:return: path and file
:rtype: str
"""
function_str = 'get_cloudburst_plot'
logger.info(
'get_cloudburst_plot - cloudburst_id: %s, base_name: %s' % (
str(cloudburst_id), str(base_name)))
save_to_file = '%s/cloudburst_id.%s.%s.shift.%s.png' % (
settings.SKYLINE_TMP_DIR, str(cloudburst_id), base_name, str(shift))
if all_in_period:
save_to_file = '%s/cloudburst_id.%s.all.%s.shift.%s.png' % (
settings.SKYLINE_TMP_DIR, str(cloudburst_id), base_name, str(shift))
cloudburst_dict = {}
try:
cloudburst_dict = get_cloudburst_row(skyline_app, cloudburst_id)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: get_cloudburst_row failed - %s' % (
function_str, err))
raise
if not cloudburst_dict:
logger.error('error :: %s :: no cloudburst_dict - %s' % function_str)
return None, None
if os.path.isfile(save_to_file):
return cloudburst_dict, save_to_file
try:
from_timestamp = cloudburst_dict['from_timestamp']
until_timestamp = from_timestamp + cloudburst_dict['full_duration']
resolution = cloudburst_dict['resolution']
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: failed parse values from cloudburst_dict - %s' % (
function_str, err))
raise
metrics_functions = {}
metrics_functions[base_name] = {}
metrics_functions[base_name]['functions'] = None
if resolution > 60:
resolution_minutes = int(resolution / 60)
summarize_intervalString = '%smin' % str(resolution_minutes)
summarize_func = 'median'
metrics_functions[base_name]['functions'] = {'summarize': {'intervalString': summarize_intervalString, 'func': summarize_func}}
# @added 20221103 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
labelled_metric = False
if base_name.startswith('labelled_metrics.'):
labelled_metric = True
timeseries = []
try:
timeseries = get_victoriametrics_metric(
skyline_app, base_name, from_timestamp, until_timestamp,
'list', 'object')
except Exception as err:
logger.error('error :: %s :: get_victoriametrics_metric failed for %s - %s' % (
function_str, base_name, err))
raise
if timeseries:
metrics_timeseries = {}
metrics_timeseries[base_name] = {}
metrics_timeseries[base_name]['timeseries'] = timeseries
else:
logger.warning('warning :: %s :: failed to retrieve timeseries from VictoriaMetrics for %s' % (
function_str, base_name))
if not labelled_metric:
try:
metrics_timeseries = get_metrics_timeseries(skyline_app, metrics_functions, from_timestamp, until_timestamp, log=False)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: get_metrics_timeseries failed - %s' % (
function_str, err))
raise
try:
timeseries = metrics_timeseries[base_name]['timeseries']
timeseries_length = len(timeseries)
timeseries = timeseries[1:(timeseries_length - 2)]
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: failed to determine timeseries - %s' % (
function_str, err))
raise
anomalies_in_period = []
if all_in_period:
try:
engine, fail_msg, trace = get_engine(skyline_app)
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: %s :: could not get a MySQL engine - %s' % (function_str, err)
logger.error('%s' % fail_msg)
if engine:
engine_disposal(skyline_app, engine)
raise
try:
cloudburst_table, log_msg, trace = cloudburst_table_meta(skyline_app, engine)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: failed to get cloudburst_table meta for cloudburst id %s - %s' % (
function_str, str(cloudburst_id), err))
if engine:
engine_disposal(engine)
raise
try:
connection = engine.connect()
stmt = select([cloudburst_table]).\
where(cloudburst_table.c.metric_id == cloudburst_dict['metric_id']).\
where(cloudburst_table.c.timestamp >= from_timestamp).\
where(cloudburst_table.c.timestamp <= until_timestamp).\
where(cloudburst_table.c.id != cloudburst_id)
result = connection.execute(stmt)
for row in result:
anomalies_in_period.append([row['timestamp'], row['end']])
connection.close()
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: could not get cloudburst row for cloudburst id %s - %s' % (
function_str, str(cloudburst_id), err))
if engine:
engine_disposal(engine)
raise
if engine:
engine_disposal(skyline_app, engine)
anomalies = []
if anomalies_in_period:
logger.info(
'get_cloudburst_plot - adding %s all_in_period anomalies to cloudburst plot' % (
str(len(anomalies_in_period))))
for period_anomalies in anomalies_in_period:
new_anomalies = [item for item in timeseries if int(item[0]) >= period_anomalies[0] and int(item[0]) <= period_anomalies[1]]
if new_anomalies:
anomalies = anomalies + new_anomalies
try:
cloudburst_anomalies = [item for item in timeseries if int(item[0]) >= cloudburst_dict['timestamp'] and int(item[0]) <= cloudburst_dict['end']]
anomalies = anomalies + cloudburst_anomalies
logger.info(
'get_cloudburst_plot - adding %s cloudbursts: %s' % (
str(len(anomalies)), str(anomalies)))
aligned_timeseries = []
for ts, value in timeseries:
# aligned_timeseries.append([int(int(ts) // 600 * 600), value])
aligned_timeseries.append([int(int(ts) // resolution * resolution), value])
if aligned_timeseries:
timeseries = list(aligned_timeseries)
df = pd.DataFrame(timeseries, columns=['date', 'value'])
df['date'] = pd.to_datetime(df['date'], unit='s')
datetime_index = pd.DatetimeIndex(df['date'].values)
df = df.set_index(datetime_index)
df.drop('date', axis=1, inplace=True)
anomalies_data = []
# @modified 20210831
# Align periods
# anomaly_timestamps = [int(item[0]) for item in anomalies]
# anomaly_timestamps = [(int(item[0]) + (resolution * 2)) for item in anomalies]
# anomaly_timestamps = [(int(item[0]) + (resolution * 6)) for item in anomalies]
# anomaly_timestamps = [(int(item[0]) + (resolution * 4)) for item in anomalies]
# anomaly_timestamps = [(int(item[0]) + (resolution * 3)) for item in anomalies]
anomaly_timestamps = [(int(item[0]) + (resolution * shift)) for item in anomalies]
# anomaly_timestamps = [int(item[0]) for item in anomalies]
anomaly_timestamps = [int(int(ts) // resolution * resolution) for ts in anomaly_timestamps]
for item in timeseries:
if int(item[0]) in anomaly_timestamps:
anomalies_data.append(1)
else:
anomalies_data.append(0)
df['anomalies'] = anomalies_data
title = '%s\ncloudburst id: %s' % (base_name, str(cloudburst_id))
if all_in_period:
title = '%s (all in period)' % title
plot(df['value'], anomaly=df['anomalies'], anomaly_color='red', title=title, save_to_file=save_to_file)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: %s :: failed to plot cloudburst - %s' % (
function_str, err))
raise
if not os.path.isfile(save_to_file):
return cloudburst_dict, None
return cloudburst_dict, save_to_file