"""
analyzer_batch.py
"""
from __future__ import division
import logging
try:
from Queue import Empty
except:
from queue import Empty
from time import time, sleep, strftime, gmtime
from threading import Thread
from collections import defaultdict
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list to reduce memory and number of
# processes
# from multiprocessing import Process, Manager, Queue
from multiprocessing import Process, Queue
import os
from os import kill, getpid
import traceback
import re
from sys import version_info
from sys import exit as sys_exit
# import os.path
from ast import literal_eval
from msgpack import Unpacker
import settings
from skyline_functions import (
write_data_to_file, send_anomalous_metric_to, mkdir_p,
filesafe_metricname,
# @added 20170602 - Feature #2034: analyse_derivatives
nonNegativeDerivative, in_list,
# @added 20191030 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
get_redis_conn, get_redis_conn_decoded,
# @added 20200506 - Feature #3532: Sort all time series
sort_timeseries)
# @added 20170602 - Feature #2034: analyse_derivatives
# @modified 20220419 - Feature #4528: metrics_manager - derivative_metric_check
# Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Use strictly_increasing_monotonicity shared function
from functions.timeseries.strictly_increasing_monotonicity import strictly_increasing_monotonicity
# @added 20200425 - Feature #3512: matched_or_regexed_in_list function
# Feature #3508: ionosphere_untrainable_metrics
# Feature #3486: analyzer_batch
from matched_or_regexed_in_list import matched_or_regexed_in_list
# @added 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
from functions.metrics.last_known_value_metrics_list import last_known_value_metrics_list
from functions.metrics.zero_fill_metrics_list import zero_fill_metrics_list
# @added 20220407 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
from functions.timeseries.full_duration_timeseries_fill import full_duration_timeseries_fill
# @added 20220421 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
from functions.metrics.non_derivative_metrics_list import non_derivative_metrics_list
# @added 20220504 - Feature #2580: illuminance
from functions.illuminance.add_illuminance_entries import add_illuminance_entries
# @modified 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Changed to algoritms_batch so there is no pollution and
# analyzer and analyzer_batch are totally independent
# from algorithms import run_selected_algorithm
from algorithms_batch import run_selected_batch_algorithm
from algorithm_exceptions import TooShort, Stale, Boring
# TODO if settings.ENABLE_CRUCIBLE: and ENABLE_PANORAMA
# from spectrum import push_to_crucible
skyline_app = 'analyzer_batch'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
python_version = int(version_info[0])
this_host = str(os.uname()[1])
try:
SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
except:
SERVER_METRIC_PATH = ''
# @added 20190410 - Feature #2916: ANALYZER_ENABLED setting
try:
ANALYZER_ENABLED = settings.ANALYZER_ENABLED
logger.info('ANALYZER_ENABLED is set to %s' % str(ANALYZER_ENABLED))
except:
ANALYZER_ENABLED = True
logger.info('warning :: ANALYZER_ENABLED is not declared in settings.py, defaults to True')
try:
from settings import BATCH_PROCESSING
except:
BATCH_PROCESSING = None
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrcs have negatives values some they can be
# added to the ionosphere.untrainable_metrics Redis set
try:
# @modified 20200606 - Bug #3572: Apply list to settings import
KNOWN_NEGATIVE_METRICS = list(settings.KNOWN_NEGATIVE_METRICS)
except:
KNOWN_NEGATIVE_METRICS = []
# @added 20200607 - Feature #3566: custom_algorithms
try:
CUSTOM_ALGORITHMS = settings.CUSTOM_ALGORITHMS
except:
CUSTOM_ALGORITHMS = None
try:
DEBUG_CUSTOM_ALGORITHMS = settings.DEBUG_CUSTOM_ALGORITHMS
except:
DEBUG_CUSTOM_ALGORITHMS = False
# @added 20200727 - Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
try:
from settings import ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
except:
ROOMBA_DO_NOT_PROCESS_BATCH_METRICS = False
if ROOMBA_DO_NOT_PROCESS_BATCH_METRICS:
try:
from types import TupleType
except ImportError:
eliminated_in_python3 = True
from redis import WatchError
from msgpack import packb
# @added 20200817 - Feature #3684: ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS
# Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
# Feature #3480: batch_processing
# Allow for custom durations on namespaces
ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS = []
if ROOMBA_DO_NOT_PROCESS_BATCH_METRICS:
try:
from settings import ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS
except:
ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS = False
# @added 20200815 - Feature #3678: SNAB - anomalyScore
try:
SNAB_DATA_DIR = settings.SNAB_DATA_DIR
except:
SNAB_DATA_DIR = '/opt/skyline/SNAB'
try:
SNAB_anomalyScore = settings.SNAB_anomalyScore
except:
SNAB_anomalyScore = {}
# @added 20201017 - Feature #3818: ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED
try:
ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED = settings.ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED
except:
ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED = False
try:
BATCH_MODE = settings.BATCH_PROCESSING_BATCH_MODE
except:
BATCH_MODE = True
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
LOCAL_DEBUG = False
[docs]class AnalyzerBatch(Thread):
"""
The AnalyzerBatch class which controls the analyzer.batch thread and spawned
processes.
Made with love to the analyzer_batch playlist:
https://soundcloud.com/earthgecko/sets/analyzer_batch
https://soundcloud.com/thedeltariggs/ode-to-jeremiah (I can't tell what I've seen..)
https://soundcloud.com/egroove/premiere-francesco-chiocci-feat-black-soda-musumeci-remix-connaisseur-recordings (picking up pieces of my weary mind)
https://soundcloud.com/when-we-dip/premiere-francesco-chiocci-ft-black-soda-black-sunrise-peter-pardeike-remix
https://soundcloud.com/timgreen/atelier-francesco-manuel-feat-astrid-dead-end-tim-green-remixcityfox-1
https://soundcloud.com/imbernonmusic/edu-imbernon-fixing-fires
https://soundcloud.com/deep-house-amsterdam/oliver-koletzki-deep-house-amsterdam-dgtl-podcast-007
https://soundcloud.com/crosstownrebels/crm140-damian-lazarus-the-ancient-moons-vermillion-agoria-remix-1
https://soundcloud.com/wiewouwat/joy-wellboy-before-the-sunrise
https://soundcloud.com/agoria/damian-lazarus-the-ancent-moons-vermillion-agoria-remix
https://soundcloud.com/wearesoundspace/premiere-just-her-feat-kieran-fowkes-let-myself-go
https://soundcloud.com/watergaterecords/matthias-meyer-november-rain
https://soundcloud.com/musicthatmakesmewannasurf/mixtape-2-w-kosson
"""
def __init__(self, parent_pid):
"""
Initialize the AnalyzerBatch
Create the :obj:`self.batch_exceptions_q` queue
Create the :obj:`self.batch_anomaly_breakdown_q` queue
"""
super(AnalyzerBatch, self).__init__()
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
self.batch_exceptions_q = Queue()
self.batch_anomaly_breakdown_q = Queue()
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
sys_exit(0)
[docs] def spin_batch_process(self, i, run_timestamp, metric_name, last_analyzed_timestamp, batch=[]):
"""
Assign a metric and last_analyzed_timestamp for a process to analyze.
:param i: python process id
:param run_timestamp: the epoch timestamp at which this process was called
:param metric_name: the FULL_NAMESPACE metric name as keyed in Redis
:param last_analyzed_timestamp: the last analysed timestamp as recorded
in the Redis key last_timestamp.basename key.
:return: returns True
"""
spin_start = time()
child_batch_process_pid = os.getpid()
LOCAL_DEBUG = False
metrics_processed = 0
if not batch:
batch_mode = False
metrics = [[metric_name, last_analyzed_timestamp]]
logger.info('child_batch_process_pid - %s, processing %s from %s' % (
str(child_batch_process_pid), metric_name, str(last_analyzed_timestamp)))
else:
batch_mode = True
metrics = batch
number_of_metrics = len(batch)
logger.info('child_batch_process_pid - %s, processing %s metrics in batch mode' % (
str(child_batch_process_pid), str(number_of_metrics)))
# Make process-specific dicts
exceptions = defaultdict(int)
anomaly_breakdown = defaultdict(int)
# Determine the unique Mirage and Ionosphere metrics once, which are
# used later to determine how Analyzer should handle/route anomalies
try:
mirage_unique_metrics = list(self.redis_conn_decoded.smembers('mirage.unique_metrics'))
except:
mirage_unique_metrics = []
try:
ionosphere_unique_metrics = list(self.redis_conn_decoded.smembers('ionosphere.unique_metrics'))
except:
ionosphere_unique_metrics = []
# In order to convert monotonic, incrementing metrics to a deriative
# metric
try:
# @modified 20211012 - Feature #4280: aet.metrics_manager.derivative_metrics Redis hash
# derivative_metrics = list(self.redis_conn_decoded.smembers('derivative_metrics'))
derivative_metrics = list(self.redis_conn_decoded.smembers('aet.metrics_manager.derivative_metrics'))
except:
derivative_metrics = []
# @added 20220323 - Feature #4502: settings - MONOTONIC_METRIC_NAMESPACES
always_derivative_metrics = []
try:
always_derivative_metrics = list(self.redis_conn_decoded.smembers('metrics_manager.always_derivative_metrics'))
except Exception as err:
logger.error('error :: failed to get metrics_manager.always_derivative_metrics Redis set - %s' % str(err))
if always_derivative_metrics:
all_derivative_metrics = derivative_metrics + always_derivative_metrics
derivative_metrics = list(set(all_derivative_metrics))
try:
non_derivative_metrics = list(self.redis_conn_decoded.smembers('non_derivative_metrics'))
except:
non_derivative_metrics = []
# @added 20220419 - Feature #4528: metrics_manager - derivative_metric_check
# Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
longterm_non_derivative_metrics = []
try:
longterm_non_derivative_metrics_dict = self.redis_conn_decoded.hgetall('metrics_manager.longterm_non_derivative_metrics')
longterm_non_derivative_metrics = list(longterm_non_derivative_metrics_dict.keys())
non_derivative_metrics = list(set(non_derivative_metrics + longterm_non_derivative_metrics))
except Exception as err:
logger.error('error :: metrics_manager :: derivative_metric_check :: failed to hgetall metrics_manager.longterm_non_derivative_metrics - %s' % str(err))
try:
# @modified 20200606 - Bug #3572: Apply list to settings import
non_derivative_monotonic_metrics = list(settings.NON_DERIVATIVE_MONOTONIC_METRICS)
except:
non_derivative_monotonic_metrics = []
# @added 20220421 - Feature #4528: metrics_manager - derivative_metric_check
# Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
for longterm_non_derivative_metric in longterm_non_derivative_metrics:
if longterm_non_derivative_metric.startswith(settings.FULL_NAMESPACE):
longterm_non_derivative_base_name = longterm_non_derivative_metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
longterm_non_derivative_base_name = str(longterm_non_derivative_metric)
non_derivative_monotonic_metrics.append(longterm_non_derivative_base_name)
non_smtp_alerter_metrics = []
try:
non_smtp_alerter_metrics = list(self.redis_conn_decoded.smembers('analyzer.non_smtp_alerter_metrics'))
except:
non_smtp_alerter_metrics = []
# @added 20220406 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
last_known_value_metrics = []
try:
last_known_value_metrics = last_known_value_metrics_list(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: last_known_value_metrics_list failed - %s' % err)
zero_fill_metrics = []
try:
zero_fill_metrics = zero_fill_metrics_list(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: zero_fill_metrics_list failed - %s' % err)
# @added 20220421 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
user_non_derivative_metrics = []
try:
user_non_derivative_metrics = non_derivative_metrics_list(skyline_app)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: non_derivative_metrics_list failed - %s' % err)
all_non_derivative_metrics = list(set(user_non_derivative_metrics + non_derivative_monotonic_metrics))
# @added 20220420 - Feature #4530: namespace.analysed_events
analysed_metrics = []
# @added 20220504 - Feature #2580: illuminance
illuminance_dict = {}
for item in metrics:
metric_name = item[0]
last_analyzed_timestamp = item[1]
if batch_mode:
metrics_processed += 1
logger.info('processing metric %s of %s' % (
str(metrics_processed), str(number_of_metrics)))
# Identify last timestamp
metric_timestamp = None
# Identify anomalies
# Handle EXPIRATION_TIME
# Ship to Analyzer, Mirage or Ionosphere
# @added 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# If multiple work items exist and the timestamp in the work item is
# older than the last analyzed timestamp reported by Redis key, just
# skip and remove the work item
if metric_name.startswith(settings.FULL_NAMESPACE):
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric_name
# Check the last_timestamp metric Redis key
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
redis_key_set = None
last_redis_timestamp = 0
try:
last_redis_timestamp_data = self.redis_conn_decoded.get(last_metric_timestamp_key)
last_redis_timestamp = int(last_redis_timestamp_data)
except:
logger.error('error :: failed to get Redis key %s' % last_metric_timestamp_key)
get_raw_series = True
if last_redis_timestamp:
if last_redis_timestamp > last_analyzed_timestamp:
get_raw_series = False
logger.info('The %s is %s, the passed last_analyzed_timestamp is %s, not getting raw_series returning' % (
last_metric_timestamp_key, str(last_redis_timestamp),
str(last_analyzed_timestamp)))
if LOCAL_DEBUG:
logger.debug('debug :: getting Redis time series data for %s, last_analyzed_timestamp: %s' % (base_name, str(last_analyzed_timestamp)))
raw_series = None
# @modified 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Only resurface the timeseries if the work item timestamp is greater
# than the last analyzed timestamp reported by Redis key
if get_raw_series:
try:
raw_series = self.redis_conn.get(metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get %s from Redis' % metric_name)
raw_series = None
if not raw_series:
logger.info('No raw_series defined, returning')
# Remove for work list
redis_set = 'analyzer.batch'
data = [metric_name, int(last_analyzed_timestamp)]
try:
self.redis_conn.srem(redis_set, str(data))
logger.info('analyzer_batch :: removed batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_batch :: failed to remove batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
if batch_mode:
continue
return
try:
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
except:
timeseries = []
# @added 20200506 - Feature #3532: Sort all time series
# To ensure that there are no unordered timestamps in the time
# series which are artefacts of the collector or carbon-relay, sort
# all time series by timestamp before analysis.
original_timeseries = timeseries
if original_timeseries:
timeseries = sort_timeseries(original_timeseries)
del original_timeseries
try:
del raw_series
except:
pass
try:
first_timeseries_timestamp = timeseries[0][0]
last_timeseries_timestamp = [-1][0]
except:
first_timeseries_timestamp = None
last_timeseries_timestamp = None
if LOCAL_DEBUG:
logger.debug('debug :: got Redis time series data for %s, from: %s, until: %s' % (
base_name, str(first_timeseries_timestamp),
str(last_timeseries_timestamp)))
# @added 20200727 - Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# euthanize keys if not done in roomba, allows for backfill processing
# via analyzer_batch
roombaed = False
if ROOMBA_DO_NOT_PROCESS_BATCH_METRICS:
if LOCAL_DEBUG:
logger.debug('debug :: checking if roomba needs to be run on %s' % (base_name))
now = int(time())
duration = settings.FULL_DURATION + settings.ROOMBA_GRACE_TIME
key = metric_name
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
# Enable pruning at custom_duration
custom_duration = False
# @added 20200817 - Feature #3684: ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS
# Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
# Feature #3480: batch_processing
# Allow for custom durations on namespaces, this is for testing to
# allow the Redis key to have data at a different resolution than
# FULL_DURATION, which allows for feeding a metric at 1 data point
# per 10 mins (ala fake Mirage)
try:
if ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS:
for metric_namespace, custom_full_duration in ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS:
if metric_namespace in base_name:
duration = custom_full_duration + settings.ROOMBA_GRACE_TIME
logger.info('batch_processing :: %s found in ROOMBA_BATCH_METRICS_CUSTOM_DURATIONS, duration for roomba set to %s' % (
base_name, str(duration)))
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
custom_duration = True
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_batch :: failed to remove batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
namespace_unique_metrics = '%sunique_metrics' % str(settings.FULL_NAMESPACE)
euthanized = 0
trimmed_keys = 0
active_keys = 0
try:
# Put pipe back in multi mode
pipe = self.redis_conn.pipeline()
# WATCH the key
pipe.watch(key)
pipe.multi()
# There's one value. Purge if it's too old
last_timestamp = int(timeseries[-1][0])
# Do not purge if it has not been analyzed
if (last_timestamp - duration) > last_analyzed_timestamp:
logger.info('batch_processing :: last_timestamp is %s, but for roomba setting to the last_analyzed_timestamp (%s) as it has not been analyzed' % (
str(last_timestamp), str(last_analyzed_timestamp)))
last_timestamp = last_analyzed_timestamp
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
# I am not 100% certain how this will affect historical
# batch processing
drop_old_data = False
if custom_duration and last_timestamp < (int(timeseries[-1][0]) - duration):
last_timestamp = int(time()) - duration
logger.info('batch_processing :: custom_duration set last_timestamp to %s' % (
str(last_timestamp)))
drop_old_data = True
now = int(last_analyzed_timestamp)
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
if drop_old_data:
now = last_timestamp
logger.info('batch_processing :: doing roomba on %s with %s data points' % (key, str(len(timeseries))))
roombaed = True
# Delete the key if it is not a list of tuples and the
# last timestamp is old, e.g. bad data
try:
if python_version == 2:
if not isinstance(timeseries[0], TupleType):
if timeseries[0] < last_timestamp - duration:
pipe.delete(key)
pipe.srem(namespace_unique_metrics, key)
pipe.execute()
euthanized += 1
timeseries = []
if python_version == 3:
if not isinstance(timeseries[0], tuple):
if timeseries[0] < now - duration:
pipe.delete(key)
pipe.srem(namespace_unique_metrics, key)
pipe.execute()
euthanized += 1
timeseries = []
except IndexError:
timeseries = []
# Check if the last value is too old and purge
if timeseries[-1][0] < now - duration:
pipe.delete(key)
pipe.srem(namespace_unique_metrics, key)
pipe.execute()
euthanized += 1
timeseries = []
# Remove old datapoints and duplicates from timeseries
temp = set()
temp_add = temp.add
delta = now - duration
trimmed = [
tuple for tuple in timeseries
if tuple[0] > delta and
tuple[0] not in temp and not
temp_add(tuple[0])
]
# Purge if everything was deleted, set key otherwise
if len(trimmed) > 0:
# Serialize and turn key back into not-an-array
btrimmed = packb(trimmed)
if len(trimmed) <= 15:
value = btrimmed[1:]
elif len(trimmed) <= 65535:
value = btrimmed[3:]
trimmed_keys += 1
else:
value = btrimmed[5:]
trimmed_keys += 1
pipe.set(key, value)
active_keys += 1
else:
pipe.delete(key)
pipe.srem(namespace_unique_metrics, key)
euthanized += 1
pipe.execute()
except WatchError:
logger.info('batch_processing :: blocked from euthanizing %s' % (key))
except Exception as e:
# If something bad happens, zap the key and hope it goes away
# pipe.delete(key)
# pipe.srem(namespace_unique_metrics, key)
# pipe.execute()
# euthanized += 1
logger.info(e)
logger.info('batch_processing :: something bad happened but not euthanizing %s' % (key))
finally:
pipe.reset()
raw_series = None
try:
raw_series = self.redis_conn.get(metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to get %s from Redis' % metric_name)
raw_series = None
if not raw_series:
logger.info('No raw_series defined after euthanizing %s, returning' % (key))
# Remove for work list
redis_set = 'analyzer.batch'
data = [metric_name, int(last_analyzed_timestamp)]
try:
self.redis_conn.srem(redis_set, str(data))
logger.info('analyzer_batch :: removed batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_batch :: failed to remove batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
if batch_mode:
continue
return
try:
unpacker = Unpacker(use_list=False)
unpacker.feed(raw_series)
timeseries = list(unpacker)
if roombaed:
logger.info('batch_processing :: after roomba %s has %s data points' % (key, str(len(timeseries))))
except:
timeseries = []
# @added 20200506 - Feature #3532: Sort all time series
# To ensure that there are no unordered timestamps in the time
# series which are artefacts of the collector or carbon-relay, sort
# all time series by timestamp before analysis.
original_timeseries = timeseries
if original_timeseries:
timeseries = sort_timeseries(original_timeseries)
del original_timeseries
try:
del raw_series
except:
pass
try:
first_timeseries_timestamp_after_roomba = timeseries[0][0]
last_timeseries_timestamp_after_roomba = timeseries[-1][0]
except:
first_timeseries_timestamp_after_roomba = None
last_timeseries_timestamp_after_roomba = None
if LOCAL_DEBUG:
logger.debug('debug :: got Redis time series data for %s after roomba, from: %s, until: %s' % (
base_name, str(first_timeseries_timestamp_after_roomba),
str(last_timeseries_timestamp_after_roomba)))
# @added 20211124 - Task #4322: Handle historical batch metrics full duration
# first_last_duration_timestamp = timeseries[-1][0] - settings.FULL_DURATION
first_last_duration_timestamp = last_analyzed_timestamp - settings.FULL_DURATION
timeseries_length = len(timeseries)
full_duration_timeseries = []
try:
full_duration_timeseries = [item for item in timeseries if item[0] >= first_last_duration_timestamp]
if full_duration_timeseries:
full_duration_timeseries_length = len(full_duration_timeseries)
timeseries = list(full_duration_timeseries)
del full_duration_timeseries
if timeseries_length != full_duration_timeseries_length:
logger.info('timeseries has been pruned from %s datapoints to its FULL_DURATION %s datapoints' % (
str(timeseries_length), str(full_duration_timeseries_length)))
except:
timeseries = []
timestamps_to_analyse = []
# Reverse the time series so that only the first (last) items now to be
# iterated and break after the necessary iterations so the entire
# time series is not iterated over.
reversed_timeseries = list(reversed(timeseries))
for timestamp, value in reversed_timeseries:
if int(timestamp) > last_analyzed_timestamp:
timestamps_to_analyse.append(int(timestamp))
else:
break
del reversed_timeseries
timestamps_to_analyse = list(reversed(timestamps_to_analyse))
# @added 20200413 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Handle there being no timestamps_to_analyse and report such as
# otherwise the only info logged is that the work key just gets removed
# 2020-04-14 12:57:25 :: 3222 :: there are 1 metrics to process in the analyzer.batch Redis set
# 2020-04-14 12:57:25 :: 3222 :: processing - ['vista.demo_robustperception_io.prometheus.node_disk_read_time_seconds_total', 1586868000]
# 2020-04-14 12:57:25 :: 3222 :: starting 1 of 1 spin_batch_process
# 2020-04-14 12:57:25 :: 7852 :: batch :: child_batch_process_pid - 7852, processing vista.demo_robustperception_io.prometheus.node_disk_read_time_seconds_total from 1586868000
# 2020-04-14 12:57:25 :: 7852 :: analyzer_batch :: removed work item - ['vista.demo_robustperception_io.prometheus.node_disk_read_time_seconds_total', 1586868000] - from Redis set - analyzer.batch
# 2020-04-14 12:57:25 :: 7852 :: spin_batch_process took 0.04 seconds
# 2020-04-14 12:57:25 :: 3222 :: 1 spin_batch_process completed in 0.10 seconds
# 2020-04-14 12:57:25 :: 3222 :: exceptions - Stale: 9, Boring: 6, TooShort: 0, Other: 0
# 2020-04-14 12:57:25 :: 3222 :: anomaly_breakdown - histogram_bins: 0, first_hour_average: 0, stddev_from_average: 0, grubbs: 0, ks_test: 0, mean_subtraction_cumulation: 0, median_absolute_deviation: 0, stddev_from_moving_average: 0, least_squares: 0
number_of_timestamps_to_analyze = len(timestamps_to_analyse)
if number_of_timestamps_to_analyze == 0:
logger.info('no timestamps were found to analyze for %s from %s, nothing to do' % (
metric_name, str(last_analyzed_timestamp)))
# @added 20200424 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Feature #3504: Handle airgaps in batch metrics
# If there are no data points to analyze remove from the set
redis_set = 'analyzer.batch'
data = [metric_name, int(last_analyzed_timestamp)]
try:
self.redis_conn.srem(redis_set, str(data))
logger.info('analyzer_batch :: removed batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_batch :: failed to remove batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
# Clean up and return
try:
del timeseries
except:
pass
try:
del timestamps_to_analyse
except:
pass
try:
del batch_timeseries
except:
pass
if batch_mode:
continue
try:
del mirage_unique_metrics
except:
pass
try:
del ionosphere_unique_metrics
except:
pass
try:
del derivative_metrics
except:
pass
try:
del non_derivative_metrics
except:
pass
try:
del non_derivative_monotonic_metrics
except:
pass
try:
del non_smtp_alerter_metrics
except:
pass
return
last_redis_data_timestamp = timestamps_to_analyse[-1]
logger.info('%s timestamps were found to analyze for %s from %s to %s' % (
str(number_of_timestamps_to_analyze), metric_name,
str(last_analyzed_timestamp), str(last_redis_data_timestamp)))
if LOCAL_DEBUG:
logger.debug('debug :: %s - timestamps_to_analyse: %s' % (
base_name, str(timestamps_to_analyse)))
# @modified 20200728 - Bug #3652: Handle multiple metrics in base_name conversion
# base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
if metric_name.startswith(settings.FULL_NAMESPACE):
base_name = metric_name.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric_name
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrcs have negatives values some they can be
# added to the ionosphere_untrainable_metrics Redis set
run_negatives_present = False
if settings.IONOSPHERE_ENABLED:
run_negatives_present = True
try:
known_negative_metric, known_negative_metric_matched_by = matched_or_regexed_in_list(skyline_app, base_name, KNOWN_NEGATIVE_METRICS)
del known_negative_metric_matched_by
if known_negative_metric:
run_negatives_present = False
except:
run_negatives_present = True
# @added 20170602 - Feature #2034: analyse_derivatives
# In order to convert monotonic, incrementing metrics to a deriative
# metric
known_derivative_metric = False
unknown_deriv_status = True
# @modified 20200601 - Feature #3480: batch_processing
# Bug #2050: analyse_derivatives - change in monotonicity
# Switch the order in which they are checked and do not check if
# not manage_derivative_metrics as will only be set to True anyway
# if metric_name in non_derivative_metrics:
# unknown_deriv_status = False
# if unknown_deriv_status:
# if metric_name in derivative_metrics:
# known_derivative_metric = True
# unknown_deriv_status = False
if metric_name in derivative_metrics:
known_derivative_metric = True
unknown_deriv_status = False
if unknown_deriv_status:
if metric_name in non_derivative_metrics:
unknown_deriv_status = False
# @added 20220421 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
if metric_name in all_non_derivative_metrics:
unknown_deriv_status = False
if base_name in all_non_derivative_metrics:
unknown_deriv_status = False
# First check if it has its own Redis z.derivative_metric key
# that has not expired
derivative_metric_key = 'z.derivative_metric.%s' % str(base_name)
# @added 20200601 - Feature #3480: batch_processing
# Bug #2050: analyse_derivatives - change in monotonicity
# When a monotonic metric changes in the last run before a
# manage_derivative_metrics run, when manage_derivative_metrics runs
# it classifies it and adds it to non_derivative_metrics the only
# way to stop this is check the key for each metric
last_derivative_metric_key = None
try:
last_derivative_metric_key = self.redis_conn_decoded.get(derivative_metric_key)
except Exception as e:
logger.error('error :: could not query Redis for last_derivative_metric_key: %s' % e)
if last_derivative_metric_key:
known_derivative_metric = True
# @added 20220421 - Feature #3866: MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
# Task #3868: POC MIRAGE_ENABLE_HIGH_RESOLUTION_ANALYSIS
identified_non_derivative_metric = False
if base_name in all_non_derivative_metrics:
identified_non_derivative_metric = True
if metric_name in all_non_derivative_metrics:
identified_non_derivative_metric = True
if metric_name in longterm_non_derivative_metrics:
identified_non_derivative_metric = True
if identified_non_derivative_metric:
try:
del derivative_metrics[metric_name]
except:
pass
unknown_deriv_status = False
is_strictly_increasing_monotonically = False
known_derivative_metric = False
if unknown_deriv_status:
# @added 20170617 - Bug #2050: analyse_derivatives - change in monotonicity
# @modified 20200601 - Feature #3480: batch_processing
# Bug #2050: analyse_derivatives - change in monotonicity
# Always check moved to above
# last_derivative_metric_key = False
# try:
# last_derivative_metric_key = self.redis_conn.get(derivative_metric_key)
# except Exception as e:
# logger.error('error :: could not query Redis for last_derivative_metric_key: %s' % e)
# @modified 20200601 - Feature #3480: batch_processing
# Bug #2050: analyse_derivatives - change in monotonicity
# Apply skip_derivative
skip_derivative = in_list(base_name, non_derivative_monotonic_metrics)
is_strictly_increasing_monotonically = False
if not skip_derivative:
# @added 20220323 - Feature #4502: settings - MONOTONIC_METRIC_NAMESPACES
if metric_name in always_derivative_metrics:
is_strictly_increasing_monotonically = True
if not is_strictly_increasing_monotonically:
is_strictly_increasing_monotonically = strictly_increasing_monotonicity(timeseries)
if is_strictly_increasing_monotonically:
try:
last_expire_set = int(time())
self.redis_conn.setex(
derivative_metric_key, settings.FULL_DURATION, last_expire_set)
except Exception as e:
logger.error('error :: could not set Redis derivative_metric key: %s' % e)
else:
is_strictly_increasing_monotonically = False
# Determine if it is a strictly increasing monotonically metric
# or has been in last FULL_DURATION via its z.derivative_metric
# key
if last_derivative_metric_key:
# Until the z.derivative_metric key expires, it is classed
# as such
is_strictly_increasing_monotonically = True
if skip_derivative:
is_strictly_increasing_monotonically = False
if is_strictly_increasing_monotonically:
known_derivative_metric = True
try:
self.redis_conn.sadd('derivative_metrics', metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add metric to Redis derivative_metrics set')
try:
self.redis_conn.sadd('new_derivative_metrics', metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add metric to Redis new_derivative_metrics set')
try:
last_expire_set = int(time())
self.redis_conn.setex(
derivative_metric_key, settings.FULL_DURATION, last_expire_set)
except Exception as e:
logger.error('error :: could not set Redis derivative_metric key: %s' % e)
# @added 20210325 - Feature #3480: batch_processing
# Bug #2050: analyse_derivatives - change in monotonicity
# Remove from non_derivative_metrics as per analyzer
try:
self.redis_conn.srem('non_derivative_metrics', metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add metric to Redis non_derivative_metrics set')
else:
try:
self.redis_conn.sadd('non_derivative_metrics', metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add metric to Redis non_derivative_metrics set')
try:
self.redis_conn.sadd('new_non_derivative_metrics', metric_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add metric to Redis new_non_derivative_metrics set')
not_anomalous_count = 0
# @added 20200815 - Feature #3678: SNAB - anomalyScore
record_anomalyScore = False
if SNAB_anomalyScore:
SNAB_metrics = []
try:
SNAB_all_metrics = SNAB_anomalyScore['all']
if SNAB_all_metrics:
for SNAB_metric in SNAB_all_metrics:
SNAB_metrics.append(SNAB_metric)
except:
SNAB_all_metrics = []
try:
SNAB_app_metrics = SNAB_anomalyScore[skyline_app]
if SNAB_app_metrics:
for SNAB_metric in SNAB_app_metrics:
SNAB_metrics.append(SNAB_metric)
except:
SNAB_app_metrics = []
if SNAB_metrics:
for SNAB_metric_namespace in list(set(SNAB_metrics)):
if SNAB_metric_namespace in base_name:
record_anomalyScore = True
break
test_anomaly = False
test_anomaly_at = None
try:
test_anomaly_key = 'analyzer_batch.test.%s' % base_name
try:
test_anomaly = self.redis_conn.get(test_anomaly_key)
test_anomaly_at = int(test_anomaly)
logger.info('test_anomaly - testing anomly on %s at %s' % (metric_name, str(test_anomaly_at)))
except:
test_anomaly = None
except:
test_anomaly = False
if LOCAL_DEBUG:
logger.debug('debug :: %s - timeseries sample (last 4): %s' % (
base_name, str(timeseries[-4:])))
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
timestamps_processed = 0
# Distill timeseries strings into lists
for i, batch_timestamp in enumerate(timestamps_to_analyse):
self.check_if_parent_is_alive()
batch_timeseries = []
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
# Reset anomalous for every iteration, print iterations and
# break if approaching spin_batch_process limit
anomalous = False
timestamps_processed += 1
if not timestamps_processed % 500:
logger.info('%s of %s timestamps processed for %s' % (
str(timestamps_processed),
str(len(timestamps_to_analyse)), base_name))
if int(time()) > (int(spin_start) + 290):
logger.info('%s of %s timestamps processed, approaching time limit, breaking' % (
str(timestamps_processed),
str(len(timestamps_to_analyse))))
break
for timestamp, value in timeseries:
if int(timestamp) <= batch_timestamp:
batch_timeseries.append([timestamp, value])
if known_derivative_metric:
try:
derivative_timeseries = nonNegativeDerivative(batch_timeseries)
batch_timeseries = derivative_timeseries
except Exception as err:
logger.error('error :: nonNegativeDerivative failed - %s' % err)
batch_timeseries = []
# @added 20220407 - Feature #4518: settings - LAST_KNOWN_VALUE_NAMESPACES
# Feature #4520: settings - ZERO_FILL_NAMESPACES
if base_name in zero_fill_metrics:
if not known_derivative_metric:
try:
timeseries = full_duration_timeseries_fill(self, skyline_app, base_name, timeseries, 'zero')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: full_duration_timeseries_fill failed - %s' % err)
else:
# Fix the badly defined metric if a metric has been defined
# as a zero_fill metric but is a derivative_metric apply
# last_known_value AFTER nonNegativeDerivative because
# zero filling derivative metrics does not have the desired
# effect
last_known_value_metrics.append(base_name)
if base_name in last_known_value_metrics:
try:
timeseries = full_duration_timeseries_fill(self, skyline_app, base_name, timeseries, 'last_known_value')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: full_duration_timeseries_fill failed - %s' % err)
if LOCAL_DEBUG:
logger.debug('debug :: %s - batch_timestamp: %s, batch_timeseries sample (last 2): %s' % (
base_name, str(batch_timestamp), str(batch_timeseries[-2:])))
try:
# Allow for testing. If you want to test a metric and then stop
# the metric sending data to carbon-relay (use a vista metric).
# Determine a timestamp that will fall into the stopped period
# Add the timestamp to a Redis key called
# analyzer_batch.test.<metric_name>
# Start the metric sending data again (re-enable in vista)
# vista/flux will fill the missing data, when analyzer pushes
# the metric to analyzer_batch to process, if analyzer_batch
# is set to test_anomaly True and finds the key, if the
# timestamp matches the timestamp in the key, analyzer_batch
# will multiply the timestamp data point by 15, this should
# trigger an anomaly. Ensure you use a metric which will
# trigger, a load related metric is usually adequate.
# test_anomaly = False
test_anomaly_at = None
test_anomaly_batch_timeseries = []
if test_anomaly:
test_anomaly_at = None
test_anomaly_key = 'analyzer_batch.test.%s' % base_name
try:
test_anomaly_at = self.redis_conn.get(test_anomaly_key)
except:
test_anomaly_at = None
if test_anomaly_at:
if int(test_anomaly_at) == int(batch_timeseries[-1][0]):
for timestamp, value in batch_timeseries:
if int(timestamp) == int(test_anomaly_at):
anomaly_value = value * 100
logger.info('test_anomaly - replacing value %s with anomaly_value of %s at %s in %s timeseries' % (
str(value), str(anomaly_value),
str(test_anomaly_at), metric_name))
value = anomaly_value
test_anomaly_batch_timeseries.append([timestamp, value])
if test_anomaly_batch_timeseries:
batch_timeseries = test_anomaly_batch_timeseries
logger.info('test_anomaly - replaced %s timeseries with anomaly value in it' % (
metric_name))
try:
self.redis_conn.delete(test_anomaly_key)
logger.info('test_anomaly - deleted test_anomaly Redis key - %s' % str(test_anomaly_key))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to delete test_anomaly Redis key - %s' % str(test_anomaly_key))
# @modified 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Changed to algoritms_batch so there is no pollution and
# analyzer and analyzer_batch are totally independent
# metric_airgaps = []
# anomalous, ensemble, datapoint = run_selected_algorithm(batch_timeseries, metric_name, metric_airgaps)
# @modified 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Added run_negatives_present and added negatives_found
# anomalous, ensemble, datapoint = run_selected_batch_algorithm(batch_timeseries, metric_name)
# @modified 20200607 - Feature #3566: custom_algorithms
# Added algorithms_run
# @modified 20200815 - Feature #3678: SNAB - anomalyScore
# Added the number_of_algorithms to calculate anomalyScore from
anomalous, ensemble, datapoint, negatives_found, algorithms_run, number_of_algorithms = run_selected_batch_algorithm(batch_timeseries, metric_name, run_negatives_present)
# @added 20220420 - Feature #4530: namespace.analysed_events
analysed_metrics.append(base_name)
if LOCAL_DEBUG:
logger.debug('debug :: %s - anomalous: %s, datapoint: %s, timestamp: %s, ensemble: %s, algorithms_run: %s, len(batch_timeseries): %s' % (
base_name, str(anomalous), str(datapoint),
str(batch_timeseries[-1][0]), str(ensemble),
str(algorithms_run), str(len(batch_timeseries))))
if anomalous:
logger.info('anomalous: %s, ensemble: %s' % (
str(anomalous), str(ensemble)))
if test_anomaly_batch_timeseries:
logger.info('test_anomaly - analyzed %s data with anomaly value in it and anomalous = %s' % (
metric_name, str(anomalous)))
# @added 20200815 - Feature #3678: SNAB - anomalyScore
if record_anomalyScore:
anomalyScore_file = '%s/%s/%s/skyline.SNAB.%s.anomalyScore.csv' % (
SNAB_DATA_DIR, skyline_app, base_name, base_name)
# Get the anomaly breakdown - who returned True?
triggered_algorithms = []
run_debug = False
if ensemble.count(True) and algorithms_run:
run_debug = True
if (int(batch_timestamp) % 20000) == 0:
run_debug = True
if run_debug:
logger.debug('debug :: ensemble to calculate anomalyScore - %s' % str(ensemble))
logger.debug('debug :: algorithms_run to calculate anomalyScore - %s' % str(algorithms_run))
for index, value in enumerate(ensemble):
if value:
algorithm = algorithms_run[index]
triggered_algorithms.append(algorithm)
if run_debug:
logger.debug('debug :: triggered_algorithms to calculate anomalyScore - %s' % str(triggered_algorithms))
anomalyScore = 0.0
try:
if len(triggered_algorithms) > 0 and number_of_algorithms > 0:
if len(triggered_algorithms) > settings.CONSENSUS:
anomalyScore = 1.0
else:
anomalyScore = len(triggered_algorithms) / settings.CONSENSUS
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to calculate anomalyScore')
if not os.path.isfile(anomalyScore_file):
data = 'timestamp,value,anomalyScore,triggered_algorithms\n'
write_data_to_file(skyline_app, anomalyScore_file, 'w', data)
data = '%s,%s,%s,%s\n' % (str(int(batch_timestamp)), str(datapoint), str(anomalyScore), str(triggered_algorithms))
write_data_to_file(skyline_app, anomalyScore_file, 'a', data)
if run_debug:
logger.debug('%s,%s,%s,%s' % (str(int(batch_timestamp)), str(datapoint), str(anomalyScore), str(triggered_algorithms)))
# Update the last_timestamp metric Redis key
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
redis_key_set = None
try:
int_metric_timestamp = int(batch_timestamp)
# @modified 20200503 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Set the last_timestamp expiry time to 1 month rather than
# settings.FULL_DURATION
self.redis_conn.setex(
last_metric_timestamp_key, 2592000, int_metric_timestamp)
redis_key_set = True
except:
logger.error('error :: failed to set Redis key %s' % last_metric_timestamp_key)
if anomalous:
if redis_key_set:
logger.info('anomalous :: anomaly detected on %s at %s with %s, set Redis key %s to %s' % (
base_name, str(int_metric_timestamp), str(datapoint),
last_metric_timestamp_key, str(int_metric_timestamp)))
else:
logger.info('anomalous :: anomaly detected on %s at %s with %s' % (
base_name, str(int_metric_timestamp),
str(datapoint)))
# @added 20200425 - Feature #3508: ionosphere.untrainable_metrics
# Determine if any metrcs have negatives values some they can be
# added to the ionosphere.untrainable_metrics Redis set
if run_negatives_present and negatives_found:
redis_set = 'ionosphere.untrainable_metrics'
try:
last_negative_timestamp = int(negatives_found[-1][0])
last_negative_value = negatives_found[-1][1]
remove_after_timestamp = int(last_negative_timestamp + settings.FULL_DURATION)
data = str([metric_name, batch_timestamp, datapoint, last_negative_timestamp, last_negative_value, settings.FULL_DURATION, remove_after_timestamp])
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add data to Redis set %s' % (
str(redis_set)))
# Added a Redis key for Mirage, Panorama and Ionosphere to
# query to identify if an anomaly has been added by
# analyzer_batch and set a longish TTL as if multiple
# anomalies for multiple metrics in a batch are sent to
# Ionosphere it could take Ionosphere a while to analyze
# them all. This key circumvents the requirement of each
# app to determine if a metric is a batch metric, as this
# is only created for batch metric anomalies.
analyzer_batch_metric_anomaly_key = '%s.anomaly.%s.%s' % (
skyline_app, str(int_metric_timestamp), base_name)
try:
int_metric_timestamp = int(batch_timestamp)
self.redis_conn.setex(
analyzer_batch_metric_anomaly_key,
86400, int_metric_timestamp)
logger.info('set Redis key %s with %s for other apps to identify this as an analyzer_batch anomaly' % (
analyzer_batch_metric_anomaly_key,
str(int_metric_timestamp)))
except:
logger.error('error :: failed to set Redis key %s' % analyzer_batch_metric_anomaly_key)
else:
if redis_key_set:
not_anomalous_count += 1
if LOCAL_DEBUG:
logger.debug('debug :: not anomalous :: %s at %s with %s' % (
base_name, str(int_metric_timestamp), str(datapoint)))
# @modified 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Only log on the last data point, not on all
if int_metric_timestamp == int(last_redis_data_timestamp):
logger.info('not anomalous :: %s at %s with %s (along with %s other not anomalous data points), set Redis key %s to %s' % (
base_name, str(int_metric_timestamp), str(datapoint),
str(not_anomalous_count),
last_metric_timestamp_key, str(int_metric_timestamp)))
else:
logger.info('not anomalous :: %s at %s with %s' % (
base_name, str(int_metric_timestamp),
str(datapoint)))
# @added 20190408 - Feature #2882: Mirage - periodic_check
# Add for Mirage periodic - is really anomalous add to
# real_anomalous_metrics and if in mirage_periodic_check_metric_list
# add as anomalous
if anomalous:
metric_timestamp = batch_timeseries[-1][0]
metric = [datapoint, base_name, metric_timestamp]
# Get the anomaly breakdown - who returned True?
triggered_algorithms = []
for index, value in enumerate(ensemble):
if value:
# @modified 20200607 - Feature #3566: custom_algorithms
# algorithm = settings.ALGORITHMS[index]
algorithm = algorithms_run[index]
anomaly_breakdown[algorithm] += 1
triggered_algorithms.append(algorithm)
# @added 20220504 - Feature #2580: illuminance
try:
illuminance_dict[base_name] = {
'timestamp': int(metric_timestamp),
'value': float(datapoint),
'triggered_algorithms_count': len(triggered_algorithms)}
except Exception as err:
logger.error('error :: failed to add %s to illuminance_dict' % (
str(base_name)))
# @added 20170206 - Bug #1904: Handle non filesystem friendly metric names in check files
sane_metricname = filesafe_metricname(str(base_name))
# If Panorama is enabled determine details
determine_anomaly_details = False
if settings.PANORAMA_ENABLED:
determine_anomaly_details = True
# If Ionosphere is enabled determine details
try:
ionosphere_enabled = settings.IONOSPHERE_ENABLED
if settings.IONOSPHERE_ENABLED:
determine_anomaly_details = True
except:
ionosphere_enabled = False
if determine_anomaly_details:
metric_timestamp = str(int(batch_timeseries[-1][0]))
from_timestamp = str(int(batch_timeseries[1][0]))
timeseries_dir = base_name.replace('.', '/')
send_back_to_analyzer = None
# @added 20161119 - Branch #922: ionosphere
# Task #1718: review.tsfresh
# Set defaults which can be used later to determine how
# Analyzer should handle/route anomalies
analyzer_metric = True
mirage_metric = False
ionosphere_metric = False
send_to_ionosphere = False
if metric_name in ionosphere_unique_metrics:
ionosphere_metric = True
send_to_ionosphere = True
if metric_name in mirage_unique_metrics:
analyzer_metric = False
ionosphere_metric = False
mirage_metric = True
send_to_ionosphere = False
# @added 20170108 - Feature #1830: Ionosphere alerts
# Only send smtp_alerter_metrics to Ionosphere
smtp_alert_enabled_metric = True
if base_name in non_smtp_alerter_metrics:
smtp_alert_enabled_metric = False
if ionosphere_enabled:
if analyzer_metric:
# We do not want send all anomalous metrics to
# Ionosphere if they are not being alerted on as
# they will be pointless they will have no alert if
# it is within the EXPIRATION_TIME and there will be
# no reference graphs from an alert for the user to
# action.
cache_key = 'last_alert.smtp.%s' % (base_name)
last_alert = False
try:
last_alert = self.redis_conn.get(cache_key)
except Exception as e:
logger.error('error :: could not query Redis for cache_key: %s' % e)
if not last_alert:
send_to_ionosphere = True
else:
send_to_ionosphere = False
if ionosphere_metric:
logger.info('not sending to Ionosphere - alert key exists - %s' % (base_name))
else:
if mirage_metric:
logger.info('not sending to Ionosphere - Mirage metric - %s' % (base_name))
send_to_ionosphere = False
# analyzer_batch sends Analyzer and Mirage
# metrics back to analyzer
send_back_to_analyzer = True
# @added 20170306 - Feature #1960: ionosphere_layers
# Ionosphere layers require the timeseries at
# FULL_DURATION so if this is a Mirage and
# Ionosphere metric, Analyzer needs to provide
# the timeseries file for later (within 60
# seconds) analysis, however we want the data
# that triggered the anomaly, as before this was
# only created by Mirage if an alert was
# triggered, but Ionosphere layers now require
# this file before an alert is triggered
timeseries_dir = base_name.replace('.', '/')
training_dir = '%s/%s/%s' % (
settings.IONOSPHERE_DATA_FOLDER, str(metric_timestamp),
str(timeseries_dir))
if not os.path.exists(training_dir):
mkdir_p(training_dir)
full_duration_in_hours = int(settings.FULL_DURATION) / 3600
ionosphere_json_file = '%s/%s.mirage.redis.%sh.json' % (
training_dir, base_name,
str(int(full_duration_in_hours)))
if not os.path.isfile(ionosphere_json_file):
timeseries_json = str(batch_timeseries).replace('[', '(').replace(']', ')')
try:
write_data_to_file(skyline_app, ionosphere_json_file, 'w', timeseries_json)
logger.info('%s added Ionosphere Mirage %sh Redis data timeseries json file :: %s' % (
skyline_app, str(int(full_duration_in_hours)), ionosphere_json_file))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s Ionosphere Mirage Redis data timeseries json file - %s' % (skyline_app, ionosphere_json_file))
# @modified 20170108 - Feature #1830: Ionosphere alerts
# Only send smtp_alerter_metrics to Ionosphere
# if send_to_ionosphere:
if send_to_ionosphere and smtp_alert_enabled_metric:
if metric_name in ionosphere_unique_metrics:
logger.info('sending an ionosphere metric to Ionosphere - %s' % (base_name))
else:
logger.info('sending an analyzer metric to Ionosphere for training - %s' % (base_name))
try:
# @modified 20161228 Feature #1828: ionosphere - mirage Redis data features
# Added full_duration
# @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
# Added ionosphere_parent_id, always zero from Analyzer and Mirage
ionosphere_parent_id = 0
send_anomalous_metric_to(
skyline_app, 'ionosphere', timeseries_dir,
metric_timestamp, base_name, str(datapoint),
from_timestamp, triggered_algorithms,
batch_timeseries, str(settings.FULL_DURATION),
str(ionosphere_parent_id))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved to Redis key block below
# self.sent_to_ionosphere.append(base_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to send_anomalous_metric_to to ionosphere')
# @added 20190522 - Task #3034: Reduce multiprocessing Manager list usage
redis_set = 'analyzer.batch.sent_to_ionosphere'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
# @added 20170403 - Feature #1994: Ionosphere training_dir keys
# Feature #2000: Ionosphere - validated
# Feature #1996: Ionosphere - matches page
# The addition of this key data could be done in
# skyline_function.py, however that would introduce
# Redis requirements in the send_anomalous_metric_to
# function, which is not desirable I think. So this is
# a non-KISS pattern that is replicated in mirage.py as
# well.
# Each training_dir and data set is now Redis keyed to increase efficiency
# in terms of disk I/O for ionosphere.py and making keyed data
# available for each training_dir data set so that transient matched data
# can be surfaced for the webapp along with directory paths, etc
ionosphere_training_data_key = 'ionosphere.training_data.%s.%s' % (str(metric_timestamp), base_name)
ionosphere_training_data_key_data = [
['metric_timestamp', int(metric_timestamp)],
['base_name', str(base_name)],
['timeseries_dir', str(timeseries_dir)],
['added_by', str(skyline_app)]
]
try:
self.redis_conn.setex(
ionosphere_training_data_key,
settings.IONOSPHERE_KEEP_TRAINING_TIMESERIES_FOR,
# @modified 20190413 - Task #2824: Test redis-py upgrade
# Task #2926: Update dependencies
# redis-py 3.x only accepts user data as bytes, strings or
# numbers (ints, longs and floats). All 2.X users should
# make sure that the keys and values they pass into redis-py
# are either bytes, strings or numbers. Use str
str(ionosphere_training_data_key_data))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to set Redis key %s' % ionosphere_training_data_key)
try:
del ionosphere_training_data_key_data
except:
pass
if ionosphere_metric:
analyzer_metric = False
# Only send Analyzer metrics
if analyzer_metric and settings.PANORAMA_ENABLED:
if not os.path.exists(settings.PANORAMA_CHECK_PATH):
mkdir_p(settings.PANORAMA_CHECK_PATH)
# Note:
# The values are enclosed is single quoted intentionally
# as the imp.load_source used results in a shift in the
# decimal position when double quoted, e.g.
# value = "5622.0" gets imported as
# 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2
# single quoting results in the desired,
# 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0
added_at = str(int(time()))
source = 'graphite'
panorama_anomaly_data = 'metric = \'%s\'\n' \
'value = \'%s\'\n' \
'from_timestamp = \'%s\'\n' \
'metric_timestamp = \'%s\'\n' \
'algorithms = %s\n' \
'triggered_algorithms = %s\n' \
'app = \'%s\'\n' \
'source = \'%s\'\n' \
'added_by = \'%s\'\n' \
'added_at = \'%s\'\n' \
% (base_name, str(datapoint), from_timestamp,
# @modified 20200603 - Feature #3566: custom_algorithms
# metric_timestamp, str(settings.ALGORITHMS),
metric_timestamp, str(algorithms_run),
triggered_algorithms, skyline_app, source,
this_host, added_at)
# Create an anomaly file with details about the anomaly
panorama_anomaly_file = '%s/%s.%s.txt' % (
settings.PANORAMA_CHECK_PATH, added_at,
sane_metricname)
try:
write_data_to_file(
skyline_app, panorama_anomaly_file, 'w',
panorama_anomaly_data)
logger.info('added panorama anomaly file :: %s' % (panorama_anomaly_file))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Moved to Redis set block below
# self.sent_to_panorama.append(base_name)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add panorama anomaly file :: %s' % (panorama_anomaly_file))
try:
del panorama_anomaly_data
except:
pass
redis_set = 'analyzer_batch.sent_to_panorama'
data = str(base_name)
try:
self.redis_conn.sadd(redis_set, data)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add %s to Redis set %s' % (
str(data), str(redis_set)))
else:
# @modified 20160207 - Branch #922: Ionosphere
# Handle if all other apps are not enabled
other_app = 'none'
if mirage_metric:
other_app = 'Mirage'
if ionosphere_metric:
other_app = 'Ionosphere'
logger.info('not adding panorama anomaly file for %s - %s' % (other_app, metric))
# Send back to Analyzer to alert
if analyzer_metric:
send_back_to_analyzer = True
if send_back_to_analyzer:
cache_key = '%s.alert.%s.%s' % (skyline_app, metric_timestamp, base_name)
# @modified 20201008 - Feature #3772: Add the anomaly_id to the http_alerter json
# Branch #3068: SNAB
# Added algorithms_run
cache_key_value = [float(datapoint), base_name, int(metric_timestamp), triggered_algorithms, algorithms_run]
try:
self.redis_conn.setex(
cache_key, 300,
str(cache_key_value))
logger.info(
'add Redis alert key - %s - %s' %
(cache_key, str(cache_key_value)))
except Exception as e:
logger.error(traceback.format_exc())
logger.error(
'error :: failed to add Redis key - %s - [%s, \'%s\', %s, %s, %s] - %s' %
(cache_key, str(datapoint), base_name,
str(int(metric_timestamp)),
str(triggered_algorithms),
str(algorithms_run), e))
# It could have been deleted by the Roomba
except TypeError:
# @added 20200430 - Feature #3480: batch_processing
# Added logging here as the DeletedByRoomba exception is
# generally not related to that but related to some other fail
# in the processing of the run algorithms phase
logger.error(traceback.format_exc())
logger.error('error :: added as DeletedByRoomba but possibly not see traceback above')
exceptions['DeletedByRoomba'] += 1
# @added 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Handle analyzer_batch work being added over and over every
# minute by also updating the last_timestamp key if stale,
# boring, etc
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
try:
int_metric_timestamp = int(time())
self.redis_conn.setex(
last_metric_timestamp_key, 2592000, int_metric_timestamp)
logger.info('set Redis key %s to %s, even though it has been deleted by Roomba' % (
last_metric_timestamp_key, str(int_metric_timestamp)))
except:
logger.error('error :: failed to set Redis key %s, even though it is has been deleted by Roomba' % last_metric_timestamp_key)
except TooShort:
exceptions['TooShort'] += 1
# @added 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
try:
int_metric_timestamp = int(batch_timeseries[-1][0])
self.redis_conn.setex(
last_metric_timestamp_key, 2592000,
int_metric_timestamp)
# @modified 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Only log on the last data point, not on all
if int_metric_timestamp == int(last_redis_data_timestamp):
logger.info('set Redis key %s to %s, even though it is too short' % (
last_metric_timestamp_key, str(int_metric_timestamp)))
except Exception as err:
logger.error('error :: failed to set Redis key %s, even though it is too short - %s' % (
last_metric_timestamp_key, err))
except Stale:
exceptions['Stale'] += 1
# @added 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
try:
int_metric_timestamp = int(batch_timeseries[-1][0])
self.redis_conn.setex(
last_metric_timestamp_key, 2592000,
int_metric_timestamp)
# @modified 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Only log on the last data point, not on all
if int_metric_timestamp == int(last_redis_data_timestamp):
logger.info('set Redis key %s to %s, even though it is stale' % (
last_metric_timestamp_key, str(int_metric_timestamp)))
except:
logger.error('error :: failed to set Redis key %s, even though it is stale' % last_metric_timestamp_key)
except Boring:
exceptions['Boring'] += 1
# @added 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
try:
int_metric_timestamp = int(batch_timeseries[-1][0])
self.redis_conn.setex(
last_metric_timestamp_key, 2592000,
int_metric_timestamp)
# @modified 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# Only log on the last data point, not on all
if int_metric_timestamp == int(last_redis_data_timestamp):
logger.info('set Redis key %s to %s, even though it is boring' % (
last_metric_timestamp_key, str(int_metric_timestamp)))
except:
logger.error('error :: failed to set Redis key %s, even though it is boring' % last_metric_timestamp_key)
except:
logger.error(traceback.format_exc())
logger.error('error - Other error reported')
exceptions['Other'] += 1
# @added 20200423 - Feature #3504: Handle airgaps in batch metrics
# Feature #3480: batch_processing
# Feature #3486: analyzer_batch
last_metric_timestamp_key = 'last_timestamp.%s' % base_name
try:
int_metric_timestamp = int(time())
self.redis_conn.setex(
last_metric_timestamp_key, 2592000,
int_metric_timestamp)
logger.error('error :: set Redis key %s to %s, even though it an other error has been thrown' % (
last_metric_timestamp_key, str(int_metric_timestamp)))
except:
logger.error('error :: failed to set Redis key %s, when other exception was thrown' % last_metric_timestamp_key)
# @added 20220113 - Feature #3566: custom_algorithms
# Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
# break if approaching spin_batch_process limit
if int(time()) > (int(spin_start) + 290):
logger.info('approaching time limit, exiting')
break
# Remove for work list
redis_set = 'analyzer.batch'
data = [metric_name, int(last_analyzed_timestamp)]
try:
self.redis_conn.srem(redis_set, str(data))
logger.info('analyzer_batch :: removed batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_batch :: failed to remove batch metric item - %s - from Redis set - %s' % (str(data), redis_set))
try:
del timeseries
except:
pass
try:
del timestamps_to_analyse
except:
pass
try:
del batch_timeseries
except:
pass
if not batch_mode:
try:
del mirage_unique_metrics
except:
pass
try:
del ionosphere_unique_metrics
except:
pass
try:
del derivative_metrics
except:
pass
try:
del non_derivative_metrics
except:
pass
try:
del non_derivative_monotonic_metrics
except:
pass
try:
del non_smtp_alerter_metrics
except:
pass
# Add values to the queue so the parent process can collate
for key, value in anomaly_breakdown.items():
self.batch_anomaly_breakdown_q.put((key, value))
for key, value in exceptions.items():
self.batch_exceptions_q.put((key, value))
# @added 20220420 - Feature #4530: namespace.analysed_events
namespace_analysed = defaultdict(int)
for base_name in analysed_metrics:
parent_namespace = base_name.split('.')[0]
namespace_analysed[parent_namespace] += 1
date_string = str(strftime('%Y-%m-%d', gmtime()))
namespace_analysed_events_hash = 'namespace.analysed_events.%s.%s' % (skyline_app, date_string)
for namespace in list(namespace_analysed.keys()):
try:
self.redis_conn.hincrby(namespace_analysed_events_hash, namespace, namespace_analysed[namespace])
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to increment %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
try:
self.redis_conn.expire(namespace_analysed_events_hash, (86400 * 15))
logger.info('updated %s Redis hash' % namespace_analysed_events_hash)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: failed to set expire %s Redis hash - %s' % (
namespace_analysed_events_hash, err))
# @added 20220504 - Feature #2580: illuminance
if len(illuminance_dict) > 0:
logger.info('calling add_illuminance_entries with %s entries to add' % (
str(len(illuminance_dict))))
current_illuminance_dict = {}
try:
current_illuminance_dict = add_illuminance_entries(self, skyline_app, int(run_timestamp), illuminance_dict)
except Exception as err:
logger.error('error :: add_illuminance_entries failed - %s' % (
err))
logger.info('illuminance Redis hash now has %s entries' % (
str(len(current_illuminance_dict))))
LOCAL_DEBUG = False
spin_end = time() - spin_start
logger.info('spin_batch_process took %.2f seconds' % spin_end)
return
[docs] def run(self):
"""
- Called when the process intializes.
- Determine if Redis is up and discover the number of `unique metrics`.
- Divide the `unique_metrics` between the number of `ANALYZER_PROCESSES`
and assign each process a set of metrics to analyse for anomalies.
- Wait for the processes to finish.
- Determine whether if any anomalous metrics require:
- Alerting on (and set `EXPIRATION_TIME` key in Redis for alert).
- Feed to another module e.g. mirage.
- Alert to syslog.
- Populate the webapp json with the anomalous_metrics details.
- Log the details about the run to the skyline analyzer log.
- Send skyline.analyzer metrics to `GRAPHITE_HOST`
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
if os.path.isfile(skyline_app_logwait):
try:
os.remove(skyline_app_logwait)
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('starting %s run' % skyline_app)
if os.path.isfile(skyline_app_loglock):
logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
try:
os.remove(skyline_app_loglock)
logger.info('log lock file removed')
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
else:
logger.info('bin/%s.d log management done' % skyline_app)
# @added 20190417 - Feature #2950: Report defaulted settings to log
# Added all the globally declared settings to enable reporting in the
# log the state of each setting.
try:
SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
logger.info('SERVER_METRIC_PATH is set from settings.py to %s' % str(SERVER_METRIC_PATH))
except:
SERVER_METRIC_PATH = ''
logger.info('warning :: SERVER_METRIC_PATH is not declared in settings.py, defaults to \'\'')
logger.info('skyline_app_graphite_namespace is set to %s' % str(skyline_app_graphite_namespace))
try:
ANALYZER_ENABLED = settings.ANALYZER_ENABLED
logger.info('ANALYZER_ENABLED is set to %s' % str(ANALYZER_ENABLED))
except:
ANALYZER_ENABLED = True
logger.info('warning :: ANALYZER_ENABLED is not declared in settings.py, defaults to True')
if not os.path.exists(settings.SKYLINE_TMP_DIR):
# @modified 20160803 - Adding additional exception handling to Analyzer
try:
mkdir_p(settings.SKYLINE_TMP_DIR)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to create %s' % settings.SKYLINE_TMP_DIR)
while 1:
now = time()
# Make sure Redis is up
try:
self.redis_conn.ping()
except:
logger.error(traceback.format_exc())
logger.error('error :: Analyzer cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
try:
self.redis_conn = get_redis_conn(skyline_app)
except:
logger.error(traceback.format_exc())
# logger.error('error :: Analyzer cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
logger.error('error :: Analyzer cannot connect to get_redis_conn')
continue
try:
self.redis_conn_decoded.ping()
except:
logger.error(traceback.format_exc())
logger.error('error :: Analyzer batch cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
try:
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except:
logger.error(traceback.format_exc())
# logger.error('error :: Analyzer cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
logger.error('error :: Analyzer batch cannot connect to get_redis_conn')
continue
# Determine if any metric has been added to process
while True:
# Report app up
try:
self.redis_conn.setex(skyline_app, 120, int(now))
except:
logger.error(traceback.format_exc())
logger.error('error :: Analyzer batch could not update the Redis %s key' % skyline_app)
# Discover metrics to analyze
analyzer_batch_work = None
redis_set = 'analyzer.batch'
try:
analyzer_batch_work = self.redis_conn_decoded.smembers(redis_set)
except Exception as e:
logger.error('error :: could not query Redis for set %s - %s' % (redis_set, e))
if analyzer_batch_work:
analyzer_batch_work_queue_items = len(analyzer_batch_work)
if analyzer_batch_work_queue_items > 0:
logger.info('there are %s metrics to process in the %s Redis set' % (
str(analyzer_batch_work_queue_items), redis_set))
break
else:
logger.info('there are no batch metrics to process')
sleep(1)
metric_name = None
last_analyzed_timestamp = None
for analyzer_batch in analyzer_batch_work:
try:
batch_processing_metric = literal_eval(analyzer_batch)
metric_name = str(batch_processing_metric[0])
last_analyzed_timestamp = int(batch_processing_metric[1])
break
except:
logger.error(traceback.format_exc())
logger.error('error :: could not determine details from analyzer_batch entry')
metric_name = None
last_analyzed_timestamp = None
batch_processing_metric = None
sleep(1)
# @added 20200728 - Feature #3480: batch_processing
# Feature #3486: analyzer_batch
# If multiple work items exist sort them by oldest timestamp and
# process the item with the oldest timestamp first
if analyzer_batch_work:
unsorted_analyzer_batch_work = []
for analyzer_batch in analyzer_batch_work:
try:
batch_processing_metric = literal_eval(analyzer_batch)
metric_name = str(batch_processing_metric[0])
last_analyzed_timestamp = int(batch_processing_metric[1])
unsorted_analyzer_batch_work.append([metric_name, last_analyzed_timestamp])
except:
logger.error(traceback.format_exc())
logger.error('error :: could not determine details from analyzer_batch entry')
sorted_analyzer_batch_work = sorted(unsorted_analyzer_batch_work, key=lambda x: x[1])
logger.info('there are %s work items in the sorted_analyzer_batch_work list' % (str(len(sorted_analyzer_batch_work))))
# @added 20201017 - Feature #3818: ANALYZER_BATCH_PROCESSING_OVERFLOW_ENABLED
# Remove multiple entries for metrics and only add the latest
# timestamp item per metric
original_work_queue_length = len(sorted_analyzer_batch_work)
metrics = list(set([item[0] for item in sorted_analyzer_batch_work]))
logger.info('there are %s unique metrics with work items in the sorted_analyzer_batch_work list' % (str(len(metrics))))
if len(metrics) < original_work_queue_length:
new_analyzer_batch_work = []
for metric in metrics:
work_timestamps = []
for item in sorted_analyzer_batch_work:
if item[0] == metric:
timestamp = item[1]
work_timestamps.append(timestamp)
new_analyzer_batch_work.append([metric, timestamp])
if len(work_timestamps) > 1:
last_work_timestamp = work_timestamps[-1]
for work_timestamp in work_timestamps:
if work_timestamp != last_work_timestamp:
# Remove from work list
redis_set = 'analyzer.batch'
data = [metric, int(work_timestamp)]
try:
self.redis_conn.srem('analyzer.batch', str(data))
logger.info('analyzer_batch :: newer work exists, removed older work item - %s - from Redis set - %s' % (str(data), redis_set))
except:
logger.error(traceback.format_exc())
logger.error('error :: analyzer_batch :: failed to remove older work item - %s - from Redis set - %s' % (str(data), redis_set))
sorted_analyzer_batch_work = sorted(new_analyzer_batch_work, key=lambda x: x[1])
new_work_queue_length = len(sorted_analyzer_batch_work)
if original_work_queue_length != new_work_queue_length:
pruned_item_count = original_work_queue_length - new_work_queue_length
logger.info('the analyzer.batch Redis set was pruned of %s older items which have newer work items' % str(pruned_item_count))
metric_name = str(sorted_analyzer_batch_work[0][0])
last_analyzed_timestamp = int(sorted_analyzer_batch_work[0][1])
batch_processing_metric = [metric_name, last_analyzed_timestamp]
if not metric_name:
break
# @added 20200904 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Task #3730: Validate Mirage running multiple processes
# Remove any existing algorithm.error and timing files from any
# previous runs
pattern = '%s.*.algorithm.error' % skyline_app
try:
for f in os.listdir(settings.SKYLINE_TMP_DIR):
if re.search(pattern, f):
try:
os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
logger.info('cleaning up old error file - %s' % (str(f)))
except OSError:
pass
except:
logger.error('error :: failed to cleanup algorithm.error files')
logger.info(traceback.format_exc())
# @modified 20220506 - Feature #3486: analyzer_batch
# pattern = '%s.*.algorithm.timings' % skyline_app
pattern = '%s.*.(timings|count)' % skyline_app
try:
for f in os.listdir(settings.SKYLINE_TMP_DIR):
if re.search(pattern, f):
try:
os.remove(os.path.join(settings.SKYLINE_TMP_DIR, f))
logger.info('cleaning up old timings/count file - %s' % (str(f)))
except OSError:
pass
except:
logger.error('error :: failed to cleanup algorithm.timing files')
logger.info(traceback.format_exc())
logger.info('processing - %s' % str(batch_processing_metric))
# Spawn processes
batch_pids = []
spawned_batch_pids = []
batch_pid_count = 0
run_timestamp = now
for i in range(1, 2):
if BATCH_MODE:
batch_p = Process(target=self.spin_batch_process, args=(i, run_timestamp, 'batch_mode', 0, sorted_analyzer_batch_work[0:300]))
else:
batch_p = Process(target=self.spin_batch_process, args=(i, run_timestamp, metric_name, last_analyzed_timestamp))
batch_pids.append(batch_p)
batch_pid_count += 1
logger.info('starting 1 of %s spin_batch_process' % (str(batch_pid_count)))
batch_p.start()
spawned_batch_pids.append(batch_p.pid)
# Send wait signal to zombie processes
# for p in pids:
# p.join()
# Self monitor processes and terminate if any spin_batch_process
# that has run for longer than 300 seconds
p_starts = time()
while time() - p_starts <= 300:
if any(p.is_alive() for p in batch_pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('1 spin_batch_process completed in %.2f seconds' % (time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('timed out, killing all spin_batch_process processes')
for p in batch_pids:
p.terminate()
# p.join()
for p in batch_pids:
if p.is_alive():
logger.info('stopping spin_process - %s' % (str(p.is_alive())))
p.join()
# Grab data from the queue and populate dictionaries
exceptions = {}
anomaly_breakdown = {}
while 1:
try:
key, value = self.batch_anomaly_breakdown_q.get_nowait()
if key not in anomaly_breakdown.keys():
anomaly_breakdown[key] = value
else:
anomaly_breakdown[key] += value
except Empty:
break
while 1:
try:
key, value = self.batch_exceptions_q.get_nowait()
if key not in exceptions.keys():
exceptions[key] = value
else:
exceptions[key] += value
except Empty:
break
# @added 20200904 - Feature #3486: analyzer_batch
# Feature #3480: batch_processing
# Task #3730: Validate Mirage running multiple processes
# Report any algorithm errors
pattern = '%s.*.algorithm.error' % skyline_app
try:
for f in os.listdir(settings.SKYLINE_TMP_DIR):
if re.search(pattern, f):
try:
algorithm_error_file = os.path.join(settings.SKYLINE_TMP_DIR, f)
if os.path.isfile(algorithm_error_file):
logger.error('error :: error reported in %s' % (
algorithm_error_file))
try:
with open(algorithm_error_file, 'r') as f:
error_string = f.read()
logger.error('%s' % str(error_string))
except:
logger.error('error :: failed to read error file - %s' % algorithm_error_file)
try:
os.remove(algorithm_error_file)
except OSError:
pass
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check algorithm errors')
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to check algorithm errors')
# @added 20191021 - Bug #3288: Always send anomaly_breakdown and exception metrics
# Branch #3262: py3
exceptions_metrics = ['Boring', 'Stale', 'TooShort', 'Other']
try:
for i_exception in exceptions_metrics:
if i_exception not in exceptions.keys():
exceptions[i_exception] = 0
# @added 20200607 - Feature #3566: custom_algorithms
anomaly_breakdown_algorithms = list(settings.ALGORITHMS)
if CUSTOM_ALGORITHMS:
for custom_algorithm in settings.CUSTOM_ALGORITHMS:
anomaly_breakdown_algorithms.append(custom_algorithm)
# @modified 20200607 - Feature #3566: custom_algorithms
# for i_anomaly_breakdown in settings.ALGORITHMS:
for i_anomaly_breakdown in anomaly_breakdown_algorithms:
if i_anomaly_breakdown not in anomaly_breakdown.keys():
anomaly_breakdown[i_anomaly_breakdown] = 0
exceptions_string = ''
for i_exception in list(exceptions.keys()):
if exceptions_string == '':
exceptions_string = '%s: %s' % (str(i_exception), str(exceptions[i_exception]))
else:
exceptions_string = '%s, %s: %s' % (exceptions_string, str(i_exception), str(exceptions[i_exception]))
logger.info('exceptions - %s' % str(exceptions_string))
anomaly_breakdown_string = ''
if anomaly_breakdown:
for i_anomaly_breakdown in list(anomaly_breakdown.keys()):
if anomaly_breakdown_string == '':
anomaly_breakdown_string = '%s: %s' % (str(i_anomaly_breakdown), str(anomaly_breakdown[i_anomaly_breakdown]))
else:
anomaly_breakdown_string = '%s, %s: %s' % (anomaly_breakdown_string, str(i_anomaly_breakdown), str(anomaly_breakdown[i_anomaly_breakdown]))
logger.info('anomaly_breakdown - %s' % str(anomaly_breakdown_string))
else:
logger.info('anomaly_breakdown - none, no anomalies')
except:
logger.error(traceback.format_exc())
logger.error('error :: could not exceptions and anomaly_breakdown details')
try:
del exceptions
except:
pass
try:
del anomaly_breakdown
except:
pass
try:
# with self.batch_exceptions_q.mutex:
# self.batch_exceptions_q.queue.clear()
logger.info('clearing self.batch_exceptions_q of %s items' % str(self.batch_exceptions_q.qsize()))
while not self.batch_exceptions_q.empty():
try:
drop_value = self.batch_exceptions_q.get()
del drop_value
except:
pass
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: clearing self.batch_exceptions_q failed - %s' % err)
try:
# with self.batch_anomaly_breakdown_q.mutex:
# self.batch_anomaly_breakdown_q.queue.clear()
logger.info('clearing self.batch_anomaly_breakdown_q of %s items' % str(self.batch_anomaly_breakdown_q.qsize()))
while not self.batch_anomaly_breakdown_q.empty():
try:
drop_value = self.batch_anomaly_breakdown_q.get()
del drop_value
except:
pass
except:
logger.error(traceback.format_exc())
logger.error('error :: clearing self.batch_anomaly_breakdown_q failed - %s' % err)