Source code for crucible.crucible

import logging
try:
    from Queue import Empty
except ImportError:
    from queue import Empty
# import time
from time import time, sleep
from threading import Thread
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list to reduce memory and number of
# processes
# from multiprocessing import Process, Manager
from multiprocessing import Process
import os
from os.path import join, isfile
from os import kill, getpid, listdir
from sys import version_info
from sys import exit as sys_exit
import traceback
import re
import json
import gzip

# @modified 20200328 - Task #3290: Handle urllib2 in py3
#                      Branch #3262: py3
# Use urlretrieve
# try:
#     import urlparse
# except ImportError:
#     import urllib.parse
# try:
#     import urllib2
# except ImportError:
#     import urllib.request
#     import urllib.error
try:
    # py3 - make urllib.request.urlretrieve available
    import urllib.request
    import urllib.error
except ImportError:
    # For backwards compatibility with py2 load urllib as-is so that
    # urllib.urlretrieve is available as the same module.
    # from urllib import request as urllib
    import urllib

# import errno
import datetime
import shutil

import os.path
# sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
# sys.path.insert(0, os.path.dirname(__file__))

from ast import literal_eval

from redis import StrictRedis
from msgpack import packb
import requests

import settings

# @modified 20200327 - Branch #3262: py3
# from skyline_functions import load_metric_vars, fail_check, mkdir_p
# @modified 20200428 - Feature #3500: webapp - crucible_process_metrics
#                      Feature #1448: Crucible web UI
# Added write_data_to_file and filesafe_metricname to send to Panorama
from skyline_functions import (
    fail_check, mkdir_p, write_data_to_file, filesafe_metricname,
    # @added 20200506 - Feature #3532: Sort all time series
    sort_timeseries,
    # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
    #                   Bug #3778: Handle single encoded forward slash requests to Graphite
    sanitise_graphite_url)

# @added 20220610 - Feature #3500: webapp - crucible_process_metrics
# Added summarise option
from functions.timeseries.determine_data_frequency import determine_data_frequency
from functions.timeseries.downsample import downsample_timeseries

from crucible_algorithms import run_algorithms

skyline_app = 'crucible'
skyline_app_logger = skyline_app + 'Log'
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = settings.LOG_PATH + '/' + skyline_app + '.log'
skyline_app_loglock = skyline_app_logfile + '.lock'
skyline_app_logwait = skyline_app_logfile + '.wait'

python_version = int(version_info[0])

this_host = str(os.uname()[1])

try:
    SERVER_METRIC_PATH = '.' + settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.' + skyline_app + SERVER_METRIC_PATH

FULL_NAMESPACE = settings.FULL_NAMESPACE
ENABLE_CRUCIBLE_DEBUG = settings.ENABLE_CRUCIBLE_DEBUG
crucible_data_folder = str(settings.CRUCIBLE_DATA_FOLDER)
failed_checks_dir = crucible_data_folder + '/failed_checks'
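
# Note: with the settings above, metric check files are expected in
# settings.CRUCIBLE_CHECK_PATH named '<timestamp>.<dotted.metric.name>.txt'
# (spin_process below splits the basename on the first '.'), and failed
# checks are moved to (illustrative path layout, derived from the code
# below):
#
#     <CRUCIBLE_DATA_FOLDER>/failed_checks/<metric/name/as/dirs>/<timestamp>/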


class Crucible(Thread):
    def __init__(self, parent_pid):
        """
        Initialize Crucible
        """
        super(Crucible, self).__init__()
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()
        # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
        #                      Task #3032: Debug number of Python processes and memory use
        #                      Branch #3002: docker
        # Reduce the amount of Manager instances that are used as each
        # requires a copy of the entire memory to be copied into each
        # subprocess so this results in a python process per Manager
        # instance, using as much memory as the parent. OK on a server, not
        # so much in a container. Disabled all the Manager() lists below and
        # replaced with Redis sets
        # self.process_list = Manager().list()
        # self.metric_variables = Manager().list()
        # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
        if settings.REDIS_PASSWORD:
            self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
        else:
            self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
    def check_if_parent_is_alive(self):
        """
        Check if the parent process is alive
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except:
            # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
            # Log warning
            logger.warning('warning :: parent or current process dead')
            sys_exit(0)
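
    # Note: kill(pid, 0) sends signal 0, which performs error checking only
    # and delivers no signal; it raises OSError if the pid does not exist,
    # which is why any exception above is treated as a dead process. A
    # minimal standalone sketch of the same liveness probe (illustrative
    # only, not part of this module):
    #
    #     import os
    #
    #     def pid_is_alive(pid):
    #         try:
    #             os.kill(pid, 0)  # signal 0: existence check only
    #             return True
    #         except OSError:
    #             return False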
    # @added 20200327 - Branch #3262: py3
    # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
    def new_load_metric_vars(self, metric_vars_file):
        """
        Load the metric variables for a check from a metric check variables file

        :param metric_vars_file: the path and filename to the metric variables files
        :type metric_vars_file: str
        :return: the metric_vars list or ``False``
        :rtype: list
        """
        if os.path.isfile(metric_vars_file):
            logger.info(
                'loading metric variables from metric_check_file - %s' % (
                    str(metric_vars_file)))
        else:
            logger.error(
                'error :: loading metric variables from metric_check_file - file not found - %s' % (
                    str(metric_vars_file)))
            return False

        metric_vars = []
        with open(metric_vars_file) as f:
            for line in f:
                no_new_line = line.replace('\n', '')
                no_equal_line = no_new_line.replace(' = ', ',')
                array = str(no_equal_line.split(',', 1))
                add_line = literal_eval(array)
                metric_vars.append(add_line)

        # @added 20200607 - Feature #3630: webapp - crucible_process_training_data
        # Added training_data_json
        string_keys = [
            'metric', 'anomaly_dir', 'added_by', 'app', 'run_script',
            'graphite_override_uri_parameters', 'training_data_json']
        float_keys = ['value']
        # @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity
        # Added ionosphere_parent_id, always zero from Analyzer and Mirage
        # @modified 20200420 - Feature #3500: webapp - crucible_process_metrics
        #                      Feature #1448: Crucible web UI
        # Added alert_interval to int_keys and add_to_panorama to the boolean_keys
        # @added 20220610 - Feature #3500: webapp - crucible_process_metrics
        # Added summarise option
        int_keys = [
            'from_timestamp', 'metric_timestamp', 'added_at', 'full_duration',
            'ionosphere_parent_id', 'alert_interval', 'summarise']
        # @added 20201001 - Task #3748: POC SNAB
        # Added algorithms_run required to determine the anomalyScore
        # so this needs to be sent to Ionosphere so Ionosphere
        # can send it back on an alert, but the same file gets sent to Crucible so...
        array_keys = ['triggered_algorithms', 'algorithms', 'algorithms_run']
        boolean_keys = ['graphite_metric', 'run_crucible_tests', 'add_to_panorama']

        metric_vars_array = []
        for var_array in metric_vars:
            key = None
            value = None
            if var_array[0] in string_keys:
                key = var_array[0]
                value_str = str(var_array[1]).replace("'", '')
                value = str(value_str)
                if var_array[0] == 'metric':
                    metric = value
            if var_array[0] in float_keys:
                key = var_array[0]
                value_str = str(var_array[1]).replace("'", '')
                value = float(value_str)
            if var_array[0] in int_keys:
                key = var_array[0]
                value_str = str(var_array[1]).replace("'", '')
                value = int(value_str)
            if var_array[0] in array_keys:
                key = var_array[0]
                value = literal_eval(str(var_array[1]))
            if var_array[0] in boolean_keys:
                key = var_array[0]
                if str(var_array[1]) == 'True':
                    value = True
                else:
                    value = False
            if key:
                metric_vars_array.append([key, value])

        if len(metric_vars_array) == 0:
            logger.error(
                'error :: loading metric variables - none found - %s' % (
                    str(metric_vars_file)))
            return False

        logger.info('debug :: metric_vars for %s' % str(metric))
        logger.info('debug :: %s' % str(metric_vars_array))

        return metric_vars_array
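
    # Note: a hypothetical metric check variables file, illustrating the
    # 'key = value' format that new_load_metric_vars parses (the metric name
    # and values here are made up):
    #
    #     metric = 'stats.webapp01.requests.mean'
    #     value = '342.0'
    #     from_timestamp = '1583224200'
    #     metric_timestamp = '1583310600'
    #     algorithms = ['all']
    #     triggered_algorithms = ['histogram_bins']
    #     graphite_metric = True
    #     run_crucible_tests = True
    #     added_by = 'webapp'
    #     added_at = '1583310600'
    #
    # Each line is split on ' = ' and passed through literal_eval, so the
    # first line becomes ['metric', "'stats.webapp01.requests.mean'"], and
    # the string_keys/float_keys/int_keys/array_keys/boolean_keys lists
    # above determine the final type coercion for each key.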
    def spin_process(self, i, run_timestamp, metric_check_file):
        """
        Assign a metric for a process to analyze.

        :param i: python process id
        :param run_timestamp: the epoch timestamp at which this process was called
        :param metric_check_file: full path to the metric check file
        :return: returns True
        """
        child_process_pid = os.getpid()
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('child_process_pid - %s' % str(child_process_pid))

        # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
        # self.process_list.append(child_process_pid)

        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('processing metric check - %s' % metric_check_file)

        if not os.path.isfile(str(metric_check_file)):
            logger.error('error :: file not found - metric_check_file - %s' % (str(metric_check_file)))
            return

        check_file_name = os.path.basename(str(metric_check_file))
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('check_file_name - %s' % check_file_name)
        check_file_timestamp = check_file_name.split('.', 1)[0]
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('check_file_timestamp - %s' % str(check_file_timestamp))
        check_file_metricname_txt = check_file_name.split('.', 1)[1]
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('check_file_metricname_txt - %s' % check_file_metricname_txt)
        check_file_metricname = check_file_metricname_txt.replace('.txt', '')
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('check_file_metricname - %s' % check_file_metricname)
        check_file_metricname_dir = check_file_metricname.replace('.', '/')
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('check_file_metricname_dir - %s' % check_file_metricname_dir)

        metric_failed_check_dir = failed_checks_dir + '/' + check_file_metricname_dir + '/' + check_file_timestamp
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric_failed_check_dir - %s' % metric_failed_check_dir)

        # failed_check_file = failed_checks_dir + '/' + check_file_name
        failed_check_file = metric_failed_check_dir + '/' + check_file_name

        # Load and validate metric variables
        try:
            # @modified 20200327 - Branch #3262: py3
            # metric_vars = load_metric_vars(skyline_app, str(metric_check_file))
            metric_vars_array = self.new_load_metric_vars(str(metric_check_file))
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to import metric variables from check file - %s' % (metric_check_file))
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            # TBD - a failed check Panorama update will go here, perhaps txt
            # files are not the only "queue" that will be used, both, but
            # Panorama, may be just a part of Skyline Flux, the flux DB
            # would allow for a very nice, distributed "queue" and a
            # distributed Skyline workforce...
            # Any Skyline node could just have one role, e.g. lots of
            # Skyline nodes running crucible only and instead of reading
            # the local filesystem for input, they could read the Flux DB
            # queue or both...
            return

        # Test metric variables
        # We use a pythonic methodology to test if the variables are defined,
        # this ensures that if any of the variables are not set for some reason
        # we can handle unexpected data or situations gracefully and try and
        # ensure that the process does not hang.
        # if len(str(metric_vars.metric)) == 0:
        # if not metric_vars.metric:
        #     try:
        #         metric_vars.metric
        #     except:
        #         logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file))
        #         fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
        #         return
        #     else:
        #         metric = str(metric_vars.metric)
        #         if settings.ENABLE_CRUCIBLE_DEBUG:
        #             logger.info('metric variable - metric - %s' % metric)
        metric = None
        try:
            # metric_vars.metric
            key = 'metric'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            metric = str(value_list[0])
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('debug :: metric variable - metric - %s' % metric)
        except:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file))
            metric = None

        if not metric:
            logger.error('error :: failed to load metric variable from check file - %s' % (metric_check_file))
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            return

        # if len(metric_vars.value) == 0:
        # if not metric_vars.value:
        #     try:
        #         metric_vars.value
        #     except:
        #         logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file))
        #         fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
        #         return
        #     else:
        #         value = str(metric_vars.value)
        #         if settings.ENABLE_CRUCIBLE_DEBUG:
        #             logger.info('metric variable - value - %s' % (value))
        value = None
        try:
            key = 'value'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            value = float(value_list[0])
        except:
            logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file))
            value = None

        if not value:
            # @modified 20181119 - Bug #2708: Failing to load metric vars
            if value == 0.0:
                pass
            else:
                logger.error('error :: failed to load value variable from check file - %s' % (metric_check_file))
                fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
                return

        # if len(metric_vars.from_timestamp) == 0:
        # if not metric_vars.from_timestamp:
        #     try:
        #         metric_vars.from_timestamp
        #     except:
        #         logger.error('error :: failed to read from_timestamp variable from check file - %s' % (metric_check_file))
        #         fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
        #         return
        #     else:
        #         from_timestamp = str(metric_vars.from_timestamp)
        #         if settings.ENABLE_CRUCIBLE_DEBUG:
        #             logger.info('metric variable - from_timestamp - %s' % from_timestamp)
        from_timestamp = None
        try:
            key = 'from_timestamp'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            from_timestamp = int(value_list[0])
        except:
            logger.error('error :: failed to read from_timestamp variable from check file - %s' % (metric_check_file))
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            return

        # if len(metric_vars.metric_timestamp) == 0:
        # if not metric_vars.metric_timestamp:
        #     try:
        #         metric_vars.metric_timestamp
        #     except:
        #         logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
        #         fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
        #         return
        #     else:
        #         metric_timestamp = str(metric_vars.metric_timestamp)
        #         if settings.ENABLE_CRUCIBLE_DEBUG:
        #             logger.info('metric variable - metric_timestamp - %s' % metric_timestamp)
        metric_timestamp = None
        try:
            key = 'metric_timestamp'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            metric_timestamp = str(value_list[0])
        except:
            logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            return

        # if len(metric_vars.algorithms) == 0:
        # if not metric_vars.algorithms:
        #     algorithms = []
        #     try:
        #         metric_vars.algorithms
        #     except:
        #         logger.error('error :: failed to read algorithms variable from check file setting to all')
        #         algorithms = ['all']
        #     # if not algorithms:
        #     #     algorithms = []
        #     #     for i_algorithm in metric_vars.algorithms:
        #     #         algorithms.append(i_algorithm)
        #     if settings.ENABLE_CRUCIBLE_DEBUG:
        #         logger.info('metric variable - algorithms - %s' % str(algorithms))
        algorithms = []
        try:
            key = 'algorithms'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            algorithms = value_list[0]
        except:
            logger.info('failed to read algorithms variable from check file setting to all')
            algorithms = ['all']

        # if len(metric_vars.anomaly_dir) == 0:
        # if not metric_vars.anomaly_dir:
        #     try:
        #         metric_vars.anomaly_dir
        #     except:
        #         logger.error('error :: failed to read anomaly_dir variable from check file - %s' % (metric_check_file))
        #         fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
        #         return
        #     else:
        #         anomaly_dir = str(metric_vars.anomaly_dir)
        #         if settings.ENABLE_CRUCIBLE_DEBUG:
        #             logger.info('metric variable - anomaly_dir - %s' % anomaly_dir)
        anomaly_dir = None
        try:
            key = 'anomaly_dir'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            anomaly_dir = str(value_list[0])
        except:
            logger.error('failed to read anomaly_dir variable from check file')
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            return
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - anomaly_dir - %s' % anomaly_dir)

        # if len(str(metric_vars.graphite_metric)) == 0:
        #     try:
        #         metric_vars.graphite_metric
        #     except:
        #         logger.info('failed to read graphite_metric variable from check file setting to False')
        #         # yes this is a string
        #         graphite_metric = 'False'
        # else:
        #     graphite_metric = str(metric_vars.graphite_metric)
        #     if settings.ENABLE_CRUCIBLE_DEBUG:
        #         logger.info('metric variable - graphite_metric - %s' % graphite_metric)
        graphite_metric = None
        try:
            key = 'graphite_metric'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            graphite_metric = str(value_list[0])
        except:
            logger.info('failed to read graphite_metric variable from check file setting to False')
            # yes this is a string
            graphite_metric = 'False'
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - graphite_metric - %s' % graphite_metric)

        # if len(str(metric_vars.run_crucible_tests)) == 0:
        try:
            # metric_vars.run_crucible_tests
            key = 'run_crucible_tests'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            run_crucible_tests = str(value_list[0])
        except:
            logger.info('failed to read run_crucible_tests variable from check file setting to False')
            # yes this is a string
            run_crucible_tests = 'False'
        # else:
        #     run_crucible_tests = str(metric_vars.run_crucible_tests)
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - run_crucible_tests - %s' % run_crucible_tests)

        try:
            # metric_vars.added_by
            key = 'added_by'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            added_by = str(value_list[0])
        except:
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('failed to read added_by variable from check file - set to crucible')
            added_by = 'crucible'
        # else:
        #     added_by = str(metric_vars.added_by)
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - added_by - %s' % added_by)

        try:
            # metric_vars.run_script
            key = 'run_script'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            run_script = str(value_list[0])
            # @modified 20230109 - Task #4798: Deprecate run_script from crucible
            #                      Task #4778: v4.0.0 - update dependencies
            # logger.info('running - %s' % (run_script))
            logger.warning('WARNING :: DEPRECATED run_script in v4.0.0')
        except:
            run_script = False
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('metric variable - run_script - not present set to False')
        # else:
        #     run_script = str(metric_vars.run_script)
        #     if settings.ENABLE_CRUCIBLE_DEBUG:
        #         logger.info('metric variable - run_script - %s' % run_script)

        # @added 20190612 - Feature #3108: crucible - graphite_override_uri_parameters_specific_url
        # This metric variable is used to declare absolute graphite uri
        # parameters
        try:
            # metric_vars.graphite_override_uri_parameters
            key = 'graphite_override_uri_parameters'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            graphite_override_uri_parameters = str(value_list[0])
        except:
            logger.info('failed to read graphite_override_uri_parameters variable from check file setting to False')
            # yes this is a string
            graphite_override_uri_parameters = False
        # else:
        #     graphite_override_uri_parameters = str(metric_vars.graphite_override_uri_parameters)
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - graphite_override_uri_parameters - %s' % graphite_override_uri_parameters)

        add_to_panorama = False
        try:
            key = 'add_to_panorama'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            add_to_panorama = str(value_list[0])
            if add_to_panorama == 'True':
                add_to_panorama = True
            else:
                add_to_panorama = False
        except:
            logger.info('failed to read add_to_panorama variable from check file setting to False')
            add_to_panorama = False
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - add_to_panorama - %s' % str(add_to_panorama))

        # @modified 20200420 - Feature #3500: webapp - crucible_process_metrics
        #                      Feature #1448: Crucible web UI
        # Added alert_interval
        try:
            key = 'alert_interval'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            alert_interval = int(value_list[0])
        except:
            logger.info('failed to read alert_interval variable from check file setting to 0')
            alert_interval = 0
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - alert_interval - %s' % str(alert_interval))

        # @added 20200422 - Feature #3500: webapp - crucible_process_metrics
        #                   Feature #1448: Crucible web UI
        # In order for metrics to be analysed in Crucible like the
        # Analyzer or Mirage analysis, the time series data needs to
        # be padded. Added pad_timeseries in the webapp.
        padded_timeseries = False

        # @added 20200607 - Feature #3630: webapp - crucible_process_training_data
        # Added training_data_json
        try:
            key = 'training_data_json'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            training_data_json = str(value_list[0])
        except:
            training_data_json = None
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - training_data_json - %s' % str(training_data_json))

        # @added 20220610 - Feature #3500: webapp - crucible_process_metrics
        # Added summarise option
        summarise = 0
        try:
            key = 'summarise'
            value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key]
            summarise = int(value_list[0])
        except:
            summarise = 0
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('metric variable - summarise - %s' % str(summarise))

        # Only check if the metric does not have an EXPIRATION_TIME key set,
        # crucible uses the alert EXPIRATION_TIME for the relevant alert
        # setting contexts whether that be analyzer, mirage, boundary, etc
        # and sets its own cache_keys in redis. This prevents large amounts
        # of data being added in terms of timeseries json and image files,
        # crucible samples at the same EXPIRATION_TIME as alerts.
        source_app = 'crucible'
        expiration_timeout = 1800
        remove_all_anomaly_files = False
        check_expired = False
        check_time = time()

        if added_by == 'analyzer' or added_by == 'mirage':
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('Will check %s ALERTS' % added_by)
            if settings.ENABLE_ALERTS:
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('Checking %s ALERTS' % added_by)
                for alert in settings.ALERTS:
                    ALERT_MATCH_PATTERN = alert[0]
                    METRIC_PATTERN = metric
                    alert_match_pattern = re.compile(ALERT_MATCH_PATTERN)
                    pattern_match = alert_match_pattern.match(METRIC_PATTERN)
                    if pattern_match:
                        source_app = added_by
                        expiration_timeout = alert[2]
                        if settings.ENABLE_CRUCIBLE_DEBUG:
                            logger.info('matched - %s - %s - EXPIRATION_TIME is %s' % (source_app, metric, str(expiration_timeout)))
                        check_age = int(check_time) - int(metric_timestamp)
                        if int(check_age) > int(expiration_timeout):
                            check_expired = True
                            if settings.ENABLE_CRUCIBLE_DEBUG:
                                logger.info('the check is older than EXPIRATION_TIME for the metric - not checking - check_expired')

        if added_by == 'boundary':
            if settings.BOUNDARY_ENABLE_ALERTS:
                for alert in settings.BOUNDARY_METRICS:
                    ALERT_MATCH_PATTERN = alert[0]
                    METRIC_PATTERN = metric
                    alert_match_pattern = re.compile(ALERT_MATCH_PATTERN)
                    pattern_match = alert_match_pattern.match(METRIC_PATTERN)
                    if pattern_match:
                        source_app = 'boundary'
                        expiration_timeout = alert[2]
                        if settings.ENABLE_CRUCIBLE_DEBUG:
                            logger.info('matched - %s - %s - EXPIRATION_TIME is %s' % (source_app, metric, str(expiration_timeout)))
                        check_age = int(check_time) - int(metric_timestamp)
                        if int(check_age) > int(expiration_timeout):
                            check_expired = True
                            if settings.ENABLE_CRUCIBLE_DEBUG:
                                logger.info('the check is older than EXPIRATION_TIME for the metric - not checking - check_expired')

        # @added 20200421 - Feature #3500: webapp - crucible_process_metrics
        #                   Feature #1448: Crucible web UI
        # If the check is added by the webapp explicitly set the check_expired
        # to True so that analysis will run
        if added_by == 'webapp':
            check_expired = True

        cache_key = 'crucible.last_check.%s.%s' % (source_app, metric)
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('cache_key - crucible.last_check.%s.%s' % (source_app, metric))

        # Only use the cache_key EXPIRATION_TIME if this is not a request to
        # run_crucible_tests on a timeseries
        if run_crucible_tests == 'False':
            if check_expired:
                logger.info('check_expired - not checking Redis key')
                last_check = True
            else:
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('Checking if cache_key exists')
                try:
                    last_check = self.redis_conn.get(cache_key)
                except Exception as e:
                    logger.error('error :: could not query cache_key for %s - %s - %s' % (source_app, metric, e))
                    logger.info('all anomaly files will be removed')
                    remove_all_anomaly_files = True

            if not last_check:
                try:
                    self.redis_conn.setex(cache_key, expiration_timeout, packb(value))
                    logger.info('set cache_key for %s - %s with timeout of %s' % (source_app, metric, str(expiration_timeout)))
                except Exception as e:
                    logger.error('error :: could not query cache_key for %s - %s - %s' % (source_app, metric, e))
                    logger.info('all anomaly files will be removed')
                    remove_all_anomaly_files = True
            else:
                if check_expired:
                    logger.info('check_expired - all anomaly files will be removed')
                    remove_all_anomaly_files = True
                else:
                    logger.info('cache_key is set and not expired for %s - %s - all anomaly files will be removed' % (source_app, metric))
                    remove_all_anomaly_files = True

        # anomaly dir
        if not os.path.exists(str(anomaly_dir)):
            try:
                # mkdir_p(skyline_app, str(anomaly_dir))
                mkdir_p(anomaly_dir)
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('created anomaly dir - %s' % str(anomaly_dir))
            except:
                logger.error('error :: failed to create anomaly_dir - %s' % str(anomaly_dir))

        if not os.path.exists(str(anomaly_dir)):
            logger.error('error :: anomaly_dir does not exist')
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
            return
        else:
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('anomaly dir exists - %s' % str(anomaly_dir))

        failed_check_file = anomaly_dir + '/' + metric_timestamp + '.failed.check.' + metric + '.txt'
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('failed_check_file - %s' % str(failed_check_file))

        # Retrieve data from graphite if necessary
        anomaly_graph = anomaly_dir + '/' + metric + '.png'
        anomaly_json = anomaly_dir + '/' + metric + '.json'
        anomaly_json_gz = anomaly_json + '.gz'
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('anomaly_graph - %s' % str(anomaly_graph))
            logger.info('anomaly_json - %s' % str(anomaly_json))
            logger.info('anomaly_json_gz - %s' % str(anomaly_json_gz))

        # Some things added to crucible may not be added by a skyline app per se
        # and if run_crucible_tests is string True then no anomaly files should
        # be removed.
        if run_crucible_tests == 'True':
            remove_all_anomaly_files = False

        # Remove check and anomaly files if the metric has an EXPIRATION_TIME
        # cache_key set
        if remove_all_anomaly_files:
            if os.path.isfile(anomaly_graph):
                try:
                    os.remove(anomaly_graph)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('anomaly_graph removed - %s' % str(anomaly_graph))
                except OSError:
                    pass
            if os.path.isfile(anomaly_json):
                try:
                    os.remove(anomaly_json)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('anomaly_json removed - %s' % str(anomaly_json))
                except OSError:
                    pass
            if os.path.isfile(anomaly_json_gz):
                try:
                    os.remove(anomaly_json_gz)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('anomaly_json_gz removed - %s' % str(anomaly_json_gz))
                except OSError:
                    pass

            anomaly_txt_file = anomaly_dir + '/' + metric + '.txt'
            if os.path.isfile(anomaly_txt_file):
                try:
                    os.remove(anomaly_txt_file)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('anomaly_txt_file removed - %s' % str(anomaly_txt_file))
                except OSError:
                    pass

            # TBD - this data would have to be added to the panorama DB before
            # it is removed
            if os.path.isfile(str(metric_check_file)):
                try:
                    os.remove(str(metric_check_file))
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('metric_check_file removed - %s' % str(metric_check_file))
                except OSError:
                    pass

            if os.path.exists(str(anomaly_dir)):
                try:
                    os.rmdir(str(anomaly_dir))
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('anomaly_dir removed - %s' % str(anomaly_dir))
                except OSError:
                    pass

            logger.info('check and anomaly files removed')
            return

        # Check if the image exists
        if graphite_metric == 'True':
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('graphite_metric - %s' % (graphite_metric))

            # Graphite timeouts
            connect_timeout = int(settings.GRAPHITE_CONNECT_TIMEOUT)
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('connect_timeout - %s' % str(connect_timeout))

            read_timeout = int(settings.GRAPHITE_READ_TIMEOUT)
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('read_timeout - %s' % str(read_timeout))

            graphite_until = datetime.datetime.fromtimestamp(int(metric_timestamp)).strftime('%H:%M_%Y%m%d')
            graphite_from = datetime.datetime.fromtimestamp(int(from_timestamp)).strftime('%H:%M_%Y%m%d')

            # graphite URL
            if settings.GRAPHITE_PORT != '':
                url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + ':' + settings.GRAPHITE_PORT + '/render/?from=' + graphite_from + '&until=' + graphite_until + '&target=' + metric + '&format=json'
            else:
                url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + '/render/?from=' + graphite_from + '&until=' + graphite_until + '&target=' + metric + '&format=json'

            # @added 20190612 - Feature #3108: crucible - graphite_override_uri_parameters
            # This metric variable is used to declare absolute graphite uri
            # parameters
            # from=00%3A00_20190527&until=23%3A59_20190612&target=movingMedian(nonNegativeDerivative(stats.zpf-watcher-prod-1-30g-doa2.vda.readTime)%2C24)
            if graphite_override_uri_parameters:
                if settings.GRAPHITE_PORT != '':
                    url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + ':' + settings.GRAPHITE_PORT + '/render/?' + graphite_override_uri_parameters + '&format=json'
                else:
                    url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + '/render/?' + graphite_override_uri_parameters + '&format=json'
                logger.info('graphite url set from graphite_override_uri_parameters - %s' % (url))

            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('graphite url - %s' % (url))

            if not os.path.isfile(anomaly_graph):
                image_url = url.replace('&format=json', '')
                graphite_image_file = anomaly_dir + '/' + metric + '.png'
                if 'width' not in image_url:
                    image_url += '&width=586'
                if 'height' not in image_url:
                    image_url += '&height=308'
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('graphite image url - %s' % (image_url))

                # @modified 20200327 - Feature #3108: crucible - graphite_override_uri_parameters
                #                      Branch #3262: py3
                # Added graphite_override_uri_parameters conditional
                if graphite_override_uri_parameters:
                    logger.info('retrieving png - surfacing %s graph from graphite with %s' % (metric, url))
                else:
                    logger.info('retrieving png - surfacing %s graph from graphite from %s to %s' % (metric, graphite_from, graphite_until))

                # @modified 20200328 - Task #3290: Handle urllib2 in py3
                #                      Branch #3262: py3
                # Use urlretrieve - image_url_timeout and image_data no longer
                # required
                # image_url_timeout = int(connect_timeout)
                # image_data = None

                try:
                    # @modified 20170913 - Task #2160: Test skyline with bandit
                    # Added nosec to exclude from bandit tests
                    # @modified 20200328 - Task #3290: Handle urllib2 in py3
                    #                      Branch #3262: py3
                    # Use urlretrieve
                    # image_data = urllib2.urlopen(image_url, timeout=image_url_timeout).read()  # nosec
                    if python_version == 2:
                        # @modified 20200808 - Task #3608: Update Skyline to Python 3.8.3 and deps
                        # Added nosec for bandit [B310:blacklist] Audit url open for
                        # permitted schemes. Allowing use of file:/ or custom schemes is
                        # often unexpected.
                        if image_url.lower().startswith('http'):
                            urllib.urlretrieve(image_url, graphite_image_file)  # nosec B310
                        else:
                            logger.error(
                                'error :: image_url does not start with http - %s' % (str(image_url)))
                    if python_version == 3:
                        # @modified 20200808 - Task #3608: Update Skyline to Python 3.8.3 and deps
                        # Added nosec for bandit [B310:blacklist] Audit url open for
                        # permitted schemes. Allowing use of file:/ or custom schemes is
                        # often unexpected.
                        if image_url.lower().startswith('http'):
                            urllib.request.urlretrieve(image_url, graphite_image_file)  # nosec B310
                        else:
                            logger.error(
                                'error :: image_url does not start with http - %s' % (str(image_url)))
                    logger.info('url OK - %s' % (image_url))
                # except urllib2.URLError:
                except:
                    logger.error(traceback.print_exc())
                    logger.error('error :: url bad - %s' % (image_url))
                    # image_data = None

                if os.path.isfile(graphite_image_file):
                    try:
                        if python_version == 2:
                            os.chmod(graphite_image_file, 0o644)
                        if python_version == 3:
                            os.chmod(graphite_image_file, mode=0o644)
                        logger.info('graphite_image_file permissions set OK - %s' % (graphite_image_file))
                    except:
                        logger.error(traceback.print_exc())
                        logger.error('error :: graphite_image_file permissions could not be set - %s' % (graphite_image_file))
                else:
                    logger.error('error :: graphite_image_file does not exist - %s' % (graphite_image_file))

                # @modified 20200328 - Task #3290: Handle urllib2 in py3
                #                      Branch #3262: py3
                # Use urlretrieve so no need to write data to file
                # if image_data is not None:
                #     with open(graphite_image_file, 'w') as f:
                #         f.write(image_data)
                #     logger.info('retrieved - %s' % (anomaly_graph))
                #     if python_version == 2:
                #         # @modified 20200327 - Branch #3262: py3
                #         # os.chmod(graphite_image_file, 0644)
                #         os.chmod(graphite_image_file, 0o644)
                #     if python_version == 3:
                #         os.chmod(graphite_image_file, mode=0o644)
                # else:
                #     logger.error('error :: failed to retrieved - %s' % (anomaly_graph))
            else:
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('anomaly_graph file exists - %s' % str(anomaly_graph))

            if not os.path.isfile(anomaly_graph):
                logger.error('error :: retrieve failed to surface %s graph from graphite' % (metric))
            else:
                logger.info('graph image exists - %s' % (anomaly_graph))

            # Check if the json exists
            if not os.path.isfile(anomaly_json_gz):
                if not os.path.isfile(anomaly_json):
                    logger.info('surfacing timeseries data for %s from graphite from %s to %s' % (metric, graphite_from, graphite_until))
                    if requests.__version__ >= '2.4.0':
                        use_timeout = (int(connect_timeout), int(read_timeout))
                    else:
                        use_timeout = int(connect_timeout)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('use_timeout - %s' % (str(use_timeout)))

                    # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
                    #                   Bug #3778: Handle single encoded forward slash requests to Graphite
                    sanitised = False
                    try:
                        sanitised, url = sanitise_graphite_url(skyline_app, url)
                    except Exception as err:
                        logger.error(traceback.format_exc())
                        logger.error('error :: sanitise_graphite_url failed - %s - %s' % (str(url), err))

                    datapoints = []
                    try:
                        r = requests.get(url, timeout=use_timeout)
                        js = r.json()
                        datapoints = js[0]['datapoints']
                        logger.info('data retrieved OK')
                        if settings.ENABLE_CRUCIBLE_DEBUG:
                            logger.info('data retrieved OK')
                    except Exception as err:
                        logger.error(traceback.format_exc())
                        logger.error('error :: data retrieval failed - %s' % err)
                        datapoints = []

                    converted = []
                    for datapoint in datapoints:
                        try:
                            new_datapoint = [int(datapoint[1]), float(datapoint[0])]
                            converted.append(new_datapoint)
                        # @modified 20170913 - Task #2160: Test skyline with bandit
                        # Added nosec to exclude from bandit tests
                        except:  # nosec B110, B112
                            continue

                    if not converted:
                        logger.error('error :: failed to surface any json data for %s from graphite' % (metric))
                        # Move metric check file
                        try:
                            shutil.move(metric_check_file, failed_check_file)
                            logger.info('moved check file to - %s' % failed_check_file)
                        except OSError:
                            logger.error('error :: failed to move check file to - %s' % failed_check_file)
                            # pass
                        except Exception as err:
                            logger.error(traceback.format_exc())
                            logger.error('error :: failed to move check file - %s' % (err))
                        return

                    if converted:
                        try:
                            with open(anomaly_json, 'w') as f:
                                f.write(json.dumps(converted))
                            if python_version == 2:
                                # @modified 20200327 - Branch #3262: py3
                                # os.chmod(anomaly_json, 0644)
                                os.chmod(anomaly_json, 0o644)
                            if python_version == 3:
                                os.chmod(anomaly_json, mode=0o644)
                            if settings.ENABLE_CRUCIBLE_DEBUG:
                                logger.info('json file - %s' % anomaly_json)
                        except:
                            logger.error(traceback.print_exc())
                            logger.error('error :: failed to write or chmod anomaly_json - %s' % (anomaly_json))

                    # @added 20200422 - Feature #3500: webapp - crucible_process_metrics
                    #                   Feature #1448: Crucible web UI
                    # Clean up converted
                    if converted:
                        try:
                            del converted
                            del js
                            del datapoints
                        except:
                            pass

            if not os.path.isfile(anomaly_json):
                logger.error('error :: failed to surface %s json from graphite' % (metric))
                # Move metric check file
                try:
                    shutil.move(metric_check_file, failed_check_file)
                    logger.info('moved check file to - %s' % failed_check_file)
                except OSError:
                    logger.error('error :: failed to move check file to - %s' % failed_check_file)
                    # pass
                return

        # @added 20200607 - Feature #3630: webapp - crucible_process_training_data
        # Added training_data_json
        if training_data_json:
            if not os.path.isfile(training_data_json):
                logger.error('error :: no training data json data found - %s' % training_data_json)
                # Move metric check file
                try:
                    shutil.move(metric_check_file, failed_check_file)
                    logger.info('moved check file to - %s' % failed_check_file)
                except OSError:
                    logger.error('error :: failed to move check file to - %s' % failed_check_file)
                    # pass
                return
            else:
                logger.info('copying training_data_json %s to anomaly_json %s' % (training_data_json, anomaly_json))
                try:
                    shutil.copy(training_data_json, anomaly_json)
                except Exception as err:
                    logger.error('error :: failed to copy training_data_json file - %s' % err)
                # anomaly_json = training_data_json

        # Check timeseries json exists - raw or gz
        if not os.path.isfile(anomaly_json):
            if not os.path.isfile(anomaly_json_gz):
                logger.error('error :: no json data found')
                # Move metric check file
                try:
                    shutil.move(metric_check_file, failed_check_file)
                    logger.info('moved check file to - %s' % failed_check_file)
                except OSError:
                    logger.error('error :: failed to move check file to - %s' % failed_check_file)
                return
            logger.info('timeseries json gzip exists - %s' % (anomaly_json_gz))
        else:
            logger.info('timeseries json exists - %s' % (anomaly_json))

        # If timeseries json and run_crucible_tests is str(False) gzip and
        # return here as there is nothing further to do
        if run_crucible_tests == 'False':
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('run_crucible_tests - %s' % run_crucible_tests)
            # gzip the json timeseries data
            if os.path.isfile(anomaly_json):
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('gzipping - %s' % anomaly_json)
                try:
                    f_in = open(anomaly_json)
                    f_out = gzip.open(anomaly_json_gz, 'wb')
                    f_out.writelines(f_in)
                    f_out.close()
                    f_in.close()
                    os.remove(anomaly_json)
                    if python_version == 2:
                        # @modified 20200327 - Branch #3262: py3
                        # os.chmod(anomaly_json_gz, 0644)
                        os.chmod(anomaly_json_gz, 0o644)
                    if python_version == 3:
                        os.chmod(anomaly_json_gz, mode=0o644)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('gzipped - %s' % anomaly_json_gz)
                    try:
                        os.remove(metric_check_file)
                        logger.info('removed check file - %s' % metric_check_file)
                    except OSError:
                        pass
                    return
                except:
                    logger.error('error :: Failed to gzip data file - %s' % str(traceback.print_exc()))
                    try:
                        os.remove(metric_check_file)
                        logger.info('removed check file - %s' % metric_check_file)
                    except OSError:
                        pass
                    return

            if os.path.isfile(anomaly_json_gz):
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('gzip exists - %s' % anomaly_json)
                try:
                    os.remove(metric_check_file)
                    logger.info('removed check file - %s' % metric_check_file)
                except OSError:
                    pass
                return
            nothing_to_do = 'true - for debug only'

        # self.check_if_parent_is_alive()
        # Run crucible algorithms
        logger.info('running crucible tests - %s' % (metric))

        if os.path.isfile(anomaly_json_gz):
            if not os.path.isfile(anomaly_json):
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('ungzipping - %s' % anomaly_json_gz)
                try:
                    # with gzip.open(anomaly_json_gz, 'rb') as fr:
                    fr = gzip.open(anomaly_json_gz, 'rb')
                    raw_timeseries = fr.read()
                    fr.close()
                except Exception as e:
                    logger.error(traceback.print_exc())
                    logger.error('error :: could not ungzip %s - %s' % (anomaly_json_gz, e))
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('ungzipped')
                    logger.info('writing to - %s' % anomaly_json)
                with open(anomaly_json, 'w') as fw:
                    fw.write(raw_timeseries)
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('anomaly_json done')
                if python_version == 2:
                    # @modified 20200327 - Branch #3262: py3
                    # os.chmod(anomaly_json, 0644)
                    os.chmod(anomaly_json, 0o644)
                if python_version == 3:
                    os.chmod(anomaly_json, mode=0o644)
        else:
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('No gzip - %s' % anomaly_json_gz)
            nothing_to_do = 'true - for debug only'

        if os.path.isfile(anomaly_json):
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('anomaly_json exists - %s' % anomaly_json)

        if os.path.isfile(anomaly_json):
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('loading timeseries from - %s' % anomaly_json)
            timeseries = None
            try:
                with open(anomaly_json, 'r') as f:
                    timeseries = json.loads(f.read())
                    raw_timeseries = f.read()
                    timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']')
                    timeseries = literal_eval(timeseries_array_str)
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('loaded time series from - %s' % anomaly_json)
            except:
                # logger.error(traceback.format_exc())
                logger.info('failed to load with JSON, literal_eval will be tried - %s' % anomaly_json)
            # @added 20180715 - Task #2444: Evaluate CAD
            #                   Task #2446: Optimize Ionosphere
            #                   Branch #2270: luminosity
            # If the json.loads fails use literal_eval
            if not timeseries:
                try:
                    with open(anomaly_json, 'r') as f:
                        raw_timeseries = f.read()
                        timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']')
                        timeseries = literal_eval(timeseries_array_str)
                    if settings.ENABLE_CRUCIBLE_DEBUG:
                        logger.info('loaded time series with literal_eval from - %s' % anomaly_json)
                except:
                    logger.info(traceback.format_exc())
                    logger.error('error :: failed to load JSON - %s' % anomaly_json)
        else:
            try:
                logger.error('error :: file not found - %s' % anomaly_json)
                shutil.move(metric_check_file, failed_check_file)
                if python_version == 2:
                    # @modified 20200327 - Branch #3262: py3
                    # os.chmod(failed_check_file, 0644)
                    os.chmod(failed_check_file, 0o644)
                if python_version == 3:
                    os.chmod(failed_check_file, mode=0o644)
                logger.info('moved check file to - %s' % failed_check_file)
            except OSError:
                logger.error('error :: failed to move check file to - %s' % failed_check_file)
            return

        if not timeseries:
            try:
                logger.info('failing check, no time series from - %s' % anomaly_json)
                shutil.move(metric_check_file, failed_check_file)
                if python_version == 2:
                    # @modified 20200327 - Branch #3262: py3
                    # os.chmod(failed_check_file, 0644)
                    os.chmod(failed_check_file, 0o644)
                if python_version == 3:
                    os.chmod(failed_check_file, mode=0o644)
                logger.info('moved check file to - %s' % failed_check_file)
            except OSError:
                logger.error('error :: failed to move check file to - %s' % failed_check_file)
            return

        # @added 20200507 - Feature #3532: Sort all time series
        # To ensure that there are no unordered timestamps in the time
        # series which are artefacts of the collector or carbon-relay, sort
        # all time series by timestamp before analysis.
        original_timeseries = timeseries
        if original_timeseries:
            timeseries = sort_timeseries(original_timeseries)
            del original_timeseries

        logger.info('timeseries length: %s' % str(len(timeseries)))

        start_timestamp = int(timeseries[0][0])
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('start_timestamp - %s' % str(start_timestamp))
        end_timestamp = int(timeseries[-1][0])
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('end_timestamp - %s' % str(end_timestamp))

        full_duration = end_timestamp - start_timestamp
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('full_duration - %s' % str(full_duration))

        # @added 20200422 - Feature #3500: webapp - crucible_process_metrics
        #                   Feature #1448: Crucible web UI
        # In order for metrics to be analysed in Crucible like the
        # Analyzer or Mirage analysis, the time series data needs to
        # be padded. Added pad_timeseries in the webapp so check
        # here if the time series is padded
        if graphite_override_uri_parameters and added_by == 'webapp':
            if start_timestamp < from_timestamp:
                padded_timeseries = True
                padded_with = from_timestamp - start_timestamp
                logger.info('padded time series identified, padded with %s seconds' % str(padded_with))

        # @added 20220610 - Feature #3500: webapp - crucible_process_metrics
        # Added summarise option
        if training_data_json and summarise:
            resolution = determine_data_frequency(skyline_app, timeseries, False)
            if summarise != resolution:
                logger.info('downsampling training_data_json time series from %s to %s resolution' % (
                    str(resolution), str(summarise)))
                aligned_timeseries = []
                for ts, value in timeseries:
                    aligned_timeseries.append([int(int(ts) // resolution * resolution), value])
                timeseries = aligned_timeseries
                logger.info('original training_data_json time series has %s data points' % (
                    str(len(timeseries))))
                method = 'mean'
                resampled_aligned_timeseries = []
                try:
                    resampled_aligned_timeseries = downsample_timeseries(skyline_app, timeseries, resolution, summarise, method, origin='end')
                except Exception as err:
                    logger.error('error :: downsample_timeseries failed on full_duration_timeseries - %s' % err)
                if resampled_aligned_timeseries:
                    logger.info('downsampled training_data_json time series has %s data points' % (
                        str(len(resampled_aligned_timeseries))))
                    timeseries = list(resampled_aligned_timeseries)

        self.check_if_parent_is_alive()

        run_algorithms_start_timestamp = int(time())
        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('run_algorithms_start_timestamp - %s' % str(run_algorithms_start_timestamp))

        # @added 20200427 - Feature #3500: webapp - crucible_process_metrics
        #                   Feature #1448: Crucible web UI
        # Set variables so on fail the process does not hang
        anomalous = None
        ensemble = None
        alert_interval_discarded_anomalies_count = 0
        # For debug only but do not remove as this is an item in the final
        # return
        nothing_to_do = ''

        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('run_algorithms - %s,%s,%s,%s,%s,%s' % (metric, str(end_timestamp), str(full_duration), anomaly_json, skyline_app, str(algorithms)))
        try:
            # @modified 20200421 - Feature #3500: webapp - crucible_process_metrics
            #                      Feature #1448: Crucible web UI
            # Pass alert_interval, add_to_panorama and alert_interval_discarded_anomalies_count
            # anomalous, ensemble = run_algorithms(timeseries, str(metric), end_timestamp, full_duration, str(anomaly_json), skyline_app, algorithms)
            # @modified 20200422 - Feature #3500: webapp - crucible_process_metrics
            #                      Feature #1448: Crucible web UI
            # Added padded_timeseries and from_timestamp
            anomalous, ensemble, alert_interval_discarded_anomalies_count = run_algorithms(timeseries, str(metric), end_timestamp, full_duration, str(anomaly_json), skyline_app, algorithms, alert_interval, add_to_panorama, padded_timeseries, from_timestamp)
        except:
            logger.error('error :: run_algorithms failed - %s' % str(traceback.print_exc()))
            try:
                shutil.move(metric_check_file, failed_check_file)
                if python_version == 2:
                    os.chmod(failed_check_file, 0o644)
                if python_version == 3:
                    os.chmod(failed_check_file, mode=0o644)
                logger.info('moved check file to - %s' % failed_check_file)
            except OSError:
                logger.error('error :: failed to move check file to - %s' % failed_check_file)
            return

        run_algorithms_end_timestamp = int(time())
        run_algorithms_seconds = run_algorithms_end_timestamp - run_algorithms_start_timestamp

        if settings.ENABLE_CRUCIBLE_DEBUG:
            logger.info('anomalous, ensemble - %s, %s' % (anomalous, str(ensemble)))

        if anomalous:
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('anomalous - %s' % (anomalous))
            nothing_to_do = 'true - for debug only'

        logger.info('run_algorithms took %s seconds' % str(run_algorithms_seconds))

        # Update anomaly file
        # @modified 20200421 - Feature #3500: webapp - crucible_process_metrics
        #                      Feature #1448: Crucible web UI
        # Added run_algorithms_seconds and alert_interval_discarded_anomalies
        crucible_data = 'crucible_tests_run = \'%s\'\n' \
                        'crucible_triggered_algorithms = %s\n' \
                        'tested_by = \'%s\'\n' \
                        'run_algorithms_seconds = \'%s\'\n' \
                        'alert_interval_discarded_anomalies = \'%s\'\n' \
            % (str(run_timestamp), str(ensemble), str(this_host),
               str(run_algorithms_seconds),
               str(alert_interval_discarded_anomalies_count))
        crucible_anomaly_file = '%s/%s.txt' % (anomaly_dir, metric)
        with open(crucible_anomaly_file, 'a') as fh:
            fh.write(crucible_data)
        if python_version == 2:
            # @modified 20200327 - Branch #3262: py3
            # os.chmod(crucible_anomaly_file, 0644)
            os.chmod(crucible_anomaly_file, 0o644)
        if python_version == 3:
            os.chmod(crucible_anomaly_file, mode=0o644)
        logger.info('updated crucible anomaly file - %s/%s.txt' % (anomaly_dir, metric))

        # gzip the json timeseries data after analysis
        if os.path.isfile(anomaly_json):
            if not os.path.isfile(anomaly_json_gz):
                remove_json = False
                try:
                    f_in = open(anomaly_json)
                    f_out = gzip.open(anomaly_json_gz, 'wb')
                    f_out.writelines(f_in)
                    f_out.close()
                    f_in.close()
                    remove_json = True
                    logger.info('gzipped - %s' % (anomaly_json_gz))
                except:
                    logger.error('error :: Failed to gzip data file - %s' % str(traceback.print_exc()))
                if remove_json:
                    try:
                        if python_version == 2:
                            # @modified 20200327 - Branch #3262: py3
                            # os.chmod(anomaly_json_gz, 0644)
                            os.chmod(anomaly_json_gz, 0o644)
                        if python_version == 3:
                            os.chmod(anomaly_json_gz, mode=0o644)
                        logger.info('gzipped - %s' % (anomaly_json_gz))
                    except:
                        logger.error('error :: Failed to chmod anomaly_json_gz file - %s' % str(traceback.print_exc()))
                    try:
                        os.remove(anomaly_json)
                    except:
                        logger.error('error :: Failed to remove anomaly_json file - %s' % str(traceback.print_exc()))
            else:
                os.remove(anomaly_json)

        if run_script:
            if os.path.isfile(run_script):
                # @modified 20230109 - Task #4798: Deprecate run_script from crucible
                #                      Task #4778: v4.0.0 - update dependencies
                # logger.info('running - %s' % (run_script))
                logger.warning('WARNING :: not running - %s - DEPRECATED run_script in v4.0.0' % (run_script))
                # @modified 20170913 - Task #2160: Test skyline with bandit
                # Added nosec to exclude from bandit tests
                # @modified 20230109 - Task #4798: Deprecate run_script from crucible
                #                      Task #4778: v4.0.0 - update dependencies
                # run_script was added for some internal testing, is not
                # tested, not maintained or used and is undocumented
                # os.system('%s %s' % (str(run_script), str(crucible_anomaly_file)))  # nosec

        # @added 20200428 - Feature #3500: webapp - crucible_process_metrics
        #                   Feature #1448: Crucible web UI
        # Send Skyline consensus anomalies to Panorama
        if add_to_panorama:
            added_to_panorama = False
            user_id = 1
            skyline_anomalies = None
            skyline_anomalies_score_file = anomaly_dir + '/' + 'skyline.anomalies_score.txt'
            if os.path.isfile(skyline_anomalies_score_file):
                try:
                    with open(skyline_anomalies_score_file) as f:
                        output = f.read()
                    skyline_anomalies = literal_eval(output)
                except:
                    logger.info(traceback.format_exc())
                    logger.error('error :: failed to get Skyline anomalies scores from %s' % skyline_anomalies_score_file)
                    skyline_anomalies = None
            skyline_consensus_anomalies = []
            if skyline_anomalies:
                # skyline_anomalies format
                # [timestamp, value, anomaly_score, triggered_algorithms]
                # [1583234400.0, 44.39999999990687, 2, ['histogram_bins', 'median_absolute_deviation']],
                # Convert float timestamp from Graphite to int
                for timestamp, value, anomaly_score, triggered_algorithms in skyline_anomalies:
                    if anomaly_score >= settings.CONSENSUS:
                        skyline_consensus_anomalies.append([int(timestamp), value, anomaly_score, triggered_algorithms])
                del skyline_anomalies
            added_at = int(time())
            if skyline_consensus_anomalies:
                sane_metricname = filesafe_metricname(str(metric))
                for timestamp, datapoint, anomaly_score, triggered_algorithms in skyline_consensus_anomalies:
                    # To allow multiple Panorama anomaly files to be added
                    # quickly just increment the added_at by 1 second so that
                    # all the files have a unique name
                    added_at += 1
                    # Note:
                    # The values are enclosed in single quotes intentionally
                    # as the imp.load_source that is used results in a shift
                    # in the decimal position when double quoted, e.g.
# value = "5622.0" gets imported as # 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2 # single quoting results in the desired, # 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0 source = 'graphite' panaroma_anomaly_data = 'metric = \'%s\'\n' \ 'value = \'%s\'\n' \ 'from_timestamp = \'%s\'\n' \ 'metric_timestamp = \'%s\'\n' \ 'algorithms = %s\n' \ 'triggered_algorithms = %s\n' \ 'app = \'%s\'\n' \ 'source = \'%s\'\n' \ 'added_by = \'%s\'\n' \ 'added_at = \'%s\'\n' \ 'label = \'added by Crucible\'\n' \ 'user_id = \'%s\'\n' \ % (metric, str(datapoint), from_timestamp, str(timestamp), str(settings.ALGORITHMS), triggered_algorithms, skyline_app, source, this_host, str(added_at), str(user_id)) # Create an anomaly file with details about the anomaly panaroma_anomaly_file = '%s/%s.%s.txt' % ( settings.PANORAMA_CHECK_PATH, added_at, sane_metricname) try: write_data_to_file( skyline_app, panaroma_anomaly_file, 'w', panaroma_anomaly_data) logger.info('added panorama anomaly file :: %s' % (panaroma_anomaly_file)) added_to_panorama = True except: logger.error(traceback.format_exc()) logger.error('error :: send_crucible_job_metric_to_panorama - failed to add panorama anomaly file :: %s' % (panaroma_anomaly_file)) if added_to_panorama: crucible_sent_to_panorama_file = '%s/%s.%s.%s.sent_to_panorama.txt' % ( anomaly_dir, str(added_at), sane_metricname) panorama_done_data = [added_at, int(user_id), skyline_consensus_anomalies] try: write_data_to_file( skyline_app, crucible_sent_to_panorama_file, 'w', str(panorama_done_data)) logger.info('added panorama crucible job file :: %s' % (crucible_sent_to_panorama_file)) except: logger.error(traceback.format_exc()) logger.error('error :: crucible send to panorama - failed to add panorama file :: %s' % (crucible_sent_to_panorama_file)) try: os.remove(metric_check_file) logger.info('complete removed check file - %s %s' % (metric_check_file, nothing_to_do)) except OSError: pass
    def run(self):
        """
        Called when the process initializes.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os.remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)

        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os.remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        logger.info('process initialized')
        while 1:
            now = time()
            if settings.ENABLE_CRUCIBLE_DEBUG:
                logger.info('process started - %s' % int(now))

            # Make sure check_dir exists and has not been removed
            try:
                if settings.ENABLE_CRUCIBLE_DEBUG:
                    logger.info('checking check dir exists - %s' % settings.CRUCIBLE_CHECK_PATH)
                os.path.exists(settings.CRUCIBLE_CHECK_PATH)
            except:
                logger.error('error :: check dir did not exist - %s' % settings.CRUCIBLE_CHECK_PATH)
                mkdir_p(settings.CRUCIBLE_CHECK_PATH)
                logger.info('check dir created - %s' % settings.CRUCIBLE_CHECK_PATH)
                os.path.exists(settings.CRUCIBLE_CHECK_PATH)
                # continue

            # Make sure Redis is up
            try:
                self.redis_conn.ping()
                logger.info('connected to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
            except:
                logger.info('skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                sleep(10)
                logger.info('connecting to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
                if settings.REDIS_PASSWORD:
                    self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
                else:
                    self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
                continue

            # Determine if any metric has been added to test
            while True:
                # Report app up
                self.redis_conn.setex(skyline_app, 120, now)

                metric_var_files = [f for f in listdir(settings.CRUCIBLE_CHECK_PATH) if isfile(join(settings.CRUCIBLE_CHECK_PATH, f))]

                # @added 20200428 - Feature #3500: webapp - crucible_process_metrics
                #                   Feature #1448: Crucible web UI
                # If crucible is busy, stop processing around midnight to allow
                # for log rotation as log rotation can make the process hang if
                # running
                pause_for_log_rotation = False
                current_now = datetime.datetime.now()
                current_hour = current_now.strftime('%H')
                if current_hour == '23':
                    current_minute = current_now.strftime('%M')
                    before_midnight_minutes = ['55', '56', '57', '58', '59']
                    if current_minute in before_midnight_minutes:
                        logger.info('setting metric_var_files to [] for log rotation')
                        metric_var_files = []
                        pause_for_log_rotation = True
                if current_hour == '00':
                    current_minute = current_now.strftime('%M')
                    after_midnight_minutes = ['00', '01', '02']
                    if current_minute in after_midnight_minutes:
                        logger.info('setting metric_var_files to [] for log rotation')
                        metric_var_files = []
                        pause_for_log_rotation = True

                # if len(metric_var_files) == 0:
                if not metric_var_files:
                    logger.info('sleeping 10 no metric check files')
                    sleep(10)

                    # Discover metric to analyze
                    metric_var_files = ''
                    metric_var_files = [f for f in listdir(settings.CRUCIBLE_CHECK_PATH) if isfile(join(settings.CRUCIBLE_CHECK_PATH, f))]
                    if pause_for_log_rotation:
                        logger.info('setting metric_var_files to [] for log rotation')
                        metric_var_files = []

                # if len(metric_var_files) > 0:
                if metric_var_files:
                    break

            metric_var_files_sorted = sorted(metric_var_files)
            metric_check_file = settings.CRUCIBLE_CHECK_PATH + "/" + str(metric_var_files_sorted[0])

            # @added 20200421 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES
            # TODO
            # Handle multiple processes
            # Prioritise current metrics over webapp metrics

            logger.info('assigning check for processing - %s' % str(metric_var_files_sorted[0]))

            # Reset process_list
            # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
            # try:
            #     self.process_list[:] = []
            # except:
            #     logger.error('error :: failed to reset self.process_list')

            # Spawn processes
            pids = []
            spawned_pids = []
            pid_count = 0
            run_timestamp = int(now)
            # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES
            use_range = 1
            if len(metric_var_files_sorted) > 1:
                if settings.CRUCIBLE_PROCESSES > 1:
                    if len(metric_var_files_sorted) < settings.CRUCIBLE_PROCESSES:
                        use_range = settings.CRUCIBLE_PROCESSES - len(metric_var_files_sorted)
                    else:
                        use_range = settings.CRUCIBLE_PROCESSES
                    logger.info('dynamically determined to submit to and use %s processors' % str(use_range))
            logger.info('will use %s processors' % str(use_range))

            # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES
            # for i in range(1, settings.CRUCIBLE_PROCESSES + 1):
            for i in range(1, use_range + 1):
                # @added 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES
                if len(metric_var_files_sorted) > 1:
                    if use_range > 1:
                        if i > 1:
                            list_element = i - 1
                            metric_check_file = settings.CRUCIBLE_CHECK_PATH + "/" + str(metric_var_files_sorted[list_element])
                            logger.info('assigning additional check to processor %s for processing - %s' % (str(i), str(metric_var_files_sorted[list_element])))
                p = Process(target=self.spin_process, args=(i, run_timestamp, str(metric_check_file)))
                pids.append(p)
                pid_count += 1
                # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES
                # logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.CRUCIBLE_PROCESSES)))
                logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(use_range)))
                p.start()
                spawned_pids.append(p.pid)

            # Send wait signal to zombie processes
            # for p in pids:
            #     p.join()

            # Self monitor processes and terminate if any spin_process has run
            # for longer than CRUCIBLE_TESTS_TIMEOUT
            p_starts = time()
            sleep_count = 0
            while time() - p_starts <= settings.CRUCIBLE_TESTS_TIMEOUT:
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                    # @added 20200421 - Feature #3500: webapp - crucible_process_metrics
                    #                   Feature #1448: Crucible web UI
                    sleep_count += 1
                    if (sleep_count % 100) == 0:
                        logger.info('%s :: spin_process/es still running pid/s - %s' % (skyline_app, str(pids)))
                else:
                    # All the processes are done, break now.
                    time_to_run = time() - p_starts
                    # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES
                    # logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(settings.CRUCIBLE_PROCESSES), time_to_run))
                    logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(use_range), time_to_run))
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
                for p in pids:
                    p.terminate()
                    # p.join()

            for p in pids:
                if p.is_alive():
                    logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
                    p.join()

            while os.path.isfile(metric_check_file):
                sleep(1)
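
# Note: a minimal sketch of how this class is driven - assuming a parent
# agent process, as the actual Skyline agent wiring lives outside this
# module:
#
#     from os import getpid
#     from crucible.crucible import Crucible
#
#     crucible_thread = Crucible(getpid())
#     crucible_thread.start()  # enters run() and polls CRUCIBLE_CHECK_PATH
#     crucible_thread.join()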