Source code for crucible.crucible

import logging
try:
    from Queue import Empty
except:
    from queue import Empty
from redis import StrictRedis
# import time
from time import time, sleep
from threading import Thread
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list to reduce memory and number of
# processes
# from multiprocessing import Process, Manager
from multiprocessing import Process
from msgpack import packb
import os
from os.path import join, isfile
from os import kill, getpid, listdir
from sys import exit, version_info
import traceback
import re
import json
import gzip
import requests

# @modified 20200328 - Task #3290: Handle urllib2 in py3
#                      Branch #3262: py3
# Use urlretrieve
# try:
#     import urlparse
# except ImportError:
#     import urllib.parse
# try:
#     import urllib2
# except ImportError:
#     import urllib.request
#     import urllib.error
try:
    import urllib
except:
    # For backwards compatibility with py2 load urlib.request as urllib so
    # that urllib.urlretrieve is available to both as the same module.
    # from urllib import request as urllib
    import urllib.request
    import urllib.error

import errno
import datetime
import shutil

import os.path
# sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
# sys.path.insert(0, os.path.dirname(__file__))

from ast import literal_eval

import settings

# @modified 20200327 - Branch #3262: py3
# from skyline_functions import load_metric_vars, fail_check, mkdir_p
# @modified 20200428 - Feature #3500: webapp - crucible_process_metrics
#                      Feature #1448: Crucible web UI
# Added write_data_to_file and filesafe_metricname to send to Panorama
from skyline_functions import (
    fail_check, mkdir_p, write_data_to_file, filesafe_metricname,
    # @added 20200506 - Feature #3532: Sort all time series
    sort_timeseries,
    # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
    #                   Bug #3778: Handle single encoded forward slash requests to Graphite
    sanitise_graphite_url)

from crucible_algorithms import run_algorithms

skyline_app = 'crucible'
skyline_app_logger = skyline_app + 'Log'
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = settings.LOG_PATH + '/' + skyline_app + '.log'
skyline_app_loglock = skyline_app_logfile + '.lock'
skyline_app_logwait = skyline_app_logfile + '.wait'

python_version = int(version_info[0])

this_host = str(os.uname()[1])

try:
    SERVER_METRIC_PATH = '.' + settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.' + skyline_app + SERVER_METRIC_PATH

FULL_NAMESPACE = settings.FULL_NAMESPACE
ENABLE_CRUCIBLE_DEBUG = settings.ENABLE_CRUCIBLE_DEBUG
crucible_data_folder = str(settings.CRUCIBLE_DATA_FOLDER)
failed_checks_dir = crucible_data_folder + '/failed_checks'


