import logging
try:
from Queue import Empty
except:
from queue import Empty
from redis import StrictRedis
import time
from time import time, sleep
from threading import Thread
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Use Redis sets in place of Manager().list to reduce memory and number of
# processes
# from multiprocessing import Process, Manager
from multiprocessing import Process
from msgpack import packb
import os
from os.path import join, isfile
from os import kill, getpid, listdir
from sys import exit, version_info
import traceback
import re
import json
import gzip
import requests
try:
import urlparse
except ImportError:
import urllib.parse
try:
import urllib2
except ImportError:
import urllib.request
import urllib.error
import errno
import datetime
import shutil
import os.path
# sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
# sys.path.insert(0, os.path.dirname(__file__))
from ast import literal_eval
import settings
from skyline_functions import load_metric_vars, fail_check, mkdir_p
from crucible_algorithms import run_algorithms
skyline_app = 'crucible'
skyline_app_logger = skyline_app + 'Log'
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = settings.LOG_PATH + '/' + skyline_app + '.log'
skyline_app_loglock = skyline_app_logfile + '.lock'
skyline_app_logwait = skyline_app_logfile + '.wait'
python_version = int(version_info[0])
this_host = str(os.uname()[1])
try:
SERVER_METRIC_PATH = '.' + settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
except:
SERVER_METRIC_PATH = ''
skyline_app_graphite_namespace = 'skyline.' + skyline_app + SERVER_METRIC_PATH
FULL_NAMESPACE = settings.FULL_NAMESPACE
ENABLE_CRUCIBLE_DEBUG = settings.ENABLE_CRUCIBLE_DEBUG
crucible_data_folder = str(settings.CRUCIBLE_DATA_FOLDER)
failed_checks_dir = crucible_data_folder + '/failed_checks'
[docs]class Crucible(Thread):
def __init__(self, parent_pid):
"""
Initialize Crucible
"""
super(Crucible, self).__init__()
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# Task #3032: Debug number of Python processes and memory use
# Branch #3002: docker
# Reduce amount of Manager instances that are used as each requires a
# copy of entire memory to be copied into each subprocess so this
# results in a python process per Manager instance, using as much
# memory as the parent. OK on a server, not so much in a container.
# Disabled all the Manager() lists below and replaced with Redis sets
# self.process_list = Manager().list()
# self.metric_variables = Manager().list()
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
if settings.REDIS_PASSWORD:
self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
[docs] def check_if_parent_is_alive(self):
"""
Check if the parent process is alive
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
exit(0)
[docs] def spin_process(self, i, run_timestamp, metric_check_file):
"""
Assign a metric for a process to analyze.
:param i: python process id
:param run_timestamp: the epoch timestamp at which this process was called
:param metric_check_file: full path to the metric check file
:return: returns True
"""
child_process_pid = os.getpid()
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('child_process_pid - %s' % str(child_process_pid))
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# self.process_list.append(child_process_pid)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('processing metric check - %s' % metric_check_file)
if not os.path.isfile(str(metric_check_file)):
logger.error('error :: file not found - metric_check_file - %s' % (str(metric_check_file)))
return
check_file_name = os.path.basename(str(metric_check_file))
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('check_file_name - %s' % check_file_name)
check_file_timestamp = check_file_name.split('.', 1)[0]
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('check_file_timestamp - %s' % str(check_file_timestamp))
check_file_metricname_txt = check_file_name.split('.', 1)[1]
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('check_file_metricname_txt - %s' % check_file_metricname_txt)
check_file_metricname = check_file_metricname_txt.replace('.txt', '')
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('check_file_metricname - %s' % check_file_metricname)
check_file_metricname_dir = check_file_metricname.replace('.', '/')
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('check_file_metricname_dir - %s' % check_file_metricname_dir)
metric_failed_check_dir = failed_checks_dir + '/' + check_file_metricname_dir + '/' + check_file_timestamp
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric_failed_check_dir - %s' % metric_failed_check_dir)
# failed_check_file = failed_checks_dir + '/' + check_file_name
failed_check_file = metric_failed_check_dir + '/' + check_file_name
# Load and validate metric variables
try:
metric_vars = load_metric_vars(skyline_app, str(metric_check_file))
except:
logger.error('error :: failed to import metric variables from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
# TBD - a failed check Panorama update will go here, perhaps txt
# files are not the only "queue" that will be used, both, but
# Panorama, may be just a part of Skyline Flux, the flux DB
# would allow for a very nice, distributed "queue" and a
# distributed Skyline workforce...
# Any Skyline node could just have one role, e.g. lots of
# Skyline nodes running crucible only and instead of reading
# the local filesystem for input, they could read the Flux DB
# queue or both...
return
# Test metric variables
# We use a pythonic methodology to test if the variables are defined,
# this ensures that if any of the variables are not set for some reason
# we can handle unexpected data or situations gracefully and try and
# ensure that the process does not hang.
# if len(str(metric_vars.metric)) == 0:
# if not metric_vars.metric:
try:
metric_vars.metric
except:
logger.error('error :: failed to read metric variable from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
else:
metric = str(metric_vars.metric)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - metric - %s' % metric)
# if len(metric_vars.value) == 0:
# if not metric_vars.value:
try:
metric_vars.value
except:
logger.error('error :: failed to read value variable from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
else:
value = str(metric_vars.value)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - value - %s' % (value))
# if len(metric_vars.from_timestamp) == 0:
# if not metric_vars.from_timestamp:
try:
metric_vars.from_timestamp
except:
logger.error('error :: failed to read from_timestamp variable from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
else:
from_timestamp = str(metric_vars.from_timestamp)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - from_timestamp - %s' % from_timestamp)
# if len(metric_vars.metric_timestamp) == 0:
# if not metric_vars.metric_timestamp:
try:
metric_vars.metric_timestamp
except:
logger.error('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
else:
metric_timestamp = str(metric_vars.metric_timestamp)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - metric_timestamp - %s' % metric_timestamp)
# if len(metric_vars.algorithms) == 0:
# if not metric_vars.algorithms:
algorithms = []
try:
metric_vars.algorithms
except:
logger.error('error :: failed to read algorithms variable from check file setting to all')
algorithms = ['all']
# if not algorithms:
# algorithms = []
# for i_algorithm in metric_vars.algorithms:
# algorithms.append(i_algorithm)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - algorithms - %s' % str(algorithms))
# if len(metric_vars.anomaly_dir) == 0:
# if not metric_vars.anomaly_dir:
try:
metric_vars.anomaly_dir
except:
logger.error('error :: failed to read anomaly_dir variable from check file - %s' % (metric_check_file))
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
else:
anomaly_dir = str(metric_vars.anomaly_dir)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - anomaly_dir - %s' % anomaly_dir)
# if len(str(metric_vars.graphite_metric)) == 0:
try:
metric_vars.graphite_metric
except:
logger.info('failed to read graphite_metric variable from check file setting to False')
# yes this is a string
graphite_metric = 'False'
else:
graphite_metric = str(metric_vars.graphite_metric)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - graphite_metric - %s' % graphite_metric)
# if len(str(metric_vars.run_crucible_tests)) == 0:
try:
metric_vars.run_crucible_tests
except:
logger.info('failed to read run_crucible_tests variable from check file setting to False')
# yes this is a string
run_crucible_tests = 'False'
else:
run_crucible_tests = str(metric_vars.run_crucible_tests)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - run_crucible_tests - %s' % run_crucible_tests)
try:
metric_vars.added_by
except:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('failed to read added_by variable from check file setting to crucible - set to crucible')
added_by = 'crucible'
else:
added_by = str(metric_vars.added_by)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - added_by - %s' % added_by)
try:
metric_vars.run_script
except:
run_script = False
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - run_script - not present set to False')
else:
run_script = str(metric_vars.run_script)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - run_script - %s' % run_script)
# @added 20190612 - Feature #3108: crucible - graphite_override_uri_parameters_specific_url
# This metric variable is used to to declare absolute graphite uri
# parameters
try:
metric_vars.graphite_override_uri_parameters
except:
logger.info('failed to read graphite_override_uri_parameters variable from check file setting to False')
# yes this is a string
graphite_override_uri_parameters = False
else:
graphite_override_uri_parameters = str(metric_vars.graphite_override_uri_parameters)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric variable - graphite_override_uri_parameters - %s' % graphite_override_uri_parameters)
# Only check if the metric does not a EXPIRATION_TIME key set, crucible
# uses the alert EXPIRATION_TIME for the relevant alert setting contexts
# whether that be analyzer, mirage, boundary, etc and sets its own
# cache_keys in redis. This prevents large amounts of data being added
# in terms of tieseries json and image files, crucible samples at the
# same EXPIRATION_TIME as alerts.
source_app = 'crucible'
expiration_timeout = 1800
remove_all_anomaly_files = False
check_expired = False
check_time = time()
if added_by == 'analyzer' or added_by == 'mirage':
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('Will check %s ALERTS' % added_by)
if settings.ENABLE_ALERTS:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('Checking %s ALERTS' % added_by)
for alert in settings.ALERTS:
ALERT_MATCH_PATTERN = alert[0]
METRIC_PATTERN = metric
alert_match_pattern = re.compile(ALERT_MATCH_PATTERN)
pattern_match = alert_match_pattern.match(METRIC_PATTERN)
if pattern_match:
source_app = added_by
expiration_timeout = alert[2]
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('matched - %s - %s - EXPIRATION_TIME is %s' % (source_app, metric, str(expiration_timeout)))
check_age = int(check_time) - int(metric_timestamp)
if int(check_age) > int(expiration_timeout):
check_expired = True
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('the check is older than EXPIRATION_TIME for the metric - not checking - check_expired')
if added_by == 'boundary':
if settings.BOUNDARY_ENABLE_ALERTS:
for alert in settings.BOUNDARY_METRICS:
ALERT_MATCH_PATTERN = alert[0]
METRIC_PATTERN = metric
alert_match_pattern = re.compile(ALERT_MATCH_PATTERN)
pattern_match = alert_match_pattern.match(METRIC_PATTERN)
if pattern_match:
source_app = 'boundary'
expiration_timeout = alert[2]
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('matched - %s - %s - EXPIRATION_TIME is %s' % (source_app, metric, str(expiration_timeout)))
check_age = int(check_time) - int(metric_timestamp)
if int(check_age) > int(expiration_timeout):
check_expired = True
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('the check is older than EXPIRATION_TIME for the metric - not checking - check_expired')
cache_key = 'crucible.last_check.%s.%s' % (source_app, metric)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('cache_key - crucible.last_check.%s.%s' % (source_app, metric))
# Only use the cache_key EXPIRATION_TIME if this is not a request to
# run_crucible_tests on a timeseries
if run_crucible_tests == 'False':
if check_expired:
logger.info('check_expired - not checking Redis key')
last_check = True
else:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('Checking if cache_key exists')
try:
last_check = self.redis_conn.get(cache_key)
except Exception as e:
logger.error('error :: could not query cache_key for %s - %s - %s' % (source_app, metric, e))
logger.info('all anomaly files will be removed')
remove_all_anomaly_files = True
if not last_check:
try:
self.redis_conn.setex(cache_key, expiration_timeout, packb(value))
logger.info('set cache_key for %s - %s with timeout of %s' % (source_app, metric, str(expiration_timeout)))
except Exception as e:
logger.error('error :: could not query cache_key for %s - %s - %s' % (source_app, metric, e))
logger.info('all anomaly files will be removed')
remove_all_anomaly_files = True
else:
if check_expired:
logger.info('check_expired - all anomaly files will be removed')
remove_all_anomaly_files = True
else:
logger.info('cache_key is set and not expired for %s - %s - all anomaly files will be removed' % (source_app, metric))
remove_all_anomaly_files = True
# anomaly dir
if not os.path.exists(str(anomaly_dir)):
try:
# mkdir_p(skyline_app, str(anomaly_dir))
mkdir_p(anomaly_dir)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('created anomaly dir - %s' % str(anomaly_dir))
except:
logger.error('error :: failed to create anomaly_dir - %s' % str(anomaly_dir))
if not os.path.exists(str(anomaly_dir)):
logger.error('error :: anomaly_dir does not exist')
fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
return
else:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly dir exists - %s' % str(anomaly_dir))
failed_check_file = anomaly_dir + '/' + metric_timestamp + '.failed.check.' + metric + '.txt'
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('failed_check_file - %s' % str(failed_check_file))
# Retrieve data from graphite is necessary
anomaly_graph = anomaly_dir + '/' + metric + '.png'
anomaly_json = anomaly_dir + '/' + metric + '.json'
anomaly_json_gz = anomaly_json + '.gz'
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_graph - %s' % str(anomaly_graph))
logger.info('anomaly_json - %s' % str(anomaly_json))
logger.info('anomaly_json_gz - %s' % str(anomaly_json_gz))
# Some things added to crucible may not be added by a skyline app per se
# and if run_crucible_tests is string True then no anomaly files should
# be removed.
if run_crucible_tests == 'True':
remove_all_anomaly_files = False
# Remove check and anomaly files if the metric has a EXPIRATION_TIME
# cache_key set
if remove_all_anomaly_files:
if os.path.isfile(anomaly_graph):
try:
os.remove(anomaly_graph)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_graph removed - %s' % str(anomaly_graph))
except OSError:
pass
if os.path.isfile(anomaly_json):
try:
os.remove(anomaly_json)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_json removed - %s' % str(anomaly_json))
except OSError:
pass
if os.path.isfile(anomaly_json_gz):
try:
os.remove(anomaly_json_gz)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_json_gz removed - %s' % str(anomaly_json_gz))
except OSError:
pass
anomaly_txt_file = anomaly_dir + '/' + metric + '.txt'
if os.path.isfile(anomaly_txt_file):
try:
os.remove(anomaly_txt_file)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_txt_file removed - %s' % str(anomaly_txt_file))
except OSError:
pass
# TBD - this data would have to be added to the panaorama DB before
# it is removed
if os.path.isfile(str(metric_check_file)):
try:
os.remove(str(metric_check_file))
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('metric_check_file removed - %s' % str(metric_check_file))
except OSError:
pass
if os.path.exists(str(anomaly_dir)):
try:
os.rmdir(str(anomaly_dir))
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_dir removed - %s' % str(anomaly_dir))
except OSError:
pass
logger.info('check and anomaly files removed')
return
# Check if the image exists
if graphite_metric == 'True':
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('graphite_metric - %s' % (graphite_metric))
# Graphite timeouts
connect_timeout = int(settings.GRAPHITE_CONNECT_TIMEOUT)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('connect_timeout - %s' % str(connect_timeout))
read_timeout = int(settings.GRAPHITE_READ_TIMEOUT)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('read_timeout - %s' % str(read_timeout))
graphite_until = datetime.datetime.fromtimestamp(int(metric_timestamp)).strftime('%H:%M_%Y%m%d')
graphite_from = datetime.datetime.fromtimestamp(int(from_timestamp)).strftime('%H:%M_%Y%m%d')
# graphite URL
if settings.GRAPHITE_PORT != '':
url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + ':' + settings.GRAPHITE_PORT + '/render/?from=' + graphite_from + '&until=' + graphite_until + '&target=' + metric + '&format=json'
else:
url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + '/render/?from=' + graphite_from + '&until=' + graphite_until + '&target=' + metric + '&format=json'
# @added 20190612 - Feature #3108: crucible - graphite_override_uri_parameters
# This metric variable is used to to declare absolute graphite uri
# parameters
#from=00%3A00_20190527&until=23%3A59_20190612&target=movingMedian(nonNegativeDerivative(stats.zpf-watcher-prod-1-30g-doa2.vda.readTime)%2C24)
if graphite_override_uri_parameters:
if settings.GRAPHITE_PORT != '':
url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + ':' + settings.GRAPHITE_PORT + '/render/?' + graphite_override_uri_parameters + '&format=json'
else:
url = settings.GRAPHITE_PROTOCOL + '://' + settings.GRAPHITE_HOST + '/render/?' + graphite_override_uri_parameters + '&format=json'
logger.info('graphite url set from graphite_override_uri_parameters - %s' % (url))
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('graphite url - %s' % (url))
if not os.path.isfile(anomaly_graph):
logger.info('retrieving png - surfacing %s graph from graphite from %s to %s' % (metric, graphite_from, graphite_until))
image_url = url.replace('&format=json', '')
graphite_image_file = anomaly_dir + '/' + metric + '.png'
if 'width' not in image_url:
image_url += '&width=586'
if 'height' not in image_url:
image_url += '&height=308'
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('graphite image url - %s' % (image_url))
image_url_timeout = int(connect_timeout)
image_data = None
try:
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
image_data = urllib2.urlopen(image_url, timeout=image_url_timeout).read() # nosec
logger.info('url OK - %s' % (image_url))
except urllib2.URLError:
image_data = None
logger.error('error :: url bad - %s' % (image_url))
if image_data is not None:
with open(graphite_image_file, 'w') as f:
f.write(image_data)
logger.info('retrieved - %s' % (anomaly_graph))
if python_version == 2:
os.chmod(graphite_image_file, 0644)
if python_version == 3:
os.chmod(graphite_image_file, mode=0o644)
else:
logger.error('error :: failed to retrieved - %s' % (anomaly_graph))
else:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_graph file exists - %s' % str(anomaly_graph))
if not os.path.isfile(anomaly_graph):
logger.error('error :: retrieve failed to surface %s graph from graphite' % (metric))
else:
logger.info('graph image exists - %s' % (anomaly_graph))
# Check if the json exists
if not os.path.isfile(anomaly_json_gz):
if not os.path.isfile(anomaly_json):
logger.info('surfacing timeseries data for %s from graphite from %s to %s' % (metric, graphite_from, graphite_until))
if requests.__version__ >= '2.4.0':
use_timeout = (int(connect_timeout), int(read_timeout))
else:
use_timeout = int(connect_timeout)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('use_timeout - %s' % (str(use_timeout)))
try:
r = requests.get(url, timeout=use_timeout)
js = r.json()
datapoints = js[0]['datapoints']
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('data retrieved OK')
except:
datapoints = [[None, int(graphite_until)]]
logger.error('error :: data retrieval failed')
converted = []
for datapoint in datapoints:
try:
new_datapoint = [float(datapoint[1]), float(datapoint[0])]
converted.append(new_datapoint)
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
except: # nosec
continue
with open(anomaly_json, 'w') as f:
f.write(json.dumps(converted))
if python_version == 2:
os.chmod(anomaly_json, 0644)
if python_version == 3:
os.chmod(anomaly_json, mode=0o644)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('json file - %s' % anomaly_json)
if not os.path.isfile(anomaly_json):
logger.error('error :: failed to surface %s json from graphite' % (metric))
# Move metric check file
try:
shutil.move(metric_check_file, failed_check_file)
logger.info('moved check file to - %s' % failed_check_file)
except OSError:
logger.error('error :: failed to move check file to - %s' % failed_check_file)
pass
return
# Check timeseries json exists - raw or gz
if not os.path.isfile(anomaly_json):
if not os.path.isfile(anomaly_json_gz):
logger.error('error :: no json data found')
# Move metric check file
try:
shutil.move(metric_check_file, failed_check_file)
logger.info('moved check file to - %s' % failed_check_file)
except OSError:
logger.error('error :: failed to move check file to - %s' % failed_check_file)
pass
return
else:
logger.info('timeseries json gzip exists - %s' % (anomaly_json_gz))
else:
logger.info('timeseries json exists - %s' % (anomaly_json))
# If timeseries json and run_crucible_tests is str(False) gzip and
# return here as there is nothing further to do
if run_crucible_tests == 'False':
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('run_crucible_tests - %s' % run_crucible_tests)
# gzip the json timeseries data
if os.path.isfile(anomaly_json):
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('gzipping - %s' % anomaly_json)
try:
f_in = open(anomaly_json)
f_out = gzip.open(anomaly_json_gz, 'wb')
f_out.writelines(f_in)
f_out.close()
f_in.close()
os.remove(anomaly_json)
if python_version == 2:
os.chmod(anomaly_json_gz, 0644)
if python_version == 3:
os.chmod(anomaly_json_gz, mode=0o644)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('gzipped - %s' % anomaly_json_gz)
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
pass
return
except:
logger.error('error :: Failed to gzip data file - %s' % str(traceback.print_exc()))
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
pass
return
if os.path.isfile(anomaly_json_gz):
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('gzip exists - %s' % anomaly_json)
try:
os.remove(metric_check_file)
logger.info('removed check file - %s' % metric_check_file)
except OSError:
pass
return
nothing_to_do = 'true - for debug only'
# self.check_if_parent_is_alive()
# Run crucible algorithms
logger.info('running crucible tests - %s' % (metric))
if os.path.isfile(anomaly_json_gz):
if not os.path.isfile(anomaly_json):
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('ungzipping - %s' % anomaly_json_gz)
try:
# with gzip.open(anomaly_json_gz, 'rb') as fr:
fr = gzip.open(anomaly_json_gz, 'rb')
raw_timeseries = fr.read()
fr.close()
except Exception as e:
logger.error('error :: could not ungzip %s - %s' % (anomaly_json_gz, e))
traceback.print_exc()
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('ungzipped')
logger.info('writing to - %s' % anomaly_json)
with open(anomaly_json, 'w') as fw:
fw.write(raw_timeseries)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_json done')
if python_version == 2:
os.chmod(anomaly_json, 0644)
if python_version == 3:
os.chmod(anomaly_json, mode=0o644)
else:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('No gzip - %s' % anomaly_json_gz)
nothing_to_do = 'true - for debug only'
if os.path.isfile(anomaly_json):
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomaly_json exists - %s' % anomaly_json)
if os.path.isfile(anomaly_json):
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('loading timeseries from - %s' % anomaly_json)
timeseries = None
try:
with open(anomaly_json, 'r') as f:
timeseries = json.loads(f.read())
raw_timeseries = f.read()
timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']')
timeseries = literal_eval(timeseries_array_str)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('loaded time series from - %s' % anomaly_json)
except:
# logger.error(traceback.format_exc())
logger.info('failed to load with JSON, literal_eval will be tried - %s' % anomaly_json)
# @added 20180715 - Task #2444: Evaluate CAD
# Task #2446: Optimize Ionosphere
# Branch #2270: luminosity
# If the json.loads fails use literal_eval
if not timeseries:
try:
with open(anomaly_json, 'r') as f:
raw_timeseries = f.read()
timeseries_array_str = str(raw_timeseries).replace('(', '[').replace(')', ']')
timeseries = literal_eval(timeseries_array_str)
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('loaded time series with literal_eval from - %s' % anomaly_json)
except:
logger.info(traceback.format_exc())
logger.error('error :: failed to load JSON - %s' % anomaly_json)
else:
try:
logger.error('error :: file not found - %s' % anomaly_json)
shutil.move(metric_check_file, failed_check_file)
if python_version == 2:
os.chmod(failed_check_file, 0644)
if python_version == 3:
os.chmod(failed_check_file, mode=0o644)
logger.info('moved check file to - %s' % failed_check_file)
except OSError:
logger.error('error :: failed to move check file to - %s' % failed_check_file)
pass
return
start_timestamp = int(timeseries[0][0])
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('start_timestamp - %s' % str(start_timestamp))
end_timestamp = int(timeseries[-1][0])
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('end_timestamp - %s' % str(end_timestamp))
full_duration = end_timestamp - start_timestamp
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('full_duration - %s' % str(full_duration))
self.check_if_parent_is_alive()
run_algorithms_start_timestamp = int(time())
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('run_algorithms_start_timestamp - %s' % str(run_algorithms_start_timestamp))
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('run_algorithms - %s,%s,%s,%s,%s,%s' % (metric, str(end_timestamp), str(full_duration), anomaly_json, skyline_app, str(algorithms)))
try:
anomalous, ensemble = run_algorithms(timeseries, str(metric), end_timestamp, full_duration, str(anomaly_json), skyline_app, algorithms)
except:
logger.error('error :: run_algorithms failed - %s' % str(traceback.print_exc()))
run_algorithms_end_timestamp = int(time())
run_algorithms_seconds = run_algorithms_end_timestamp - run_algorithms_start_timestamp
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomalous, ensemble - %s, %s' % (anomalous, str(ensemble)))
if anomalous:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('anomalous - %s' % (anomalous))
nothing_to_do = 'true - for debug only'
logger.info('run_algorithms took %s seconds' % str(run_algorithms_seconds))
# Update anomaly file
crucible_data = 'crucible_tests_run = "%s"\n' \
'crucible_triggered_algorithms = %s\n' \
'tested_by = "%s"\n' \
% (str(run_timestamp), str(ensemble), str(this_host))
crucible_anomaly_file = '%s/%s.txt' % (anomaly_dir, metric)
with open(crucible_anomaly_file, 'a') as fh:
fh.write(crucible_data)
if python_version == 2:
os.chmod(crucible_anomaly_file, 0644)
if python_version == 3:
os.chmod(crucible_anomaly_file, mode=0o644)
logger.info('updated crucible anomaly file - %s/%s.txt' % (anomaly_dir, metric))
# gzip the json timeseries data after analysis
if os.path.isfile(anomaly_json):
if not os.path.isfile(anomaly_json_gz):
try:
f_in = open(anomaly_json)
f_out = gzip.open(anomaly_json_gz, 'wb')
f_out.writelines(f_in)
f_out.close()
f_in.close()
os.remove(anomaly_json)
if python_version == 2:
os.chmod(anomaly_json_gz, 0644)
if python_version == 3:
os.chmod(anomaly_json_gz, mode=0o644)
logger.info('gzipped - %s' % (anomaly_json_gz))
except:
logger.error('error :: Failed to gzip data file - %s' % str(traceback.print_exc()))
else:
os.remove(anomaly_json)
if run_script:
if os.path.isfile(run_script):
logger.info('running - %s' % (run_script))
# @modified 20170913 - Task #2160: Test skyline with bandit
# Added nosec to exclude from bandit tests
os.system('%s %s' % (str(run_script), str(crucible_anomaly_file))) # nosec
# Remove metric check file
nothing_to_do = ''
try:
os.remove(metric_check_file)
logger.info('complete removed check file - %s %s' % (metric_check_file, nothing_to_do))
except OSError:
pass
[docs] def run(self):
"""
Called when the process intializes.
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
if os.path.isfile(skyline_app_logwait):
try:
os.remove(skyline_app_logwait)
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
pass
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('starting %s run' % skyline_app)
if os.path.isfile(skyline_app_loglock):
logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
try:
os.remove(skyline_app_loglock)
logger.info('log lock file removed')
except OSError:
logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
pass
else:
logger.info('bin/%s.d log management done' % skyline_app)
logger.info('process intialized')
while 1:
now = time()
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('process started - %s' % int(now))
# Make sure check_dir exists and has not been removed
try:
if settings.ENABLE_CRUCIBLE_DEBUG:
logger.info('checking check dir exists - %s' % settings.CRUCIBLE_CHECK_PATH)
os.path.exists(settings.CRUCIBLE_CHECK_PATH)
except:
logger.error('error :: check dir did not exist - %s' % settings.CRUCIBLE_CHECK_PATH)
mkdir_p(settings.CRUCIBLE_CHECK_PATH)
logger.info('check dir created - %s' % settings.CRUCIBLE_CHECK_PATH)
os.path.exists(settings.CRUCIBLE_CHECK_PATH)
# continue
# Make sure Redis is up
try:
self.redis_conn.ping()
logger.info('connected to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
except:
logger.info('skyline can not connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
sleep(10)
logger.info('connecting to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
# @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
if settings.REDIS_PASSWORD:
self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
else:
self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
continue
"""
Determine if any metric has been added to test
"""
while True:
# Report app up
self.redis_conn.setex(skyline_app, 120, now)
metric_var_files = [f for f in listdir(settings.CRUCIBLE_CHECK_PATH) if isfile(join(settings.CRUCIBLE_CHECK_PATH, f))]
# if len(metric_var_files) == 0:
if not metric_var_files:
logger.info('sleeping 10 no metric check files')
sleep(10)
# Discover metric to analyze
metric_var_files = ''
metric_var_files = [f for f in listdir(settings.CRUCIBLE_CHECK_PATH) if isfile(join(settings.CRUCIBLE_CHECK_PATH, f))]
# if len(metric_var_files) > 0:
if metric_var_files:
break
metric_var_files_sorted = sorted(metric_var_files)
metric_check_file = settings.CRUCIBLE_CHECK_PATH + "/" + str(metric_var_files_sorted[0])
logger.info('assigning check for processing - %s' % str(metric_var_files_sorted[0]))
# Reset process_list
# @modified 20190522 - Task #3034: Reduce multiprocessing Manager list usage
# try:
# self.process_list[:] = []
# except:
# logger.error('error :: failed to reset self.process_list')
# Spawn processes
pids = []
spawned_pids = []
pid_count = 0
run_timestamp = int(now)
for i in range(1, settings.CRUCIBLE_PROCESSES + 1):
p = Process(target=self.spin_process, args=(i, run_timestamp, str(metric_check_file)))
pids.append(p)
pid_count += 1
logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.CRUCIBLE_PROCESSES)))
p.start()
spawned_pids.append(p.pid)
# Send wait signal to zombie processes
# for p in pids:
# p.join()
# Self monitor processes and terminate if any spin_process has run
# for longer than CRUCIBLE_TESTS_TIMEOUT
p_starts = time()
while time() - p_starts <= settings.CRUCIBLE_TESTS_TIMEOUT:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('%s :: %s spin_process/es completed in %.2f seconds' % (skyline_app, str(settings.CRUCIBLE_PROCESSES), time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
for p in pids:
p.terminate()
# p.join()
for p in pids:
if p.is_alive():
logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
p.join()
while os.path.isfile(metric_check_file):
sleep(1)