import logging
import traceback
import settings
import skyline_version
from sqlalchemy.sql import select
from database import get_engine, snab_table_meta
# @added 20230802 - Feature #5038: snab_results_algorithms
# Feature #5008: webapp - snab report page
from functions.database.queries.get_snab_algorithms_results import get_snab_algorithms_results
# @added 20230915 - Feature #5070: snab - anomaly_detection_metrics
from functions.snab.anomaly_detection_metrics import anomaly_detection_metrics
skyline_version = skyline_version.__absolute_version__
skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
[docs]
def get_snab_engine():
try:
engine, fail_msg, trace = get_engine(skyline_app)
return engine, fail_msg, trace
except:
trace = traceback.format_exc()
logger.error('%s' % trace)
fail_msg = 'error :: get_snab_report :: failed to get MySQL engine for snab table'
logger.error('%s' % fail_msg)
# return None, fail_msg, trace
raise # to webapp to return in the UI
[docs]
def snab_engine_disposal(engine):
if engine:
try:
engine.dispose()
except:
logger.error(traceback.format_exc())
logger.error('error :: get_snab_report :: calling engine.dispose()')
return
# @added 20230718 - Feature #5008: webapp - snab report page
[docs]
def get_snab_report(filter_on):
"""
Get the relevant data from the snab table.
:param filter_on: the filter_on dictionary
:param algorithms: the algorithms dictionary
:param algorithm_groups: the algorithm_groups dictionary
:type filter_on: dict
:type algorithms: dict
:type algorithm_groups: dict
:return: report_data
:rtype: dict
"""
# @added 20230915 - Feature #5008: webapp - snab report page
# Add heatmap to snab report page
def all_channels2(func):
def wrapper(channel1, channel2, *args, **kwargs):
try:
return func(channel1, channel2, *args, **kwargs)
except TypeError:
return tuple(func(c1, c2, *args, **kwargs) for c1,c2 in zip(channel1, channel2))
return wrapper
@all_channels2
def lerp(color1, color2, frac):
return color1 * (1 - frac) + color2 * frac
def hexcolor(rgb):
return f'#{int(rgb[0]):0>2X}{int(rgb[1]):0>2X}{int(rgb[2]):0>2X}'
report_data = {}
logger.info('get_snab_report :: filter_on: %s' % (str(filter_on)))
algorithm = None
algorithm_id = 0
if filter_on['algorithm']:
algorithm = filter_on['algorithm']
algorithm_id = filter_on['algorithms'][algorithm]
report_data['algorithm'] = algorithm
algorithms_by_name = filter_on['algorithms']
algorithms_by_id = {}
for algo in list(algorithms_by_name.keys()):
algo_id = algorithms_by_name[algo]
algorithms_by_id[algo_id] = algo
algorithm_groups_by_name = filter_on['algorithm_groups']
algorithm_groups_by_id = {}
for algorithm_group_name in list(algorithm_groups_by_name.keys()):
algorithm_group_id = algorithm_groups_by_name[algorithm_group_name]
algorithm_groups_by_id[algorithm_group_id] = algorithm_group_name
# @added 20230722 - Feature #5008: webapp - snab report page
# algorithm_group_ids = []
# algorithm_ids = []
# if 'all_algorithms_by_id' in list(filter_on.keys()):
# algorithm_groups_by_id = filter_on['all_algorithms_by_id']
# if 'all_algorithm_groups_by_id' in list(filter_on.keys()):
# algorithm_groups_by_id = filter_on['all_algorithm_groups_by_id']
# if algorithm_groups_by_id:
# algorithm_group_ids = list(algorithm_groups_by_id.keys())
# algorithm_ids = []
from_timestamp = filter_on['from_timestamp']
until_timestamp = filter_on['until_timestamp']
# @added 20230919 - Feature #5008: webapp - snab report page
# Added the ability to exclude algorithms
exclude_algorithms = filter_on['exclude_algorithms']
logger.info('get_snab_report :: getting report on algorithm: %s, algorithm_id: %s, with filter_on: %s' % (
algorithm, str(algorithm_id), str(filter_on)))
logger.info('get_snab_report :: getting MySQL engine')
try:
engine, fail_msg, trace = get_snab_engine()
logger.info(fail_msg)
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
logger.error('%s' % fail_msg)
logger.error('error :: get_snab_report :: could not get a MySQL engine to get snab table - %s' % str(err))
raise # to webapp to return in the UI
try:
snab_table, log_msg, trace = snab_table_meta(skyline_app, engine)
logger.info(log_msg)
logger.info('get_snab_report :: snab_table OK')
except:
logger.error(traceback.format_exc())
logger.error('error :: get_snab_report :: failed to get snab_table meta')
if engine:
snab_engine_disposal(engine)
raise # to webapp to return in the UI
anomaly_ids = []
algorithm_results = {}
try:
connection = engine.connect()
stmt = select([snab_table]).\
where(snab_table.c.algorithm_id == algorithm_id)
if from_timestamp:
stmt = select([snab_table]).\
where(snab_table.c.algorithm_id == algorithm_id).\
where(snab_table.c.snab_timestamp >= int(from_timestamp))
if until_timestamp:
stmt = select([snab_table]).\
where(snab_table.c.algorithm_id == algorithm_id).\
where(snab_table.c.snab_timestamp <= int(until_timestamp))
if from_timestamp and until_timestamp:
stmt = select([snab_table]).\
where(snab_table.c.algorithm_id == algorithm_id).\
where(snab_table.c.snab_timestamp >= int(from_timestamp)).\
where(snab_table.c.snab_timestamp <= int(until_timestamp))
results = connection.execute(stmt)
for row in results:
snab_id = row['id']
algorithm_results[snab_id] = dict(row)
anomaly_ids.append(row['anomaly_id'])
connection.close()
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: could not determine algorithm_results - %s' % str(err)
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
logger.info('get_snab_report :: determined %s SNAB results for %s' % (
str(len(algorithm_results)), algorithm))
anomaly_ids_set = set(anomaly_ids)
anomaly_ids = list(set(anomaly_ids))
snab_records = {}
algorithm_groups = []
all_sql_results = {}
try:
connection = engine.connect()
# @modified 20230922 -
# Improve query performance because the WHERE IN query is inefficient
if not anomaly_ids:
stmt = select([snab_table], snab_table.c.anomaly_id.in_(anomaly_ids))
else:
stmt = select([snab_table]).\
where(snab_table.c.anomaly_id >= min(anomaly_ids)).\
where(snab_table.c.anomaly_id <= max(anomaly_ids))
logger.info('get_snab_report :: querying snab table for %s anomaly_ids' % (
str(len(anomaly_ids))))
results = connection.execute(stmt)
logger.info('get_snab_report :: queried snab table for anomaly_ids')
for row in results:
# @added 20230922 -
# Improve query performance because the WHERE IN query is inefficient
all_sql_results[row['id']] = dict(row)
continue
snab_id = row['id']
algorithm_group_id = row['algorithm_group_id']
algorithm_groups.append(algorithm_group_id)
snab_records[snab_id] = dict(row)
connection.close()
logger.info('get_snab_report :: created all_sql_results dict with %s items' % str(len(all_sql_results)))
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: could not determine snab_records - %s' % str(err)
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
# @added 20230922 -
# Improve query performance because the WHERE IN query is inefficient
for key, value in all_sql_results.items():
# membership of the list is slower than membership of a set
# if all_sql_results[key]['anomaly_id'] in anomaly_ids:
if value['anomaly_id'] in anomaly_ids_set:
algorithm_group_id = value['algorithm_group_id']
algorithm_groups.append(algorithm_group_id)
snab_records[key] = value
logger.info('get_snab_report :: determined %s SNAB results to report on' % (
str(len(snab_records))))
algorithm_groups = list(set(algorithm_groups))
logger.info('get_snab_report :: reporting on algorithm_groups: %s' % (
str(algorithm_groups)))
# @added 20230802 - Feature #5038: snab_results_algorithms
# Feature #5008: webapp - snab report page
snab_checked_snab_ids = []
snab_checked_snab_ids_results = {}
# @added 20230915 - Feature #5070: snab - anomaly_detection_metrics
ad_metric_keys = [
'f1_score', 'precision', 'recall', 'accuracy', 'fP_rate', 'fN_rate',
# @added 20230916 - Feature #5070: snab - anomaly_detection_metrics
# Feature #5008: webapp - snab report page
# Added auc_roc
'tP_rate', 'tN_rate', 'auc_roc',
]
report_data['results'] = {}
report_data['algorithms'] = []
snab_ids = list(snab_records.keys())
for algorithm_group_id in algorithm_groups:
try:
try:
algorithm_group_name = algorithm_groups_by_id[algorithm_group_id]
except:
continue
# @added 20230919 - Feature #5008: webapp - snab report page
# Added the ability to exclude algorithms
if algorithm_group_name in exclude_algorithms:
continue
for snab_id in snab_ids:
if snab_records[snab_id]['algorithm_group_id'] != algorithm_group_id:
continue
if snab_records[snab_id]['algorithm_id']:
i_algorithm_id = int(snab_records[snab_id]['algorithm_id'])
i_algorithm = algorithms_by_id[i_algorithm_id]
else:
i_algorithm = algorithm_group_name
i_algorithm_id = None
break
# @added 20230919 - Feature #5008: webapp - snab report page
# Added the ability to exclude algorithms
if i_algorithm in exclude_algorithms:
continue
report_data['algorithms'].append(i_algorithm)
report_data['results'][i_algorithm] = {}
report_data['results'][i_algorithm]['algorithm_id'] = i_algorithm_id
report_data['results'][i_algorithm]['algorithm_group_name'] = algorithm_group_name
report_data['results'][i_algorithm]['algorithm_group_id'] = algorithm_group_id
total = 0
total_evaluated = 0
percent_evaluated = 0
total_fP = 0
percent_fP = 0
total_tP = 0
percent_tP = 0
total_fN = 0
percent_fN = 0
total_tN = 0
percent_tN = 0
total_unsure = 0
percent_unsure = 0
runtimes = []
for snab_id in snab_ids:
if snab_records[snab_id]['algorithm_group_id'] != algorithm_group_id:
continue
# @added 20230802 - Feature #5038: snab_results_algorithms
# Feature #5008: webapp - snab report page
snab_checked_snab_ids.append(int(snab_id))
runtimes.append(snab_records[snab_id]['runtime'])
total += 1
if snab_records[snab_id]['fP']:
total_fP += 1
total_evaluated += 1
snab_checked_snab_ids_results[snab_id] = 'fP'
continue
if snab_records[snab_id]['tP']:
total_tP += 1
total_evaluated += 1
snab_checked_snab_ids_results[snab_id] = 'tP'
continue
if snab_records[snab_id]['fN']:
total_fN += 1
total_evaluated += 1
snab_checked_snab_ids_results[snab_id] = 'fN'
continue
if snab_records[snab_id]['tN']:
total_tN += 1
total_evaluated += 1
snab_checked_snab_ids_results[snab_id] = 'tN'
continue
if snab_records[snab_id]['unsure']:
total_unsure += 1
total_evaluated += 1
snab_checked_snab_ids_results[snab_id] = 'unsure'
continue
report_data['results'][i_algorithm]['analysed'] = total
report_data['results'][i_algorithm]['evaluated'] = total_evaluated
if total and total_evaluated:
percent_evaluated = round(((total_evaluated / total) * 100), 2)
report_data['results'][i_algorithm]['evaluated %'] = percent_evaluated
report_data['results'][i_algorithm]['tP'] = total_tP
if total_tP and total_evaluated:
percent_tP = round(((total_tP / total_evaluated) * 100), 2)
report_data['results'][i_algorithm]['tP %'] = percent_tP
report_data['results'][i_algorithm]['fP'] = total_fP
if total_fP and total_evaluated:
percent_fP = round(((total_fP / total_evaluated) * 100), 2)
report_data['results'][i_algorithm]['fP %'] = percent_fP
report_data['results'][i_algorithm]['tN'] = total_tN
if total_tN and total_evaluated:
percent_tN = round(((total_tN / total_evaluated) * 100), 2)
report_data['results'][i_algorithm]['tN %'] = percent_tN
report_data['results'][i_algorithm]['fN'] = total_fN
percent_fN = 0
if total_fN and total_evaluated:
percent_fN = round(((total_fN / total_evaluated) * 100), 2)
report_data['results'][i_algorithm]['fN %'] = percent_fN
report_data['results'][i_algorithm]['unsure'] = total_unsure
if total_unsure and total_evaluated:
percent_unsure = round(((total_unsure / total_evaluated) * 100), 2)
report_data['results'][i_algorithm]['unsure %'] = percent_unsure
try:
incorrect = (((total_fP + total_fN) / (total_evaluated - total_unsure)) * 100)
except:
incorrect = 0
report_data['results'][i_algorithm]['incorrect % ((fP+fN)/(evaluated-unsure)*100)'] = round(incorrect, 2)
try:
accuracy = (((total_tP + total_tN) / (total_evaluated - total_unsure)) * 100)
except:
accuracy = 0
report_data['results'][i_algorithm]['accuracy % ((tP+tN)/(evaluated-unsure)*100)'] = round(accuracy, 2)
try:
accuracy_fn_penalty = (((total_tP + total_tN) / (total_evaluated - total_unsure)) * 100) - percent_fN
except:
accuracy_fn_penalty = 0
report_data['results'][i_algorithm]['fN penalty accuracy % ((tP+tN)/(evaluated-unsure)*100)-fN%'] = round(accuracy_fn_penalty, 2)
# @added 20230915 - Feature #5070: snab - anomaly_detection_metrics
ad_metrics = {
'algorithm': i_algorithm, 'evaluated': total_evaluated, 'tP': total_tP, 'fP': total_fP,
'tN': total_tN, 'fN': total_fN,
'f1_score': None, 'precision': None, 'recall': None,
'accuracy': None, 'fP_rate': None, 'fN_rate': None,
# @added 20230916 - Feature #5070: snab - anomaly_detection_metrics
# Feature #5008: webapp - snab report page
# Added auc_roc
'tP_rate': None, 'tN_rate': None, 'auc_roc': None,
}
if i_algorithm == report_data['algorithm']:
try:
ad_metrics = anomaly_detection_metrics(skyline_app, algorithm=i_algorithm, tP=total_tP, fP=total_fP,
tN=total_tN, fN=total_fN,
evaluated=(total_evaluated - total_unsure))
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: anomaly_detection_metrics failed on algorithm: %s - %s' % (
i_algorithm, str(err))
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
if ad_metrics:
for ad_key in ad_metric_keys:
report_data['results'][i_algorithm][ad_key] = ad_metrics[ad_key]
# Original SNAB was not recording runtime so only calculate if
# present
sum_runtimes = None
try:
sum_runtimes = sum(runtimes)
except:
sum_runtimes = None
if sum_runtimes:
avg_runtime = round((sum_runtimes / len(runtimes)), 2)
else:
avg_runtime = 'not recorded'
report_data['results'][i_algorithm]['avg_runtime'] = avg_runtime
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: could not determine report_data - %s' % str(err)
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
if len(report_data['algorithms']) > 1:
report_data['algorithms'] = list(set(report_data['algorithms']))
# @added 20230802 - Feature #5038: snab_results_algorithms
# Feature #5008: webapp - snab report page
# Add a summary of snab_algorithms_results reporting the number of tP, fP,
# tN, fN per algorithm and consensus group
snab_checked_snab_ids = list(set(snab_checked_snab_ids))
algorithms_results = {}
consensuses = {}
algorithm_results = {}
algorithm_group_id_results = {}
logger.info('get_snab_report :: %s snab results to determine algorithm_results on' % str(len(snab_checked_snab_ids)))
logger.info('get_snab_report :: %s snab_checked_snab_ids_results' % str(len(snab_checked_snab_ids_results)))
remove_algo_keys = []
if snab_checked_snab_ids:
try:
algorithms_results = get_snab_algorithms_results(skyline_app, snab_id=0, snab_ids=snab_checked_snab_ids)
logger.info('get_snab_report :: get_snab_algorithms_results got %s algorithms_results' % str(len(algorithms_results)))
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: get_snab_algorithms_results failed - %s' % str(err)
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
if algorithms_results:
for snab_id in list(algorithms_results.keys()):
algorithm_group_id = algorithms_results[snab_id]['algorithm_group_id']
algorithm = algorithms_results[snab_id]['algorithm']
snab_algorithm = str(algorithm)
# @added 20230919 - Feature #5008: webapp - snab report page
# Added the ability to exclude algorithms
if snab_algorithm in exclude_algorithms:
continue
snab_evaluated_result = False
try:
snab_evaluated_result = snab_checked_snab_ids_results[snab_id]
except:
snab_evaluated_result = False
consensus_achieved = algorithms_results[snab_id]['consensus_achieved']
consensus_achieved_str = None
if consensus_achieved:
consensus_achieved_str = str(algorithms_results[snab_id]['consensus_achieved'])
# @added 20230919 - Feature #5008: webapp - snab report page
# Added the ability to exclude algorithms
skip = False
for ex_algorithm in exclude_algorithms:
if ex_algorithm in consensus_achieved:
skip = True
break
if skip:
continue
if consensus_achieved_str not in consensuses:
consensuses[consensus_achieved_str] = {
# 'algorithm_group_id': algorithm_group_id,
# 'algorithm': snab_algorithm,
'evaluated': 0,
'tP': 0, 'tP %': 0,
'fP': 0, 'fP %': 0,
'tN': 0, 'tN %': 0,
'fN': 0, 'fN %': 0,
'unsure': 0, 'unsure %': 0,
}
if snab_evaluated_result and consensus_achieved_str:
consensuses[consensus_achieved_str]['evaluated'] += 1
consensuses[consensus_achieved_str][snab_evaluated_result] += 1
for algorithms_results_id in algorithms_results[snab_id]['algorithms_results']:
algorithm_group_id = algorithms_results[snab_id]['algorithms_results'][algorithms_results_id]['algorithm_group_id']
algorithm = algorithms_results[snab_id]['algorithms_results'][algorithms_results_id]['algorithm']
# @added 20230919 - Feature #5008: webapp - snab report page
# Added the ability to exclude algorithms
if algorithm in exclude_algorithms:
continue
if algorithm_group_id not in algorithm_group_id_results:
algorithm_group_id_results[algorithm_group_id] = {
# 'algorithm_group_id': algorithm_group_id,
'algorithm': algorithm,
'evaluated': 0,
'tP': 0, 'tP %': 0,
'fP': 0, 'fP %': 0,
'tN': 0, 'tN %': 0,
'fN': 0, 'fN %': 0,
'unsure': 0, 'unsure %': 0,
}
if algorithm not in algorithm_results:
algorithm_results[algorithm] = {
# 'algorithm_group_id': algorithm_group_id,
# 'algorithm': algorithm,
'evaluated': 0,
'tP': 0, 'tP %': 0,
'fP': 0, 'fP %': 0,
'tN': 0, 'tN %': 0,
'fN': 0, 'fN %': 0,
'unsure': 0, 'unsure %': 0,
'runtimes': [],
}
if snab_evaluated_result:
algorithm_results[algorithm]['evaluated'] += 1
anomalyScore = algorithms_results[snab_id]['algorithms_results'][algorithms_results_id]['anomalyScore']
runtime = algorithms_results[snab_id]['algorithms_results'][algorithms_results_id]['runtime']
if algorithm in ['single_value_anomaly']:
if snab_evaluated_result:
use_evaluated_result = 'tN'
in_consensus_achieved_str = False
if consensus_achieved_str:
if algorithm in consensus_achieved_str:
in_consensus_achieved_str = True
if anomalyScore and snab_evaluated_result == 'tP':
use_evaluated_result = snab_evaluated_result
if anomalyScore and snab_evaluated_result == 'fN':
use_evaluated_result = snab_evaluated_result
algorithm_results[algorithm][use_evaluated_result] += 1
algorithm_results[algorithm]['runtimes'].append(runtime)
continue
if anomalyScore and snab_evaluated_result == 'fP':
algorithm_results[algorithm][snab_evaluated_result] += 1
if anomalyScore and snab_evaluated_result == 'tP':
algorithm_results[algorithm][snab_evaluated_result] += 1
if anomalyScore and snab_evaluated_result == 'fN':
# algorithm_results[algorithm][snab_evaluated_result] += 1
algorithm_results[algorithm]['tP'] += 1
if anomalyScore and snab_evaluated_result == 'tN':
# algorithm_results[algorithm][snab_evaluated_result] += 1
algorithm_results[algorithm]['fP'] += 1
if not anomalyScore and snab_evaluated_result == 'fP':
# algorithm_results[algorithm][snab_evaluated_result] += 1
algorithm_results[algorithm]['tN'] += 1
if not anomalyScore and snab_evaluated_result == 'tP':
# algorithm_results[algorithm][snab_evaluated_result] += 1
algorithm_results[algorithm]['fN'] += 1
if not anomalyScore and snab_evaluated_result == 'fN':
algorithm_results[algorithm][snab_evaluated_result] += 1
if not anomalyScore and snab_evaluated_result == 'tN':
algorithm_results[algorithm][snab_evaluated_result] += 1
if snab_evaluated_result == 'unsure':
algorithm_results[algorithm][snab_evaluated_result] += 1
algorithm_results[algorithm]['runtimes'].append(runtime)
consensuses_keys = []
for consensus in list(consensuses.keys()):
evaluated = consensuses[consensus]['evaluated']
# if evaluated == 0:
# del consensuses[consensus]
# continue
tP = consensuses[consensus]['tP']
fP = consensuses[consensus]['fP']
tN = consensuses[consensus]['tN']
fN = consensuses[consensus]['fN']
unsure = consensuses[consensus]['unsure']
if tP:
consensuses[consensus]['tP %'] = round(((tP / evaluated) * 100), 2)
if fP:
consensuses[consensus]['fP %'] = round(((fP / evaluated) * 100), 2)
if tN:
consensuses[consensus]['tN %'] = round(((tN / evaluated) * 100), 2)
percent_fN = 0
if fN:
percent_fN = round(((fN / evaluated) * 100), 2)
consensuses[consensus]['fN %'] = percent_fN
if unsure:
consensuses[consensus]['unsure %'] = round(((unsure / evaluated) * 100), 2)
try:
incorrect = (((fP + fN) / (evaluated - unsure)) * 100)
except:
incorrect = 0
consensuses[consensus]['incorrect % ((fP+fN)/(evaluated-unsure)*100)'] = round(incorrect, 2)
try:
accuracy = (((tP + tN) / (evaluated - unsure)) * 100)
except:
accuracy = 0
consensuses[consensus]['accuracy % ((tP+tN)/(evaluated-unsure)*100)'] = round(accuracy, 2)
try:
accuracy_fn_penalty = (((tP + tN) / (evaluated - unsure)) * 100) - percent_fN
except:
accuracy_fn_penalty = 0
consensuses[consensus]['fN penalty accuracy % ((tP+tN)/(evaluated-unsure)*100)-fN%'] = round(accuracy_fn_penalty, 2)
# @added 20230915 - Feature #5070: snab - anomaly_detection_metrics
ad_metrics = {
'consensus': consensus, 'evaluated': evaluated, 'tP': tP, 'fP': fP, 'tN': tN,
'fN': fN,
'f1_score': None, 'precision': None, 'recall': None,
'accuracy': None, 'fP_rate': None, 'fN_rate': None,
# @added 20230916 - Feature #5070: snab - anomaly_detection_metrics
# Feature #5008: webapp - snab report page
# Added auc_roc
'tP_rate': None, 'tN_rate': None, 'auc_roc': None,
}
try:
ad_metrics = anomaly_detection_metrics(skyline_app, algorithm=consensus, tP=tP, fP=fP,
tN=tN, fN=fN,
evaluated=(evaluated - unsure))
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: anomaly_detection_metrics failed on consensus: %s - %s' % (
consensus, str(err))
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
if ad_metrics:
for ad_key in ad_metric_keys:
consensuses[consensus][ad_key] = ad_metrics[ad_key]
if not consensuses_keys:
consensuses_keys = list(consensuses[consensus].keys())
algorithm_results_keys = []
for algorithm in list(algorithm_results.keys()):
evaluated = algorithm_results[algorithm]['evaluated']
if evaluated == 0:
remove_algo_keys.append(algorithm)
# continue
tP = algorithm_results[algorithm]['tP']
fP = algorithm_results[algorithm]['fP']
tN = algorithm_results[algorithm]['tN']
fN = algorithm_results[algorithm]['fN']
unsure = algorithm_results[algorithm]['unsure']
if tP:
algorithm_results[algorithm]['tP %'] = round(((tP / evaluated) * 100), 2)
if fP:
algorithm_results[algorithm]['fP %'] = round(((fP / evaluated) * 100), 2)
if tN:
algorithm_results[algorithm]['tN %'] = round(((tN / evaluated) * 100), 2)
percent_fN = 0
if fN:
percent_fN = round(((fN / evaluated) * 100), 2)
algorithm_results[algorithm]['fN %'] = percent_fN
if unsure:
algorithm_results[algorithm]['unsure %'] = round(((unsure / evaluated) * 100), 2)
try:
incorrect = (((fP + fN) / (evaluated - unsure)) * 100)
except:
incorrect = 0
algorithm_results[algorithm]['incorrect % ((fP+fN)/(evaluated-unsure)*100)'] = round(incorrect, 2)
try:
accuracy = (((tP + tN) / (evaluated - unsure)) * 100)
except:
accuracy = 0
algorithm_results[algorithm]['accuracy % ((tP+tN)/(evaluated-unsure)*100)'] = round(accuracy, 2)
try:
accuracy_fn_penalty = (((tP + tN) / (evaluated - unsure)) * 100) - percent_fN
except:
accuracy_fn_penalty = 0
algorithm_results[algorithm]['fN penalty accuracy % ((tP+tN)/(evaluated-unsure)*100)-fN%'] = round(accuracy_fn_penalty, 2)
# @added 20230915 - Feature #5070: snab - anomaly_detection_metrics
ad_metrics = {
'algorithm': algorithm, 'evaluated': evaluated, 'tP': tP, 'fP': fP, 'tN': tN,
'fN': fN,
'f1_score': None, 'precision': None, 'recall': None,
'accuracy': None, 'fP_rate': None, 'fN_rate': None,
# @added 20230916 - Feature #5070: snab - anomaly_detection_metrics
# Feature #5008: webapp - snab report page
# Added auc_roc
'tP_rate': None, 'tN_rate': None, 'auc_roc': None,
}
try:
ad_metrics = anomaly_detection_metrics(skyline_app, algorithm=algorithm, tP=tP, fP=fP,
tN=tN, fN=fN,
evaluated=(evaluated - unsure))
except Exception as err:
trace = traceback.format_exc()
logger.error(trace)
fail_msg = 'error :: get_snab_report :: anomaly_detection_metrics failed on algorithm: %s - %s' % (
algorithm, str(err))
logger.error(fail_msg)
if engine:
try:
connection.close()
except:
pass
snab_engine_disposal(engine)
raise
# @modified 20230920
# Now evaluated == 0 are being removed only add if the consensus
# has not been removed
# if ad_metrics:
if ad_metrics and consensus in list(consensuses.keys()):
consensuses[consensus]['anomaly_detection_metrics'] = ad_metrics
# algorithm_results[algorithm]
if ad_metrics:
for ad_key in ad_metric_keys:
algorithm_results[algorithm][ad_key] = ad_metrics[ad_key]
runtimes = list(algorithm_results[algorithm]['runtimes'])
del algorithm_results[algorithm]['runtimes']
algorithm_results[algorithm]['avg_runtime'] = round((sum(runtimes) / len(runtimes)), 4)
if not algorithm_results_keys:
algorithm_results_keys = list(algorithm_results[algorithm].keys())
report_data['consensuses'] = {}
report_data['consensuses']['keys'] = consensuses_keys
report_data['consensuses']['results'] = consensuses
report_data['algorithm_results'] = {}
report_data['algorithm_results']['keys'] = algorithm_results_keys
report_data['algorithm_results']['results'] = algorithm_results
remove_keys = False
if remove_keys:
remove_algo_keys = list(set(remove_algo_keys))
for algo in list(set(remove_algo_keys)):
try:
del report_data['algorithm_results']['results'][algo]
except:
pass
if 'algorithm_results' in report_data:
if 'results' in report_data['algorithm_results']:
if len(report_data['algorithm_results']['results']) == 0:
del report_data['algorithm_results']
if 'consensuses' in report_data:
if 'results' in report_data['consensuses']:
if len(report_data['consensuses']['results']) == 0:
del report_data['consensuses']
logger.info('get_snab_report :: report_data: %s' % str(report_data))
# @added 20230915 - Feature #5008: webapp - snab report page
# Add heatmap to snab report page
coloured_key_strs = ['fP %', 'fN %', 'incorrect', 'accuracy'] + ad_metric_keys
inverted_key_strs = ['fP %', 'fN %', 'incorrect', 'fP_rate', 'fN_rate']
for algo in list(report_data['results'].keys()):
for key in list(report_data['results'][algo].keys()):
# if '%' in key and 'heatmap' not in key:
if ('%' in key and 'heatmap' not in key) or (key in ad_metric_keys and 'heatmap' not in key):
h_key = 'heatmap_html_color_code_%s' % key
value = report_data['results'][algo][key]
if value is not None:
original_value = float(value)
else:
original_value = value
coloured_key = False
for key_str in coloured_key_strs:
if key_str in key:
coloured_key = True
break
if not coloured_key:
continue
if key in ad_metric_keys:
if not isinstance(value, float):
continue
inverted_key_colour = False
for key_str in inverted_key_strs:
if key_str in key:
inverted_key_colour = True
break
if inverted_key_colour:
if key in ad_metric_keys:
value = 1 - value
else:
value = 100 - value
if value < 25:
colour = hexcolor(lerp((0xff, 0x00, 0x00), (0xff, 0x80, 0x00), value / 25))
elif value < 50:
colour = hexcolor(lerp((0xFF, 0x80, 0x00), (0xFF, 0xFF, 0x00), (value - 25) / 25))
elif value < 75:
colour = hexcolor(lerp((0xFF, 0xFF, 0x00), (0x53, 0xd9, 0x26), (value - 50) / 25))
else:
colour = hexcolor(lerp((0x53, 0xd9, 0x26), (0x00, 0xff, 0x00), (value - 75) / 25))
if key in ad_metric_keys:
if value < 0.25:
colour = hexcolor(lerp((0xff, 0x00, 0x00), (0xff, 0x80, 0x00), (value * 100) / 25))
elif value < 0.50:
colour = hexcolor(lerp((0xFF, 0x80, 0x00), (0xFF, 0xFF, 0x00), ((value * 100) - 25) / 25))
elif value < 0.75:
colour = hexcolor(lerp((0xFF, 0xFF, 0x00), (0x53, 0xd9, 0x26), ((value * 100) - 50) / 25))
else:
colour = hexcolor(lerp((0x53, 0xd9, 0x26), (0x00, 0xff, 0x00), ((value * 100) - 75) / 25))
if key == 'auc_roc':
if original_value == 1:
colour = '#00FF00'
if key == 'fN %':
if value > 98:
colour = '#00FF00'
elif value > 94:
colour = '#FFFF00'
elif value > 90:
colour = '#ffa500'
else:
colour = '#FF0000'
report_data['results'][algo][h_key] = colour
if 'algorithm_results' in list(report_data.keys()):
for algo in list(report_data['algorithm_results']['results'].keys()):
for key in list(report_data['algorithm_results']['results'][algo].keys()):
# if '%' in key and 'heatmap' not in key:
if ('%' in key and 'heatmap' not in key) or (key in ad_metric_keys and 'heatmap' not in key):
h_key = 'heatmap_html_color_code_%s' % key
value = report_data['algorithm_results']['results'][algo][key]
coloured_key = False
for key_str in coloured_key_strs:
if key_str in key:
coloured_key = True
break
if not coloured_key:
continue
if key in ad_metric_keys:
if not isinstance(value, float):
continue
inverted_key_colour = False
for key_str in inverted_key_strs:
if key_str in key:
inverted_key_colour = True
break
if inverted_key_colour:
if key in ad_metric_keys:
value = 1 - value
else:
value = 100 - value
if value < 25:
colour = hexcolor(lerp((0xff, 0x00, 0x00), (0xff, 0x80, 0x00), value / 25))
elif value < 50:
colour = hexcolor(lerp((0xFF, 0x80, 0x00), (0xFF, 0xFF, 0x00), (value - 25) / 25))
elif value < 75:
colour = hexcolor(lerp((0xFF, 0xFF, 0x00), (0x53, 0xd9, 0x26), (value - 50) / 25))
else:
colour = hexcolor(lerp((0x53, 0xd9, 0x26), (0x00, 0xff, 0x00), (value - 75) / 25))
if key in ad_metric_keys:
if value < 0.25:
colour = hexcolor(lerp((0xff, 0x00, 0x00), (0xff, 0x80, 0x00), (value * 100) / 25))
elif value < 0.50:
colour = hexcolor(lerp((0xFF, 0x80, 0x00), (0xFF, 0xFF, 0x00), ((value * 100) - 25) / 25))
elif value < 0.75:
colour = hexcolor(lerp((0xFF, 0xFF, 0x00), (0x53, 0xd9, 0x26), ((value * 100) - 50) / 25))
else:
colour = hexcolor(lerp((0x53, 0xd9, 0x26), (0x00, 0xff, 0x00), ((value * 100) - 75) / 25))
if key == 'fN %':
if value > 98:
colour = '#00FF00'
elif value > 94:
colour = '#FFFF00'
elif value > 90:
colour = '#ffa500'
else:
colour = '#FF0000'
report_data['algorithm_results']['results'][algo][h_key] = colour
if 'consensuses' in list(report_data.keys()):
for consensus in list(report_data['consensuses']['results'].keys()):
for key in list(report_data['consensuses']['results'][consensus].keys()):
# if '%' in key and 'heatmap' not in key:
if ('%' in key and 'heatmap' not in key) or (key in ad_metric_keys and 'heatmap' not in key):
h_key = 'heatmap_html_color_code_%s' % key
value = report_data['consensuses']['results'][consensus][key]
coloured_key = False
for key_str in coloured_key_strs:
if key_str in key:
coloured_key = True
break
if not coloured_key:
continue
if key in ad_metric_keys:
if not isinstance(value, float):
continue
inverted_key_colour = False
for key_str in inverted_key_strs:
if key_str in key:
inverted_key_colour = True
break
if inverted_key_colour:
if key in ad_metric_keys:
value = 1 - value
else:
value = 100 - value
if value < 25:
colour = hexcolor(lerp((0xff, 0x00, 0x00), (0xff, 0x80, 0x00), value / 25))
elif value < 50:
colour = hexcolor(lerp((0xFF, 0x80, 0x00), (0xFF, 0xFF, 0x00), (value - 25) / 25))
elif value < 75:
colour = hexcolor(lerp((0xFF, 0xFF, 0x00), (0x53, 0xd9, 0x26), (value - 50) / 25))
else:
colour = hexcolor(lerp((0x53, 0xd9, 0x26), (0x00, 0xff, 0x00), (value - 75) / 25))
if key in ad_metric_keys:
if value < 0.25:
colour = hexcolor(lerp((0xff, 0x00, 0x00), (0xff, 0x80, 0x00), (value * 100) / 25))
elif value < 0.50:
colour = hexcolor(lerp((0xFF, 0x80, 0x00), (0xFF, 0xFF, 0x00), ((value * 100) - 25) / 25))
elif value < 0.75:
colour = hexcolor(lerp((0xFF, 0xFF, 0x00), (0x53, 0xd9, 0x26), ((value * 100) - 50) / 25))
else:
colour = hexcolor(lerp((0x53, 0xd9, 0x26), (0x00, 0xff, 0x00), ((value * 100) - 75) / 25))
if key == 'fN %':
if value > 98:
colour = '#00FF00'
elif value > 94:
colour = '#FFFF00'
elif value > 90:
colour = '#ffa500'
else:
colour = '#FF0000'
report_data['consensuses']['results'][consensus][h_key] = colour
if connection:
try:
connection.close()
except:
pass
if engine:
snab_engine_disposal(engine)
return report_data