[docs]class Crucible(Thread): def __init__(self, parent_pid): """ Initialize Crucible """ super(Crucible, self).__init__() self.daemon = True self.parent_pid = parent_pid self.current_pid = getpid() # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # Task #3032: Debug number of Python processes and memory use # Branch #3002: docker # Reduce amount of Manager instances that are used as each requires a # copy of entire memory to be copied into each subprocess so this # results in a python process per Manager instance, using as much # memory as the parent. OK on a server, not so much in a container. # Disabled all the Manager() lists below and replaced with Redis sets # self.process_list = Manager().list() # self.metric_variables = Manager().list() # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) else: self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
[docs] def check_if_parent_is_alive(self): """ Check if the parent process is alive """ try: kill(self.current_pid, 0) kill(self.parent_pid, 0) except: # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics # Log warning logger.warning('warning :: parent or current process dead') exit(0)
# @added 20200327 - Branch #3262: py3 # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
[docs] def new_load_metric_vars(self, metric_vars_file): """ Load the metric variables for a check from a metric check variables file :param metric_vars_file: the path and filename to the metric variables files :type metric_vars_file: str :return: the metric_vars list or ``False`` :rtype: list """ if os.path.isfile(metric_vars_file): logger.info( 'loading metric variables from metric_check_file - %s' % ( str(metric_vars_file))) else: logger.error( 'error :: loading metric variables from metric_check_file - file not found - %s' % ( str(metric_vars_file))) return False metric_vars = [] with open(metric_vars_file) as f: for line in f: no_new_line = line.replace('\n', '') no_equal_line = no_new_line.replace(' = ', ',') array = str(no_equal_line.split(',', 1)) add_line = literal_eval(array) metric_vars.append(add_line) # @added 20200607 - Feature #3630: webapp - crucible_process_training_data # Added training_data_json string_keys = [ 'metric', 'anomaly_dir', 'added_by', 'app', 'run_script', 'graphite_override_uri_parameters', 'training_data_json'] float_keys = ['value'] # @modified 20170127 - Feature #1886: Ionosphere learn - child like parent with evolutionary maturity # Added ionosphere_parent_id, always zero from Analyzer and Mirage # @modified 20200420 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Added alert_interval to int_keys and add_to_panorama to the boolean_keys int_keys = [ 'from_timestamp', 'metric_timestamp', 'added_at', 'full_duration', 'ionosphere_parent_id', 'alert_interval'] # @added 20201001 - Task #3748: POC SNAB # Added algorithms_run required to determine the anomalyScore # so this needs to be sent to Ionosphere so Ionosphere # can send it back on an alert, but the same file gets sent to Crucible so... array_keys = ['triggered_algorithms', 'algorithms', 'algorithms_run'] boolean_keys = ['graphite_metric', 'run_crucible_tests', 'add_to_panorama'] metric_vars_array = [] for var_array in metric_vars: key = None value = None if var_array[0] in string_keys: key = var_array[0] value_str = str(var_array[1]).replace("'", '') value = str(value_str) if var_array[0] == 'metric': metric = value if var_array[0] in float_keys: key = var_array[0] value_str = str(var_array[1]).replace("'", '') value = float(value_str) if var_array[0] in int_keys: key = var_array[0] value_str = str(var_array[1]).replace("'", '') value = int(value_str) if var_array[0] in array_keys: key = var_array[0] value = literal_eval(str(var_array[1])) if var_array[0] in boolean_keys: key = var_array[0] if str(var_array[1]) == 'True': value = True else: value = False if key: metric_vars_array.append([key, value]) if len(metric_vars_array) == 0: logger.error( 'error :: loading metric variables - none found' % ( str(metric_vars_file))) return False logger.info('debug :: metric_vars for %s' % str(metric)) logger.info('debug :: %s' % str(metric_vars_array)) return metric_vars_array
[docs] def spin_process(self, i, run_timestamp, metric_check_file): """ Assign a metric for a process to analyze. :param i: python process id :param run_timestamp: the epoch timestamp at which this process was called :param metric_check_file: full path to the metric check file :return: returns True """ child_process_pid = os.getpid() if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('child_process_pid - %s' % str(child_process_pid)) # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # self.process_list.append(child_process_pid) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('processing metric check - %s' % metric_check_file) if not os.path.isfile(str(metric_check_file)): logger.error('error :: file not found - metric_check_file - %s' % (str(metric_check_file))) return check_file_name = os.path.basename(str(metric_check_file)) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('check_file_name - %s' % check_file_name) check_file_timestamp = check_file_name.split('.', 1)[0] if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('check_file_timestamp - %s' % str(check_file_timestamp)) check_file_metricname_txt = check_file_name.split('.', 1)[1] if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('check_file_metricname_txt - %s' % check_file_metricname_txt) check_file_metricname = check_file_metricname_txt.replace('.txt', '') if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('check_file_metricname - %s' % check_file_metricname) check_file_metricname_dir = check_file_metricname.replace('.', '/') if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('check_file_metricname_dir - %s' % check_file_metricname_dir) metric_failed_check_dir = failed_checks_dir + '/' + check_file_metricname_dir + '/' + check_file_timestamp if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric_failed_check_dir - %s' % metric_failed_check_dir) # failed_check_file = failed_checks_dir + '/' + check_file_name failed_check_file = metric_failed_check_dir + '/' + check_file_name # Load and validate metric variables try: # @modified 20200327 - Branch #3262: py3 # metric_vars = load_metric_vars(skyline_app, str(metric_check_file)) metric_vars_array = self.new_load_metric_vars(str(metric_check_file)) except: logger.error(traceback.format_exc()) logger.error('error :: failed to import metric variables from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) # TBD - a failed check Panorama update will go here, perhaps txt # files are not the only "queue" that will be used, both, but # Panorama, may be just a part of Skyline Flux, the flux DB # would allow for a very nice, distributed "queue" and a # distributed Skyline workforce... # Any Skyline node could just have one role, e.g. lots of # Skyline nodes running crucible only and instead of reading # the local filesystem for input, they could read the Flux DB # queue or both... return # Test metric variables # We use a pythonic methodology to test if the variables are defined, # this ensures that if any of the variables are not set for some reason # we can handle unexpected data or situations gracefully and try and # ensure that the process does not hang. # if len(str(metric_vars.metric)) == 0: # if not metric_vars.metric: # try: # metric_vars.metric # except: # logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file)) # fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) # return # else: # metric = str(metric_vars.metric) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - metric - %s' % metric) metric = None try: # metric_vars.metric key = 'metric' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] metric = str(value_list[0]) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('debug :: metric variable - metric - %s' % metric) except: logger.error(traceback.format_exc()) logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file)) metric = None if not metric: logger.error('error :: failed to load metric variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) return # if len(metric_vars.value) == 0: # if not metric_vars.value: # try: # metric_vars.value # except: # logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file)) # fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) # return # else: # value = str(metric_vars.value) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - value - %s' % (value)) value = None try: key = 'value' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] value = float(value_list[0]) except: logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file)) value = None if not value: # @modified 20181119 - Bug #2708: Failing to load metric vars if value == 0.0: pass else: logger.error('error :: failed to load value variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) return # if len(metric_vars.from_timestamp) == 0: # if not metric_vars.from_timestamp: # try: # metric_vars.from_timestamp # except: # logger.error('error :: failed to read from_timestamp variable from check file - %s' % (metric_check_file)) # fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) # return # else: # from_timestamp = str(metric_vars.from_timestamp) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - from_timestamp - %s' % from_timestamp) from_timestamp = None try: key = 'from_timestamp' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] from_timestamp = int(value_list[0]) except: logger.error('error :: failed to read from_timestamp variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) return # if len(metric_vars.metric_timestamp) == 0: # if not metric_vars.metric_timestamp: # try: # metric_vars.metric_timestamp # except: # logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file)) # fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) # return # else: # metric_timestamp = str(metric_vars.metric_timestamp) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - metric_timestamp - %s' % metric_timestamp) metric_timestamp = None try: key = 'metric_timestamp' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] metric_timestamp = str(value_list[0]) except: logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file)) fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) return # if len(metric_vars.algorithms) == 0: # if not metric_vars.algorithms: # algorithms = [] # try: # metric_vars.algorithms # except: # logger.error('error :: failed to read algorithms variable from check file setting to all') # algorithms = ['all'] # # if not algorithms: # # algorithms = [] # # for i_algorithm in metric_vars.algorithms: # # algorithms.append(i_algorithm) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - algorithms - %s' % str(algorithms)) algorithms = [] try: key = 'algorithms' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] algorithms = value_list[0] except: logger.info('failed to read algorithms variable from check file setting to all') algorithms = ['all'] # if len(metric_vars.anomaly_dir) == 0: # if not metric_vars.anomaly_dir: # try: # metric_vars.anomaly_dir # except: # logger.error('error :: failed to read anomaly_dir variable from check file - %s' % (metric_check_file)) # fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) # return # else: # anomaly_dir = str(metric_vars.anomaly_dir) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - anomaly_dir - %s' % anomaly_dir) anomaly_dir = None try: key = 'anomaly_dir' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] anomaly_dir = str(value_list[0]) except: logger.error('failed to read anomaly_dir variable from check file setting to all') fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) return if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - anomaly_dir - %s' % anomaly_dir) # if len(str(metric_vars.graphite_metric)) == 0: # try: # metric_vars.graphite_metric # except: # logger.info('failed to read graphite_metric variable from check file setting to False') # # yes this is a string # graphite_metric = 'False' # else: # graphite_metric = str(metric_vars.graphite_metric) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - graphite_metric - %s' % graphite_metric) graphite_metric = None try: key = 'graphite_metric' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] graphite_metric = str(value_list[0]) except: logger.info('failed to read graphite_metric variable from check file setting to False') # yes this is a string graphite_metric = 'False' if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - graphite_metric - %s' % graphite_metric) # if len(str(metric_vars.run_crucible_tests)) == 0: try: # metric_vars.run_crucible_tests key = 'run_crucible_tests' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] run_crucible_tests = str(value_list[0]) except: logger.info('failed to read run_crucible_tests variable from check file setting to False') # yes this is a string run_crucible_tests = 'False' # else: # run_crucible_tests = str(metric_vars.run_crucible_tests) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - run_crucible_tests - %s' % run_crucible_tests) try: # metric_vars.added_by key = 'added_by' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] added_by = str(value_list[0]) except: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('failed to read added_by variable from check file setting to crucible - set to crucible') added_by = 'crucible' # else: # added_by = str(metric_vars.added_by) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - added_by - %s' % added_by) try: # metric_vars.run_script key = 'run_script' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] run_script = str(value_list[0]) except: run_script = False if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - run_script - not present set to False') # else: # run_script = str(metric_vars.run_script) # if settings.ENABLE_CRUCIBLE_DEBUG: # logger.info('metric variable - run_script - %s' % run_script) # @added 20190612 - Feature #3108: crucible - graphite_override_uri_parameters_specific_url # This metric variable is used to to declare absolute graphite uri # parameters try: # metric_vars.graphite_override_uri_parameters key = 'graphite_override_uri_parameters' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] graphite_override_uri_parameters = str(value_list[0]) except: logger.info('failed to read graphite_override_uri_parameters variable from check file setting to False') # yes this is a string graphite_override_uri_parameters = False # else: # graphite_override_uri_parameters = str(metric_vars.graphite_override_uri_parameters) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - graphite_override_uri_parameters - %s' % graphite_override_uri_parameters) add_to_panorama = False try: key = 'add_to_panorama' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] add_to_panorama = str(value_list[0]) if add_to_panorama == 'True': add_to_panorama = True else: add_to_panorama = False except: logger.info('failed to read add_to_panorama variable from check file setting to False') add_to_panorama = False if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - add_to_panorama - %s' % str(add_to_panorama)) # @modified 20200420 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Added alert_interval try: key = 'alert_interval' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] alert_interval = int(value_list[0]) except: logger.info('failed to read alert_interval variable from check file setting to 0') alert_interval = 0 if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - alert_interval - %s' % str(alert_interval)) # @added 20200422 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # In order for metrics to be analysed in Crucible like the # Analyzer or Mirage analysis, the time series data needs to # be padded. Added pad_timeseries in the webapp. padded_timeseries = False # @added 20200607 - Feature #3630: webapp - crucible_process_training_data # Added training_data_json try: key = 'training_data_json' value_list = [var_array[1] for var_array in metric_vars_array if var_array[0] == key] training_data_json = str(value_list[0]) except: training_data_json = None if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric variable - training_data_json - %s' % str(training_data_json)) # Only check if the metric does not a EXPIRATION_TIME key set, crucible # uses the alert EXPIRATION_TIME for the relevant alert setting contexts # whether that be analyzer, mirage, boundary, etc and sets its own # cache_keys in redis. This prevents large amounts of data being added # in terms of tieseries json and image files, crucible samples at the # same EXPIRATION_TIME as alerts. source_app = 'crucible' expiration_timeout = 1800 remove_all_anomaly_files = False check_expired = False check_time = time() if added_by == 'analyzer' or added_by == 'mirage': if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('Will check %s ALERTS' % added_by) if settings.ENABLE_ALERTS: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('Checking %s ALERTS' % added_by) for alert in settings.ALERTS: ALERT_MATCH_PATTERN = alert[0] METRIC_PATTERN = metric alert_match_pattern = re.compile(ALERT_MATCH_PATTERN) pattern_match = alert_match_pattern.match(METRIC_PATTERN) if pattern_match: source_app = added_by expiration_timeout = alert[2] if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('matched - %s - %s - EXPIRATION_TIME is %s' % (source_app, metric, str(expiration_timeout))) check_age = int(check_time) - int(metric_timestamp) if int(check_age) > int(expiration_timeout): check_expired = True if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('the check is older than EXPIRATION_TIME for the metric - not checking - check_expired') if added_by == 'boundary': if settings.BOUNDARY_ENABLE_ALERTS: for alert in settings.BOUNDARY_METRICS: ALERT_MATCH_PATTERN = alert[0] METRIC_PATTERN = metric alert_match_pattern = re.compile(ALERT_MATCH_PATTERN) pattern_match = alert_match_pattern.match(METRIC_PATTERN) if pattern_match: source_app = 'boundary' expiration_timeout = alert[2] if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('matched - %s - %s - EXPIRATION_TIME is %s' % (source_app, metric, str(expiration_timeout))) check_age = int(check_time) - int(metric_timestamp) if int(check_age) > int(expiration_timeout): check_expired = True if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('the check is older than EXPIRATION_TIME for the metric - not checking - check_expired') # @added 20200421 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # If the check is added by the webapp explicitly set the check_expired # to True so that analysis will run if added_by == 'webapp': check_expired = True cache_key = 'crucible.last_check.%s.%s' % (source_app, metric) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('cache_key - crucible.last_check.%s.%s' % (source_app, metric)) # Only use the cache_key EXPIRATION_TIME if this is not a request to # run_crucible_tests on a timeseries if run_crucible_tests == 'False': if check_expired: logger.info('check_expired - not checking Redis key') last_check = True else: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('Checking if cache_key exists') try: last_check = self.redis_conn.get(cache_key) except Exception as e: logger.error('error :: could not query cache_key for %s - %s - %s' % (source_app, metric, e)) logger.info('all anomaly files will be removed') remove_all_anomaly_files = True if not last_check: try: self.redis_conn.setex(cache_key, expiration_timeout, packb(value)) logger.info('set cache_key for %s - %s with timeout of %s' % (source_app, metric, str(expiration_timeout))) except Exception as e: logger.error('error :: could not query cache_key for %s - %s - %s' % (source_app, metric, e)) logger.info('all anomaly files will be removed') remove_all_anomaly_files = True else: if check_expired: logger.info('check_expired - all anomaly files will be removed') remove_all_anomaly_files = True else: logger.info('cache_key is set and not expired for %s - %s - all anomaly files will be removed' % (source_app, metric)) remove_all_anomaly_files = True # anomaly dir if not os.path.exists(str(anomaly_dir)): try: # mkdir_p(skyline_app, str(anomaly_dir)) mkdir_p(anomaly_dir) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('created anomaly dir - %s' % str(anomaly_dir)) except: logger.error('error :: failed to create anomaly_dir - %s' % str(anomaly_dir)) if not os.path.exists(str(anomaly_dir)): logger.error('error :: anomaly_dir does not exist') fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file)) return else: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly dir exists - %s' % str(anomaly_dir)) failed_check_file = anomaly_dir + '/' + metric_timestamp + '.failed.check.' + metric + '.txt' if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('failed_check_file - %s' % str(failed_check_file)) # Retrieve data from graphite is necessary anomaly_graph = anomaly_dir + '/' + metric + '.png' anomaly_json = anomaly_dir + '/' + metric + '.json' anomaly_json_gz = anomaly_json + '.gz' if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_graph - %s' % str(anomaly_graph)) logger.info('anomaly_json - %s' % str(anomaly_json)) logger.info('anomaly_json_gz - %s' % str(anomaly_json_gz)) # Some things added to crucible may not be added by a skyline app per se # and if run_crucible_tests is string True then no anomaly files should # be removed. if run_crucible_tests == 'True': remove_all_anomaly_files = False # Remove check and anomaly files if the metric has a EXPIRATION_TIME # cache_key set if remove_all_anomaly_files: if os.path.isfile(anomaly_graph): try: os.remove(anomaly_graph) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_graph removed - %s' % str(anomaly_graph)) except OSError: pass if os.path.isfile(anomaly_json): try: os.remove(anomaly_json) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_json removed - %s' % str(anomaly_json)) except OSError: pass if os.path.isfile(anomaly_json_gz): try: os.remove(anomaly_json_gz) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_json_gz removed - %s' % str(anomaly_json_gz)) except OSError: pass anomaly_txt_file = anomaly_dir + '/' + metric + '.txt' if os.path.isfile(anomaly_txt_file): try: os.remove(anomaly_txt_file) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_txt_file removed - %s' % str(anomaly_txt_file)) except OSError: pass # TBD - this data would have to be added to the panaorama DB before # it is removed if os.path.isfile(str(metric_check_file)): try: os.remove(str(metric_check_file)) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('metric_check_file removed - %s' % str(metric_check_file)) except OSError: pass if os.path.exists(str(anomaly_dir)): try: os.rmdir(str(anomaly_dir)) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_dir removed - %s' % str(anomaly_dir)) except OSError: pass logger.info('check and anomaly files removed') return # Check if the image exists if graphite_metric == 'True': if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('graphite_metric - %s' % (graphite_metric)) # Graphite timeouts connect_timeout = int(settings.GRAPHITE_CONNECT_TIMEOUT) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('connect_timeout - %s' % str(connect_timeout)) read_timeout = int(settings.GRAPHITE_READ_TIMEOUT) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('read_timeout - %s' % str(read_timeout)) graphite_until = datetime.datetime.fromtimestamp(int(metric_timestamp)).strftime('%H:%M_%Y%m%d') graphite_from = datetime.datetime.fromtimestamp(int(from_timestamp)).strftime('%H:%M_%Y%m%d') # graphite URL if settings.GRAPHITE_PORT != '': url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + ':' + settings.GRAPHITE_PORT + '/render/?from=' + graphite_from + '&until=' + graphite_until + '&target=' + metric + '&format=json' else: url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + '/render/?from=' + graphite_from + '&until=' + graphite_until + '&target=' + metric + '&format=json' # @added 20190612 - Feature #3108: crucible - graphite_override_uri_parameters # This metric variable is used to to declare absolute graphite uri # parameters # from=00%3A00_20190527&until=23%3A59_20190612&target=movingMedian(nonNegativeDerivative(stats.zpf-watcher-prod-1-30g-doa2.vda.readTime)%2C24) if graphite_override_uri_parameters: if settings.GRAPHITE_PORT != '': url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + ':' + settings.GRAPHITE_PORT + '/render/?' + graphite_override_uri_parameters + '&format=json' else: url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + '/render/?' + graphite_override_uri_parameters + '&format=json' logger.info('graphite url set from graphite_override_uri_parameters - %s' % (url)) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('graphite url - %s' % (url)) if not os.path.isfile(anomaly_graph): image_url = url.replace('&format=json', '') graphite_image_file = anomaly_dir + '/' + metric + '.png' if 'width' not in image_url: image_url += '&width=586' if 'height' not in image_url: image_url += '&height=308' if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('graphite image url - %s' % (image_url)) # @modified 20200327 - Feature #3108: crucible - graphite_override_uri_parameters # Branch #3262: py3 # Added graphite_override_uri_parameters conditional if graphite_override_uri_parameters: logger.info('retrieving png - surfacing %s graph from graphite with %s' % (metric, url)) else: logger.info('retrieving png - surfacing %s graph from graphite from %s to %s' % (metric, graphite_from, graphite_until)) # @modified 20200328 - Task #3290: Handle urllib2 in py3 # Branch #3262: py3 # Use urlretrieve - image_url_timeout and image_data not longer # required # image_url_timeout = int(connect_timeout) # image_data = None try: # @modified 20170913 - Task #2160: Test skyline with bandit # Added nosec to exclude from bandit tests # @modified 20200328 - Task #3290: Handle urllib2 in py3 # Branch #3262: py3 # Use urlretrieve # image_data = urllib2.urlopen(image_url, timeout=image_url_timeout).read() # nosec if python_version == 2: # @modified 20200808 - Task #3608: Update Skyline to Python 3.8.3 and deps # Added nosec for bandit [B310:blacklist] Audit url open for # permitted schemes. Allowing use of file:/ or custom schemes is # often unexpected. if image_url.lower().startswith('http'): urllib.urlretrieve(image_url, graphite_image_file) # nosec else: logger.error( 'error :: %s :: image_url does not start with http - %s' % (str(image_url))) if python_version == 3: # @modified 20200808 - Task #3608: Update Skyline to Python 3.8.3 and deps # Added nosec for bandit [B310:blacklist] Audit url open for # permitted schemes. Allowing use of file:/ or custom schemes is # often unexpected. if image_url.lower().startswith('http'): urllib.request.urlretrieve(image_url, graphite_image_file) # nosec else: logger.error( 'error :: %s :: image_url does not start with http - %s' % (str(image_url))) logger.info('url OK - %s' % (image_url)) # except urllib2.URLError: except: logger.error(traceback.print_exc()) logger.error('error :: url bad - %s' % (image_url)) # image_data = None try: if python_version == 2: os.chmod(graphite_image_file, 0o644) if python_version == 3: os.chmod(graphite_image_file, mode=0o644) logger.info('graphite_image_file permissions set OK - %s' % (graphite_image_file)) except: logger.error(traceback.print_exc()) logger.error('error :: graphite_image_file permissions could not be set - %s' % (graphite_image_file)) # @modified 20200328 - Task #3290: Handle urllib2 in py3 # Branch #3262: py3 # Use urlretrieve so no need to write data to file # if image_data is not None: # with open(graphite_image_file, 'w') as f: # f.write(image_data) # logger.info('retrieved - %s' % (anomaly_graph)) # if python_version == 2: # # @modified 20200327 - Branch #3262: py3 # # os.chmod(graphite_image_file, 0644) # os.chmod(graphite_image_file, 0o644) # if python_version == 3: # os.chmod(graphite_image_file, mode=0o644) # else: # logger.error('error :: failed to retrieved - %s' % (anomaly_graph)) else: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_graph file exists - %s' % str(anomaly_graph)) if not os.path.isfile(anomaly_graph): logger.error('error :: retrieve failed to surface %s graph from graphite' % (metric)) else: logger.info('graph image exists - %s' % (anomaly_graph)) # Check if the json exists if not os.path.isfile(anomaly_json_gz): if not os.path.isfile(anomaly_json): logger.info('surfacing timeseries data for %s from graphite from %s to %s' % (metric, graphite_from, graphite_until)) if requests.__version__ >= '2.4.0': use_timeout = (int(connect_timeout), int(read_timeout)) else: use_timeout = int(connect_timeout) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('use_timeout - %s' % (str(use_timeout))) # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url # Bug #3778: Handle single encoded forward slash requests to Graphite sanitised = False sanitised, url = sanitise_graphite_url(skyline_app, url) try: r = requests.get(url, timeout=use_timeout) js = r.json() datapoints = js[0]['datapoints'] if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('data retrieved OK') except: datapoints = [[None, int(graphite_until)]] logger.error('error :: data retrieval failed') converted = [] for datapoint in datapoints: try: new_datapoint = [float(datapoint[1]), float(datapoint[0])] converted.append(new_datapoint) # @modified 20170913 - Task #2160: Test skyline with bandit # Added nosec to exclude from bandit tests except: # nosec continue try: with open(anomaly_json, 'w') as f: f.write(json.dumps(converted)) if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(anomaly_json, 0644) os.chmod(anomaly_json, 0o644) if python_version == 3: os.chmod(anomaly_json, mode=0o644) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('json file - %s' % anomaly_json) except: logger.error(traceback.print_exc()) logger.error('error :: failed to write or chmod anomaly_json - %s' % (anomaly_json)) # @added 20200422 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Clean up converted if converted: try: del converted del js del datapoints except: pass if not os.path.isfile(anomaly_json): logger.error('error :: failed to surface %s json from graphite' % (metric)) # Move metric check file try: shutil.move(metric_check_file, failed_check_file) logger.info('moved check file to - %s' % failed_check_file) except OSError: logger.error('error :: failed to move check file to - %s' % failed_check_file) pass return # @added 20200607 - Feature #3630: webapp - crucible_process_training_data # Added training_data_json if training_data_json: if not os.path.isfile(training_data_json): logger.error('error :: no training data json data found - %s' % training_data_json) # Move metric check file try: shutil.move(metric_check_file, failed_check_file) logger.info('moved check file to - %s' % failed_check_file) except OSError: logger.error('error :: failed to move check file to - %s' % failed_check_file) pass return else: logger.info('setting anomaly json to training_data_json - %s' % (training_data_json)) anomaly_json = training_data_json # Check timeseries json exists - raw or gz if not os.path.isfile(anomaly_json): if not os.path.isfile(anomaly_json_gz): logger.error('error :: no json data found') # Move metric check file try: shutil.move(metric_check_file, failed_check_file) logger.info('moved check file to - %s' % failed_check_file) except OSError: logger.error('error :: failed to move check file to - %s' % failed_check_file) pass return else: logger.info('timeseries json gzip exists - %s' % (anomaly_json_gz)) else: logger.info('timeseries json exists - %s' % (anomaly_json)) # If timeseries json and run_crucible_tests is str(False) gzip and # return here as there is nothing further to do if run_crucible_tests == 'False': if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('run_crucible_tests - %s' % run_crucible_tests) # gzip the json timeseries data if os.path.isfile(anomaly_json): if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('gzipping - %s' % anomaly_json) try: f_in = open(anomaly_json) f_out = gzip.open(anomaly_json_gz, 'wb') f_out.writelines(f_in) f_out.close() f_in.close() os.remove(anomaly_json) if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(anomaly_json_gz, 0644) os.chmod(anomaly_json_gz, 0o644) if python_version == 3: os.chmod(anomaly_json_gz, mode=0o644) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('gzipped - %s' % anomaly_json_gz) try: os.remove(metric_check_file) logger.info('removed check file - %s' % metric_check_file) except OSError: pass return except: logger.error('error :: Failed to gzip data file - %s' % str(traceback.print_exc())) try: os.remove(metric_check_file) logger.info('removed check file - %s' % metric_check_file) except OSError: pass return if os.path.isfile(anomaly_json_gz): if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('gzip exists - %s' % anomaly_json) try: os.remove(metric_check_file) logger.info('removed check file - %s' % metric_check_file) except OSError: pass return nothing_to_do = 'true - for debug only' # self.check_if_parent_is_alive() # Run crucible algorithms logger.info('running crucible tests - %s' % (metric)) if os.path.isfile(anomaly_json_gz): if not os.path.isfile(anomaly_json): if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('ungzipping - %s' % anomaly_json_gz) try: # with gzip.open(anomaly_json_gz, 'rb') as fr: fr = gzip.open(anomaly_json_gz, 'rb') raw_timeseries = fr.read() fr.close() except Exception as e: logger.error(traceback.print_exc()) logger.error('error :: could not ungzip %s - %s' % (anomaly_json_gz, e)) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('ungzipped') logger.info('writing to - %s' % anomaly_json) with open(anomaly_json, 'w') as fw: fw.write(raw_timeseries) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_json done') if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(anomaly_json, 0644) os.chmod(anomaly_json, 0o644) if python_version == 3: os.chmod(anomaly_json, mode=0o644) else: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('No gzip - %s' % anomaly_json_gz) nothing_to_do = 'true - for debug only' if os.path.isfile(anomaly_json): if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomaly_json exists - %s' % anomaly_json) if os.path.isfile(anomaly_json): if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('loading timeseries from - %s' % anomaly_json) timeseries = None try: with open(anomaly_json, 'r') as f: timeseries = json.loads(f.read()) raw_timeseries = f.read() timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']') timeseries = literal_eval(timeseries_array_str) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('loaded time series from - %s' % anomaly_json) except: # logger.error(traceback.format_exc()) logger.info('failed to load with JSON, literal_eval will be tried - %s' % anomaly_json) # @added 20180715 - Task #2444: Evaluate CAD # Task #2446: Optimize Ionosphere # Branch #2270: luminosity # If the json.loads fails use literal_eval if not timeseries: try: with open(anomaly_json, 'r') as f: raw_timeseries = f.read() timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']') timeseries = literal_eval(timeseries_array_str) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('loaded time series with literal_eval from - %s' % anomaly_json) except: logger.info(traceback.format_exc()) logger.error('error :: failed to load JSON - %s' % anomaly_json) else: try: logger.error('error :: file not found - %s' % anomaly_json) shutil.move(metric_check_file, failed_check_file) if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(failed_check_file, 0644) os.chmod(failed_check_file, 0o644) if python_version == 3: os.chmod(failed_check_file, mode=0o644) logger.info('moved check file to - %s' % failed_check_file) except OSError: logger.error('error :: failed to move check file to - %s' % failed_check_file) pass return if not timeseries: try: logger.info('failing check, no time series from - %s' % anomaly_json) shutil.move(metric_check_file, failed_check_file) if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(failed_check_file, 0644) os.chmod(failed_check_file, 0o644) if python_version == 3: os.chmod(failed_check_file, mode=0o644) logger.info('moved check file to - %s' % failed_check_file) except OSError: logger.error('error :: failed to move check file to - %s' % failed_check_file) return # @added 20200507 - Feature #3532: Sort all time series # To ensure that there are no unordered timestamps in the time # series which are artefacts of the collector or carbon-relay, sort # all time series by timestamp before analysis. original_timeseries = timeseries if original_timeseries: timeseries = sort_timeseries(original_timeseries) del original_timeseries logger.info('timeseries length: %s' % str(len(timeseries))) start_timestamp = int(timeseries[0][0]) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('start_timestamp - %s' % str(start_timestamp)) end_timestamp = int(timeseries[-1][0]) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('end_timestamp - %s' % str(end_timestamp)) full_duration = end_timestamp - start_timestamp if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('full_duration - %s' % str(full_duration)) # @added 20200422 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # In order for metrics to be analysed in Crucible like the # Analyzer or Mirage analysis, the time series data needs to # be padded. Added pad_timeseries in the webapp so check # here if the time series is padded if graphite_override_uri_parameters and added_by == 'webapp': if start_timestamp < from_timestamp: padded_timeseries = True padded_with = from_timestamp - start_timestamp logger.info('padded time series identified, padded with %s seconds' % str(padded_with)) self.check_if_parent_is_alive() run_algorithms_start_timestamp = int(time()) if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('run_algorithms_start_timestamp - %s' % str(run_algorithms_start_timestamp)) # @added 20200427 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Set variables so on fail the process does not hang anomalous = None ensemble = None alert_interval_discarded_anomalies_count = 0 # For debug only but do not remove as this is an item in the final # return nothing_to_do = '' if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('run_algorithms - %s,%s,%s,%s,%s,%s' % (metric, str(end_timestamp), str(full_duration), anomaly_json, skyline_app, str(algorithms))) try: # @modified 20200421 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Pass alert_interval, add_to_panaroma and alert_interval_discarded_anomalies_count # anomalous, ensemble = run_algorithms(timeseries, str(metric), end_timestamp, full_duration, str(anomaly_json), skyline_app, algorithms) # @modified 20200422 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Added padded_timeseries and from_timestamp anomalous, ensemble, alert_interval_discarded_anomalies_count = run_algorithms(timeseries, str(metric), end_timestamp, full_duration, str(anomaly_json), skyline_app, algorithms, alert_interval, add_to_panorama, padded_timeseries, from_timestamp) except: logger.error('error :: run_algorithms failed - %s' % str(traceback.print_exc())) try: shutil.move(metric_check_file, failed_check_file) if python_version == 2: os.chmod(failed_check_file, 0o644) if python_version == 3: os.chmod(failed_check_file, mode=0o644) logger.info('moved check file to - %s' % failed_check_file) except OSError: logger.error('error :: failed to move check file to - %s' % failed_check_file) pass return run_algorithms_end_timestamp = int(time()) run_algorithms_seconds = run_algorithms_end_timestamp - run_algorithms_start_timestamp if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomalous, ensemble - %s, %s' % (anomalous, str(ensemble))) if anomalous: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('anomalous - %s' % (anomalous)) nothing_to_do = 'true - for debug only' logger.info('run_algorithms took %s seconds' % str(run_algorithms_seconds)) # Update anomaly file # @modified 20200421 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Added run_algorithms_seconds and alert_interval_discarded_anomalies crucible_data = 'crucible_tests_run = \'%s\'\n' \ 'crucible_triggered_algorithms = %s\n' \ 'tested_by = \'%s\'\n' \ 'run_algorithms_seconds = \'%s\'\n' \ 'alert_interval_discarded_anomalies = \'%s\'\n' \ % (str(run_timestamp), str(ensemble), str(this_host), str(run_algorithms_seconds), str(alert_interval_discarded_anomalies_count)) crucible_anomaly_file = '%s/%s.txt' % (anomaly_dir, metric) with open(crucible_anomaly_file, 'a') as fh: fh.write(crucible_data) if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(crucible_anomaly_file, 0644) os.chmod(crucible_anomaly_file, 0o644) if python_version == 3: os.chmod(crucible_anomaly_file, mode=0o644) logger.info('updated crucible anomaly file - %s/%s.txt' % (anomaly_dir, metric)) # gzip the json timeseries data after analysis if os.path.isfile(anomaly_json): if not os.path.isfile(anomaly_json_gz): remove_json = False try: f_in = open(anomaly_json) f_out = gzip.open(anomaly_json_gz, 'wb') f_out.writelines(f_in) f_out.close() f_in.close() remove_json = True logger.info('gzipped - %s' % (anomaly_json_gz)) except: logger.error('error :: Failed to gzip data file - %s' % str(traceback.print_exc())) if remove_json: try: if python_version == 2: # @modified 20200327 - Branch #3262: py3 # os.chmod(anomaly_json_gz, 0644) os.chmod(anomaly_json_gz, 0o644) if python_version == 3: os.chmod(anomaly_json_gz, mode=0o644) logger.info('gzipped - %s' % (anomaly_json_gz)) except: logger.error('error :: Failed to chmod anomaly_json_gz file - %s' % str(traceback.print_exc())) try: os.remove(anomaly_json) except: logger.error('error :: Failed to remove anomaly_json file - %s' % str(traceback.print_exc())) else: os.remove(anomaly_json) if run_script: if os.path.isfile(run_script): logger.info('running - %s' % (run_script)) # @modified 20170913 - Task #2160: Test skyline with bandit # Added nosec to exclude from bandit tests os.system('%s %s' % (str(run_script), str(crucible_anomaly_file))) # nosec # @added 20200428 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # Send Skyline consensus anomalie to Panorama if add_to_panorama: added_to_panorama = False user_id = 1 skyline_anomalies_score_file = anomaly_dir + '/' + 'skyline.anomalies_score.txt' if os.path.isfile(skyline_anomalies_score_file): try: with open(skyline_anomalies_score_file) as f: output = f.read() skyline_anomalies = literal_eval(output) except: logger.info(traceback.format_exc()) logger.error('error :: failed to get Skyline anomalies scores from %s' % skyline_anomalies_score_file) skyline_anomalies = None skyline_consensus_anomalies = [] if skyline_anomalies: # skyline_anomalies format # [timestamp, value, anomaly_score, triggered_algorithms] # [1583234400.0, 44.39999999990687, 2, ['histogram_bins', 'median_absolute_deviation']], # Convert float timestamp from Graphite to int for timestamp, value, anomaly_score, triggered_algorithms in skyline_anomalies: if anomaly_score >= settings.CONSENSUS: skyline_consensus_anomalies.append([int(timestamp), value, anomaly_score, triggered_algorithms]) del skyline_anomalies added_at = int(time()) if skyline_consensus_anomalies: sane_metricname = filesafe_metricname(str(metric)) for timestamp, datapoint, anomaly_score, triggered_algorithms in skyline_consensus_anomalies: # To allow multiple Panorama anomaly files to added quickly just # increment the added_at by 1 seconds so that all the files have a # unique name added_at += 1 # Note: # The values are enclosed is single quoted intentionally # as the imp.load_source used results in a shift in the # decimal position when double quoted, e.g. # value = "5622.0" gets imported as # 2016-03-02 12:53:26 :: 28569 :: metric variable - value - 562.2 # single quoting results in the desired, # 2016-03-02 13:16:17 :: 1515 :: metric variable - value - 5622.0 source = 'graphite' panaroma_anomaly_data = 'metric = \'%s\'\n' \ 'value = \'%s\'\n' \ 'from_timestamp = \'%s\'\n' \ 'metric_timestamp = \'%s\'\n' \ 'algorithms = %s\n' \ 'triggered_algorithms = %s\n' \ 'app = \'%s\'\n' \ 'source = \'%s\'\n' \ 'added_by = \'%s\'\n' \ 'added_at = \'%s\'\n' \ 'label = \'added by Crucible\'\n' \ 'user_id = \'%s\'\n' \ % (metric, str(datapoint), from_timestamp, str(timestamp), str(settings.ALGORITHMS), triggered_algorithms, skyline_app, source, this_host, str(added_at), str(user_id)) # Create an anomaly file with details about the anomaly panaroma_anomaly_file = '%s/%s.%s.txt' % ( settings.PANORAMA_CHECK_PATH, added_at, sane_metricname) try: write_data_to_file( skyline_app, panaroma_anomaly_file, 'w', panaroma_anomaly_data) logger.info('added panorama anomaly file :: %s' % (panaroma_anomaly_file)) added_to_panorama = True except: logger.error(traceback.format_exc()) logger.error('error :: send_crucible_job_metric_to_panorama - failed to add panorama anomaly file :: %s' % (panaroma_anomaly_file)) if added_to_panorama: crucible_sent_to_panorama_file = '%s/%s.%s.%s.sent_to_panorama.txt' % ( anomaly_dir, str(added_at), sane_metricname) panorama_done_data = [added_at, int(user_id), skyline_consensus_anomalies] try: write_data_to_file( skyline_app, crucible_sent_to_panorama_file, 'w', str(panorama_done_data)) logger.info('added panorama crucible job file :: %s' % (crucible_sent_to_panorama_file)) except: logger.error(traceback.format_exc()) logger.error('error :: crucible send to panorama - failed to add panorama file :: %s' % (crucible_sent_to_panorama_file)) try: os.remove(metric_check_file) logger.info('complete removed check file - %s %s' % (metric_check_file, nothing_to_do)) except OSError: pass
[docs] def run(self): """ Called when the process intializes. """ # Log management to prevent overwriting # Allow the bin/<skyline_app>.d to manage the log if os.path.isfile(skyline_app_logwait): try: os.remove(skyline_app_logwait) except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_logwait) pass now = time() log_wait_for = now + 5 while now < log_wait_for: if os.path.isfile(skyline_app_loglock): sleep(.1) now = time() else: now = log_wait_for + 1 logger.info('starting %s run' % skyline_app) if os.path.isfile(skyline_app_loglock): logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app) try: os.remove(skyline_app_loglock) logger.info('log lock file removed') except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_loglock) pass else: logger.info('bin/%s.d log management done' % skyline_app) logger.info('process intialized') while 1: now = time() if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('process started - %s' % int(now)) # Make sure check_dir exists and has not been removed try: if settings.ENABLE_CRUCIBLE_DEBUG: logger.info('checking check dir exists - %s' % settings.CRUCIBLE_CHECK_PATH) os.path.exists(settings.CRUCIBLE_CHECK_PATH) except: logger.error('error :: check dir did not exist - %s' % settings.CRUCIBLE_CHECK_PATH) mkdir_p(settings.CRUCIBLE_CHECK_PATH) logger.info('check dir created - %s' % settings.CRUCIBLE_CHECK_PATH) os.path.exists(settings.CRUCIBLE_CHECK_PATH) # continue # Make sure Redis is up try: self.redis_conn.ping() logger.info('connected to redis at socket path %s' % settings.REDIS_SOCKET_PATH) except: logger.info('skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH) sleep(10) logger.info('connecting to redis at socket path %s' % settings.REDIS_SOCKET_PATH) # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) else: self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) continue """ Determine if any metric has been added to test """ while True: # Report app up self.redis_conn.setex(skyline_app, 120, now) metric_var_files = [f for f in listdir(settings.CRUCIBLE_CHECK_PATH) if isfile(join(settings.CRUCIBLE_CHECK_PATH, f))] # @added 20200428 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI # If crucible is busy, stop processing around midnight to allow # for log rotation as log rotation can make the process hang if # running pause_for_log_rotation = False current_now = datetime.datetime.now() current_hour = current_now.strftime('%H') if current_hour == '23': current_minute = current_now.strftime('%M') before_midnight_minutes = ['55', '56', '57', '58', '59'] if current_minute in before_midnight_minutes: logger.info('setting metric_var_files to [] for log rotation') metric_var_files = [] pause_for_log_rotation = True if current_hour == '00': current_minute = current_now.strftime('%M') after_midnight_minutes = ['00', '01', '02'] if current_minute in after_midnight_minutes: logger.info('setting metric_var_files to [] for log rotation') metric_var_files = [] pause_for_log_rotation = True # if len(metric_var_files) == 0: if not metric_var_files: logger.info('sleeping 10 no metric check files') sleep(10) # Discover metric to analyze metric_var_files = '' metric_var_files = [f for f in listdir(settings.CRUCIBLE_CHECK_PATH) if isfile(join(settings.CRUCIBLE_CHECK_PATH, f))] if pause_for_log_rotation: logger.info('setting metric_var_files to [] for log rotation') metric_var_files = [] # if len(metric_var_files) > 0: if metric_var_files: break metric_var_files_sorted = sorted(metric_var_files) metric_check_file = settings.CRUCIBLE_CHECK_PATH + "/" + str(metric_var_files_sorted[0]) # @added 20200421 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES # TODO # Handled multiple processes # Prioritise current metrics over webapp metrics logger.info('assigning check for processing - %s' % str(metric_var_files_sorted[0])) # Reset process_list # @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage # try: # self.process_list[:] = [] # except: # logger.error('error :: failed to reset self.process_list') # Spawn processes pids = [] spawned_pids = [] pid_count = 0 run_timestamp = int(now) # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES use_range = 1 if len(metric_var_files_sorted) > 1: if settings.CRUCIBLE_PROCESSES > 1: if len(metric_var_files_sorted) < settings.CRUCIBLE_PROCESSES: use_range = settings.CRUCIBLE_PROCESSES - len(metric_var_files_sorted) else: use_range = settings.CRUCIBLE_PROCESSES logger.info('dynamically determined to submit to and use %s processors' % str(use_range)) logger.info('will use %s processors' % str(use_range)) # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES # for i in range(1, settings.CRUCIBLE_PROCESSES + 1): for i in range(1, use_range + 1): # @added 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES if len(metric_var_files_sorted) > 1: if use_range > 1: if i > 1: list_element = i - 1 metric_check_file = settings.CRUCIBLE_CHECK_PATH + "/" + str(metric_var_files_sorted[list_element]) logger.info('assigning additional check to processor %s for processing - %s' % (str(i), str(metric_var_files_sorted[list_element]))) p = Process(target=self.spin_process, args=(i, run_timestamp, str(metric_check_file))) pids.append(p) pid_count += 1 # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES # logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.CRUCIBLE_PROCESSES))) logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(use_range))) p.start() spawned_pids.append(p.pid) # Send wait signal to zombie processes # for p in pids: # p.join() # Self monitor processes and terminate if any spin_process has run # for longer than CRUCIBLE_TESTS_TIMEOUT p_starts = time() sleep_count = 0 while time() - p_starts <= settings.CRUCIBLE_TESTS_TIMEOUT: if any(p.is_alive() for p in pids): # Just to avoid hogging the CPU sleep(.1) # @added 20200421 - Feature #3500: webapp - crucible_process_metrics # Feature #1448: Crucible web UI sleep_count += 1 if (sleep_count % 100 == 0): logger.info('%s :: spin_process/es still running pid/s - %s' % (skyline_app, str(pids))) else: # All the processes are done, break now. time_to_run = time() - p_starts # @modified 20200427 - Feature #3516: Handle multiple CRUCIBLE_PROCESSES # logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(settings.CRUCIBLE_PROCESSES), time_to_run)) logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(use_range), time_to_run)) break else: # We only enter this if we didn't 'break' above. logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app)) for p in pids: p.terminate() # p.join() for p in pids: if p.is_alive(): logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive()))) p.join() while os.path.isfile(metric_check_file): sleep(1)