import logging
try:
from Queue import Empty
except:
from queue import Empty
from time import time, sleep
from threading import Thread
from multiprocessing import Process, Manager
import os
from os import kill, getpid, listdir
from os.path import join, isfile
from ast import literal_eval
from redis import StrictRedis
from msgpack import Unpacker, packb
import traceback
from sys import version_info
import mysql.connector
from mysql.connector import errorcode
import settings
from skyline_functions import fail_check, mkdir_p
# @added 20170115 - Feature #1854: Ionosphere learn - generations
# Added determination of the learn related variables so that any new metrics
# that Panorama adds to the Skyline database, it adds the default
# IONOSPHERE_LEARN_DEFAULT_ values or the namespace specific values matched
# from settings.IONOSPHERE_LEARN_NAMESPACE_CONFIG to the metric database
# entry.
from ionosphere_functions import get_ionosphere_learn_details
skyline_app = 'panorama'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

python_version = int(version_info[0])

this_host = str(os.uname()[1])

# Converting one settings variable into a local variable, just because it is a
# long string otherwise.  Code should use this name rather than
# settings.ENABLE_PANORAMA_DEBUG so that a missing setting falls back to False.
try:
    ENABLE_PANORAMA_DEBUG = settings.ENABLE_PANORAMA_DEBUG
except AttributeError:
    logger.error('error :: cannot determine ENABLE_PANORAMA_DEBUG from settings')
    ENABLE_PANORAMA_DEBUG = False

# '.<SERVER_METRICS_NAME>' suffix for the app's Graphite namespace, or '' when
# the setting is missing or empty
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except (AttributeError, TypeError):
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)

failed_checks_dir = '%s_failed' % settings.PANORAMA_CHECK_PATH

# @added 20160907 - Handle Panorama stampede on restart after not running #26
# Allow to expire check if greater than PANORAMA_CHECK_MAX_AGE, backwards
# compatible.  Only a positive PANORAMA_CHECK_MAX_AGE enables the max age
# check; 0, a negative value, a non-numeric value or a missing setting all
# disable it (previously a negative value left max_age undefined).
try:
    max_age_seconds = settings.PANORAMA_CHECK_MAX_AGE
    # The addition raises TypeError for a non-numeric setting
    max_age = (1 + max_age_seconds) > 1
except (AttributeError, TypeError):
    max_age = False
    max_age_seconds = 0

expired_checks_dir = '%s_expired' % settings.PANORAMA_CHECK_PATH

# Database configuration
config = {'user': settings.PANORAMA_DBUSER,
          'password': settings.PANORAMA_DBUSERPASS,
          'host': settings.PANORAMA_DBHOST,
          'port': settings.PANORAMA_DBPORT,
          'database': settings.PANORAMA_DATABASE,
          'raise_on_warnings': True}
[docs]class Panorama(Thread):
"""
The Panorama class which controls the panorama thread and spawned processes.
"""
def __init__(self, parent_pid):
    """
    Initialize Panorama

    Creates the Redis connection, the shared :obj:`self.anomalous_metrics`
    and :obj:`self.metric_variables` lists and a MySQL connection, and marks
    the thread as a daemon.

    :param parent_pid: pid of the parent process, checked by
        :meth:`check_if_parent_is_alive`
    """
    super(Panorama, self).__init__()
    # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
    if settings.REDIS_PASSWORD:
        self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
    else:
        self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
    self.daemon = True
    self.parent_pid = parent_pid
    self.current_pid = getpid()
    # Each Manager() starts its own server process.  A single manager serves
    # both shared lists; as with the previous Manager().list() pattern, the
    # list proxies keep their manager alive.
    manager = Manager()
    self.anomalous_metrics = manager.list()
    self.metric_variables = manager.list()
    # NOTE(review): not used by the methods visible here (they open a new
    # connection per query) - confirm whether run() relies on it.
    self.mysql_conn = mysql.connector.connect(**config)
def check_if_parent_is_alive(self):
    """
    Exit this process if it or its parent process no longer exists.

    Signal 0 is sent to ``self.current_pid`` and ``self.parent_pid``, which
    only checks that each pid can be signalled.  If :func:`os.kill` raises
    an ``OSError`` for either pid (no such process, no permission), the
    process exits with status 0.

    :return: ``None`` when both processes are alive
    """
    try:
        kill(self.current_pid, 0)
        kill(self.parent_pid, 0)
    except OSError:
        # ``exit`` is a convenience added by the site module and is not
        # guaranteed to exist (python -S, embedded/frozen builds);
        # sys.exit raises the same SystemExit(0).
        import sys
        sys.exit(0)
"""
These are the panorama mysql functions used to surface and input panorama data
for timeseries.
"""
def mysql_select(self, select):
    """
    Select data from mysql database

    A new connection is opened for every call and is always closed before
    the method returns, whether or not the query succeeded.

    :param select: the select string
    :type select: str
    :return: the rows from ``cursor.fetchall()``, or ``False`` on failure
    :rtype: list or boolean

    - **Example usage**::

        query = 'select id, test from test'
        result = self.mysql_select(query)

    - **Example of the 0 indexed results, which can hold multiple results**::

        >> print('results: %s' % str(results))
        results: [(1, u'test1'), (2, u'test2')]
        >> print('results[0]: %s' % str(results[0]))
        results[0]: (1, u'test1')

    .. note::
        - If connecting or querying raises ``mysql.connector.Error`` the
          error is logged and ``False`` is returned, not a list of tuples.
        - Other exceptions propagate (the connection is still closed).
    """
    try:
        cnx = mysql.connector.connect(**config)
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: connected to mysql')
    except mysql.connector.Error as err:
        logger.error('error :: mysql error - %s' % str(err))
        logger.error('error :: failed to connect to mysql')
        return False

    try:
        if not cnx:
            if ENABLE_PANORAMA_DEBUG:
                logger.error('error :: failed to connect to mysql')
            return False
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: %s' % (str(select)))
        cursor = cnx.cursor()
        cursor.execute(str(select))
        result = cursor.fetchall()
        cursor.close()
        return result
    except mysql.connector.Error as err:
        logger.error('error :: mysql error - %s' % str(err))
        logger.error('error :: failed to query database - %s' % (str(select)))
        return False
    finally:
        # Always release the connection, including when the query raised
        # something other than a mysql error
        try:
            cnx.close()
        except Exception as close_err:
            if ENABLE_PANORAMA_DEBUG:
                logger.error('error :: failed to close mysql connection - %s' % str(close_err))
def mysql_insert(self, insert):
    """
    Insert data into mysql table

    Executes the insert on a new connection, commits it and closes the
    connection.  The connection is closed on failure too.

    :param insert: the insert string
    :type insert: str
    :return: the ``cursor.lastrowid`` of the insert, or ``False`` if no
        connection object was returned
    :rtype: int or boolean
    :raises mysql.connector.Error: if connecting, executing or committing
        fails; the error is logged and re-raised

    - **Example usage**::

        query = "insert into host (host) VALUES ('this_host')"
        result = self.mysql_insert(query)
    """
    try:
        cnx = mysql.connector.connect(**config)
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: connected to mysql')
    except mysql.connector.Error as err:
        logger.error('error :: mysql error - %s' % str(err))
        logger.error('error :: failed to connect to mysql')
        raise

    try:
        if not cnx:
            return False
        cursor = cnx.cursor()
        cursor.execute(insert)
        inserted_id = cursor.lastrowid
        # Make sure data is committed to the database
        cnx.commit()
        cursor.close()
        return inserted_id
    except mysql.connector.Error as err:
        logger.error('error :: mysql error - %s' % str(err))
        logger.error('Failed to insert record')
        raise
    finally:
        # Always release the connection.  A failure to close must neither
        # mask the insert error being re-raised nor discard the id of an
        # insert that has already been committed.
        try:
            cnx.close()
        except Exception as close_err:
            logger.error('error :: failed to close mysql connection - %s' % str(close_err))
# @added 20170101 - Feature #1830: Ionosphere alerts
# Bug #1460: panorama check file fails
# Panorama check file fails #24
# Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
def new_load_metric_vars(self, metric_vars_file):
    """
    Load the metric variables for a check from a metric check variables file

    Each line is split on the first ``,`` after every `` = `` has been
    replaced by ``,`` (i.e. lines of the form ``key = value``).  Only the
    known keys below are returned, each converted as follows; other lines
    are ignored:

    - ``metric``, ``anomaly_dir``, ``added_by``, ``app``, ``source``: str
      with single quotes removed
    - ``value``: float
    - ``from_timestamp``, ``metric_timestamp``, ``added_at``,
      ``full_duration``: int
    - ``algorithms``, ``triggered_algorithms``: ``ast.literal_eval`` of the
      value
    - ``graphite_metric``, ``run_crucible_tests``: ``True`` only when the
      value is the text ``True``

    :param metric_vars_file: the path and filename to the metric variables file
    :type metric_vars_file: str
    :return: a list of ``[key, value]`` lists, or ``False`` if the file is
        not found or holds none of the known keys
    :rtype: list or boolean
    :raises ValueError: (or ``SyntaxError`` from ``literal_eval``) if a
        known key's value cannot be converted
    """
    if os.path.isfile(metric_vars_file):
        logger.info(
            'loading metric variables from metric_check_file - %s' % (
                str(metric_vars_file)))
    else:
        logger.error(
            'error :: loading metric variables from metric_check_file - file not found - %s' % (
                str(metric_vars_file)))
        return False

    metric_vars = []
    with open(metric_vars_file) as f:
        for line in f:
            no_new_line = line.replace('\n', '')
            no_equal_line = no_new_line.replace(' = ', ',')
            # [key, raw value] (or [key] if the line has no separator)
            metric_vars.append(no_equal_line.split(',', 1))

    string_keys = ['metric', 'anomaly_dir', 'added_by', 'app', 'source']
    float_keys = ['value']
    int_keys = ['from_timestamp', 'metric_timestamp', 'added_at', 'full_duration']
    array_keys = ['algorithms', 'triggered_algorithms']
    boolean_keys = ['graphite_metric', 'run_crucible_tests']

    metric = None
    metric_vars_array = []
    for var_array in metric_vars:
        key = var_array[0]
        if key in string_keys:
            value = str(var_array[1]).replace("'", '')
            if key == 'metric':
                metric = value
        elif key in float_keys:
            value = float(str(var_array[1]).replace("'", ''))
        elif key in int_keys:
            value = int(str(var_array[1]).replace("'", ''))
        elif key in array_keys:
            value = literal_eval(str(var_array[1]))
        elif key in boolean_keys:
            value = str(var_array[1]) == 'True'
        else:
            continue
        metric_vars_array.append([key, value])

    if not metric_vars_array:
        # The original format string had no placeholder for the file name,
        # so this log call raised TypeError instead of logging
        logger.error(
            'error :: loading metric variables - none found - %s' % (
                str(metric_vars_file)))
        return False

    if settings.ENABLE_DEBUG:
        # metric_vars is a plain list, so the metric name comes from the
        # parsed 'metric' line (None if the file had none)
        logger.info(
            'debug :: metric_vars determined - metric variable - metric - %s' % str(metric))
        logger.info('debug :: metric_vars for %s' % str(metric))
        logger.info('debug :: %s' % str(metric_vars_array))

    return metric_vars_array
def spin_process(self, i, metric_check_file):
    """
    Assign a metric anomaly to process.

    Loads and validates the variables in the metric check file.  The anomaly
    is skipped when the Redis ``panorama.last_check.<app>.<metric>`` key
    exists (it is set for ``PANORAMA_EXPIRY_TIME`` after each recorded
    anomaly) or when it is older than ``PANORAMA_CHECK_MAX_AGE``.
    Otherwise it is inserted into the ``anomalies`` table.  The host, app,
    source, metric and algorithm ids it references are looked up, and
    inserted if unknown, by ``determine_id``.

    If the check file cannot be loaded, or a variable is missing or empty,
    the check file is passed to :func:`fail_check` with the per check
    failed directory.  The same happens if an id lookup or the anomaly
    insert fails.  When the anomaly is recorded or deliberately skipped,
    the check file is removed.

    :param i: python process id (not used)
    :param metric_check_file: full path to the metric check file
    :return: the id of the inserted anomaly; ``None`` if the check file is
        missing or invalid or the anomaly is not recorded; ``False`` if an
        id lookup or the anomaly insert fails
    """
    # The module level ENABLE_PANORAMA_DEBUG is used rather than
    # settings.ENABLE_PANORAMA_DEBUG, which raises if the setting is absent.
    child_process_pid = os.getpid()
    if ENABLE_PANORAMA_DEBUG:
        logger.info('debug :: child_process_pid - %s' % str(child_process_pid))
        logger.info('debug :: processing metric check - %s' % metric_check_file)
    if not os.path.isfile(str(metric_check_file)):
        logger.error('error :: file not found - metric_check_file - %s' % (str(metric_check_file)))
        return

    # The file name is split as <timestamp>.<metric>.txt and the metric
    # path becomes the failed checks sub directory
    check_file_name = os.path.basename(str(metric_check_file))
    check_file_timestamp = check_file_name.split('.', 1)[0]
    check_file_metricname_txt = check_file_name.split('.', 1)[1]
    check_file_metricname = check_file_metricname_txt.replace('.txt', '')
    check_file_metricname_dir = check_file_metricname.replace('.', '/')
    metric_failed_check_dir = '%s/%s/%s' % (failed_checks_dir, check_file_metricname_dir, check_file_timestamp)
    failed_check_file = '%s/%s' % (metric_failed_check_dir, check_file_name)
    if ENABLE_PANORAMA_DEBUG:
        logger.info('debug :: check_file_name - %s' % check_file_name)
        logger.info('debug :: check_file_timestamp - %s' % str(check_file_timestamp))
        logger.info('debug :: check_file_metricname_txt - %s' % check_file_metricname_txt)
        logger.info('debug :: check_file_metricname - %s' % check_file_metricname)
        logger.info('debug :: check_file_metricname_dir - %s' % check_file_metricname_dir)
        logger.info('debug :: failed_check_file - %s' % failed_check_file)

    def fail(message):
        # Log why the check failed and hand the check file to fail_check
        logger.error(message)
        fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))

    def remove_check_file():
        # The check has been dealt with; removal is best effort
        if os.path.isfile(str(metric_check_file)):
            try:
                os.remove(str(metric_check_file))
                logger.info('metric_check_file removed - %s' % str(metric_check_file))
            except OSError:
                pass

    # Load and validate metric variables
    try:
        # @modified 20170101 - Feature #1830: Ionosphere alerts
        #                      Bug #1460: panorama check file fails
        #                      Panorama check file fails #24
        # Get rid of the skyline_functions imp as imp is deprecated in py3 anyway
        metric_vars_array = self.new_load_metric_vars(str(metric_check_file))
    except Exception:
        logger.info(traceback.format_exc())
        fail('error :: failed to load metric variables from check file - %s' % (metric_check_file))
        return
    # new_load_metric_vars returns False when the file has no usable variables
    if not metric_vars_array:
        fail('error :: failed to load metric variables from check file - %s' % (metric_check_file))
        return

    def metric_var(key):
        # First value recorded for key; IndexError if the key is absent
        return [var_array[1] for var_array in metric_vars_array if var_array[0] == key][0]

    def required_var(key, convert=None):
        # Return the non-empty (converted) value of key, or None after
        # failing the check if it cannot be read or is empty
        try:
            var_value = metric_var(key)
            if convert:
                var_value = convert(var_value)
            if ENABLE_PANORAMA_DEBUG:
                logger.info('debug :: metric variable - %s - %s' % (key, str(var_value)))
        except Exception:
            fail('error :: failed to read %s variable from check file setting to all - %s' % (key, metric_check_file))
            return None
        if not var_value:
            fail('error :: failed to load %s variable from check file - %s' % (key, metric_check_file))
            return None
        return var_value

    # Test metric variables
    # Every variable is required: if one cannot be read or is empty the
    # check is failed, so unexpected data is handled and the process does
    # not hang.
    try:
        metric = str(metric_var('metric'))
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: metric variable - metric - %s' % metric)
    except Exception:
        fail('error :: failed to read metric variable from check file - %s' % (metric_check_file))
        return
    if not metric:
        fail('error :: failed to load metric variable from check file - %s' % (metric_check_file))
        return

    try:
        value = float(metric_var('value'))
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: metric variable - value - %s' % (value))
    except Exception:
        fail('error :: failed to read value variable from check file - %s' % (metric_check_file))
        return
    # @added 20171214 - Bug #2234: panorama metric_vars value check
    # A value of 0.0 is valid, so value is deliberately not tested for
    # truthiness.

    try:
        from_timestamp = int(metric_var('from_timestamp'))
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: metric variable - from_timestamp - %s' % from_timestamp)
    except Exception:
        # @added 20160822 - Bug #1460: panorama check file fails
        logger.info(traceback.format_exc())
        fail('error :: failed to read from_timestamp variable from check file - %s' % (metric_check_file))
        return
    if not from_timestamp:
        fail('error :: failed to load from_timestamp variable from check file - %s' % (metric_check_file))
        return

    try:
        metric_timestamp = int(metric_var('metric_timestamp'))
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: metric variable - metric_timestamp - %s' % metric_timestamp)
    except Exception:
        fail('error :: failed to read metric_timestamp variable from check file - %s' % (metric_check_file))
        return
    if not metric_timestamp:
        fail('error :: failed to load metric_timestamp variable from check file - %s' % (metric_check_file))
        return

    algorithms = required_var('algorithms')
    if not algorithms:
        return
    triggered_algorithms = required_var('triggered_algorithms')
    if not triggered_algorithms:
        return
    app = required_var('app', str)
    if not app:
        return
    # Previously the emptiness test here re-checked app instead of source
    source = required_var('source', str)
    if not source:
        return
    added_by = required_var('added_by', str)
    if not added_by:
        return
    added_at = required_var('added_at', str)
    if not added_at:
        return

    record_anomaly = True
    cache_key = '%s.last_check.%s.%s' % (skyline_app, app, metric)
    if ENABLE_PANORAMA_DEBUG:
        logger.info('debug :: cache_key - %s.last_check.%s.%s' % (
            skyline_app, app, metric))
    try:
        last_check = self.redis_conn.get(cache_key)
    except Exception as e:
        logger.error(
            'error :: could not query cache_key - %s.last_check.%s.%s - %s' % (
                skyline_app, app, metric, e))
        last_check = None

    if last_check:
        record_anomaly = False
        logger.info(
            'Panorama metric key not expired - %s.last_check.%s.%s' % (
                skyline_app, app, metric))

    # @added 20160907 - Handle Panorama stampede on restart after not running #26
    # Allow to expire check if greater than PANORAMA_CHECK_MAX_AGE
    if max_age:
        anomaly_age = int(time()) - int(metric_timestamp)
        if anomaly_age > max_age_seconds:
            record_anomaly = False
            logger.info(
                'Panorama check max age exceeded - %s - %s seconds old, older than %s seconds discarding' % (
                    metric, str(anomaly_age), str(max_age_seconds)))

    if not record_anomaly:
        logger.info('not recording anomaly for - %s' % (metric))
        remove_check_file()
        return

    # Determine id of something thing
    def determine_id(table, key, value):
        """
        Get the id of something from Redis or the database and create a new
        Redis key with the value if one does not exist.

        The id is read from the Redis key
        ``panorama.mysql_ids.<table>.<key>.<value>`` if present, otherwise
        selected from ``table``.  If there is no row, one is inserted (with
        the Ionosphere learn settings for the metrics table).  An id found in
        MySQL, or newly inserted, is cached in Redis for a week.

        :param table: table name
        :param key: key name
        :param value: value name
        :type table: str
        :type key: str
        :type value: str
        :return: int or boolean
        :raises: whatever ``self.mysql_insert`` raises, or ``ValueError`` if
            the insert returned no id
        """
        query_cache_key = '%s.mysql_ids.%s.%s.%s' % (skyline_app, table, key, value)
        determined_id = None
        redis_determined_id = None
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: query_cache_key - %s' % (query_cache_key))

        try:
            redis_known_id = self.redis_conn.get(query_cache_key)
        except Exception:
            redis_known_id = None

        if redis_known_id:
            unpacker = Unpacker(use_list=False)
            unpacker.feed(redis_known_id)
            redis_determined_id = list(unpacker)

        if redis_determined_id:
            determined_id = int(redis_determined_id[0])

        if determined_id:
            if determined_id > 0:
                return determined_id

        def cache_id(id_to_cache):
            # Set the key for a week so later checks skip MySQL
            try:
                self.redis_conn.setex(query_cache_key, 604800, packb(id_to_cache))
                logger.info('set redis query_cache_key - %s - id: %s' % (
                    query_cache_key, str(id_to_cache)))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: failed to set query_cache_key - %s - id: %s' % (
                    query_cache_key, str(id_to_cache)))

        # Query MySQL
        # @modified 20170913 - Task #2160: Test skyline with bandit
        # Added nosec to exclude from bandit tests
        # NOTE(review): value (metric, app, host, source, algorithm names from
        # the check file) is interpolated into the SQL unescaped, so a value
        # containing a quote breaks the query and is an injection vector.
        # mysql_select/mysql_insert only accept a query string; switching to
        # parameterised queries needs them to accept params.
        query = 'select id FROM %s WHERE %s=\'%s\'' % (table, key, value)  # nosec

        # @modified 20170916 - Bug #2166: panorama incorrect mysql_id cache keys
        results = None
        try:
            results = self.mysql_select(query)
        except Exception:
            logger.error('error :: failed to determine results from - %s' % (query))

        determined_id = 0
        if results:
            try:
                determined_id = int(results[0][0])
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: determined_id is not an int')
                determined_id = 0

        if determined_id > 0:
            if not redis_determined_id:
                cache_id(determined_id)
            return int(determined_id)

        # INSERT because no known id
        # @added 20170115 - Feature #1854: Ionosphere learn - generations
        # The metrics table insert also sets learn_full_duration_days,
        # learn_valid_ts_older_than, max_generations and
        # max_percent_diff_from_origin.
        if table == 'metrics' and key == 'metric':
            # Set defaults
            learn_full_duration_days = int(settings.IONOSPHERE_LEARN_DEFAULT_FULL_DURATION_DAYS)
            valid_learning_duration = int(settings.IONOSPHERE_LEARN_DEFAULT_VALID_TIMESERIES_OLDER_THAN_SECONDS)
            max_generations = int(settings.IONOSPHERE_LEARN_DEFAULT_MAX_GENERATIONS)
            max_percent_diff_from_origin = float(settings.IONOSPHERE_LEARN_DEFAULT_MAX_PERCENT_DIFF_FROM_ORIGIN)
            try:
                # The first element (use_full_duration) is not needed here
                _, valid_learning_duration, learn_full_duration_days, max_generations, max_percent_diff_from_origin = get_ionosphere_learn_details(skyline_app, value)
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: failed to get_ionosphere_learn_details for %s' % value)

            logger.info('metric learn details determined for %s' % value)
            logger.info('learn_full_duration_days     :: %s days' % (str(learn_full_duration_days)))
            logger.info('valid_learning_duration      :: %s seconds' % (str(valid_learning_duration)))
            logger.info('max_generations              :: %s' % (str(max_generations)))
            logger.info('max_percent_diff_from_origin :: %s' % (str(max_percent_diff_from_origin)))

            # @modified 20170913 - Task #2160: Test skyline with bandit
            # Added nosec to exclude from bandit tests
            insert_query_string = '%s (%s, learn_full_duration_days, learn_valid_ts_older_than, max_generations, max_percent_diff_from_origin) VALUES (\'%s\', %s, %s, %s, %s)' % (
                table, key, value, str(learn_full_duration_days),
                str(valid_learning_duration), str(max_generations),
                str(max_percent_diff_from_origin))
            insert_query = 'insert into %s' % insert_query_string  # nosec
        else:
            insert_query = 'insert into %s (%s) VALUES (\'%s\')' % (table, key, value)  # nosec

        logger.info('inserting %s into %s table' % (value, table))
        try:
            results = self.mysql_insert(insert_query)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to determine the id of %s from the insert' % (value))
            raise

        if not results:
            logger.error('error :: results not set')
            # A bare ``raise`` here had no active exception to re-raise
            raise ValueError('no id returned from the insert into %s for %s' % (table, value))

        determined_id = int(results)
        if determined_id > 0:
            if not redis_determined_id:
                cache_id(determined_id)
            return determined_id

        logger.error('error :: failed to determine the inserted id for %s' % value)
        return False

    # Resolve (inserting where unknown) the ids the anomaly row references
    resolved_ids = []
    for table, key, lookup_value in (
            ('hosts', 'host', added_by),
            ('apps', 'app', app),
            ('sources', 'source', source),
            ('metrics', 'metric', metric)):
        try:
            resolved_ids.append(determine_id(table, key, lookup_value))
        except Exception:
            fail('error :: failed to determine id of %s' % (lookup_value))
            return False
    added_by_host_id, app_id, source_id, metric_id = resolved_ids

    # Comma separated algorithm ids for algorithms_run and triggered_algorithms
    algorithm_csvs = []
    for algorithm_names in (algorithms, triggered_algorithms):
        algorithm_ids = []
        for algorithm in algorithm_names:
            try:
                algorithm_ids.append(str(determine_id('algorithms', 'algorithm', algorithm)))
            except Exception:
                fail('error :: failed to determine id of %s' % (algorithm))
                return False
        algorithm_csvs.append(','.join(algorithm_ids))
    algorithms_ids_csv, triggered_algorithms_ids_csv = algorithm_csvs

    logger.info('inserting anomaly')
    # Both timestamps and value were already converted to int/float above
    full_duration = metric_timestamp - from_timestamp
    anomalous_datapoint = round(value, 6)
    columns = 'metric_id, host_id, app_id, source_id, anomaly_timestamp, anomalous_datapoint, full_duration, algorithms_run, triggered_algorithms'
    if ENABLE_PANORAMA_DEBUG:
        logger.info('debug :: full_duration - %s' % str(full_duration))
        logger.info('debug :: anomalous_datapoint - %s' % str(anomalous_datapoint))
        logger.info('debug :: columns - %s' % str(columns))

    try:
        # @modified 20170913 - Task #2160: Test skyline with bandit
        # Added nosec to exclude from bandit tests
        query_string = '(%s) VALUES (%d, %d, %d, %d, %s, %.6f, %d, \'%s\', \'%s\')' % (
            columns, metric_id, added_by_host_id, app_id, source_id,
            metric_timestamp, anomalous_datapoint, full_duration,
            algorithms_ids_csv, triggered_algorithms_ids_csv)
        query = 'insert into anomalies %s' % query_string  # nosec
    except Exception:
        logger.error('error :: failed to construct insert query')
        logger.info(traceback.format_exc())
        fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))
        return False

    if ENABLE_PANORAMA_DEBUG:
        logger.info('debug :: anomaly insert - %s' % str(query))

    try:
        anomaly_id = self.mysql_insert(query)
    except Exception:
        logger.error(traceback.format_exc())
        # anomaly_id is not bound when mysql_insert raises, so it is not
        # part of this message (the previous message also had one more
        # argument than placeholders and raised TypeError itself)
        fail('error :: failed to insert anomaly %s at %s' % (
            metric, metric_timestamp))
        return False
    # Logged outside the try so that a committed anomaly is never failed
    logger.info('anomaly id - %s - created for %s at %s' % (
        anomaly_id, metric, metric_timestamp))

    # Set anomaly record cache key
    try:
        self.redis_conn.setex(
            cache_key, settings.PANORAMA_EXPIRY_TIME, packb(value))
        logger.info('set cache_key - %s.last_check.%s.%s - %s' % (
            skyline_app, app, metric, str(settings.PANORAMA_EXPIRY_TIME)))
    except Exception as e:
        logger.error(
            'error :: could not set cache_key - %s.last_check.%s.%s - %s' % (
                skyline_app, app, metric, e))

    remove_check_file()

    return anomaly_id
def run(self):
    """
    Run the Panorama main loop; this method does not return.

    On start it removes any leftover log wait file and waits up to 5
    seconds for bin/panorama.d to release the log lock file, removing
    the lock file itself if it is still present after that.

    Each iteration of the main loop then:

    - pings Redis, recreating ``self.redis_conn`` and retrying after 30
      seconds if that fails, and sets the ``skyline_app`` Redis key
      (120 second expiry) to the current timestamp to report the app up
    - when ENABLE_PANORAMA_DEBUG is set, blocks until a MySQL query
      succeeds and logs the rows of a test SELECT
    - looks up this host's id in the ``hosts`` table, inserting the host
      if it is not known, and retries after 30 seconds if no id can be
      determined
    - creates settings.PANORAMA_CHECK_PATH if it is missing and waits,
      polling every 20 seconds (and refreshing the Redis up key), until
      it contains at least one file
    - runs settings.PANORAMA_PROCESSES ``spin_process`` processes against
      the first check file in sorted filename order; if they have not all
      finished within 20 seconds they are terminated and the check file
      is passed to ``fail_check`` with a path under ``failed_checks_dir``
    """
    # Log management to prevent overwriting
    # Allow the bin/<skyline_app>.d to manage the log
    if os.path.isfile(skyline_app_logwait):
        try:
            logger.info('removing %s' % skyline_app_logwait)
            os.remove(skyline_app_logwait)
        except OSError:
            logger.error('error :: failed to remove %s, continuing' % skyline_app_logwait)

    # Give bin/<skyline_app>.d up to 5 seconds to release the log lock
    log_wait_for = time() + 5
    while os.path.isfile(skyline_app_loglock) and time() < log_wait_for:
        sleep(.1)

    logger.info('starting %s run' % skyline_app)
    if os.path.isfile(skyline_app_loglock):
        logger.error('error :: bin/%s.d log management seems to have failed, continuing' % skyline_app)
        try:
            os.remove(skyline_app_loglock)
            logger.info('log lock file removed')
        except OSError:
            logger.error('error :: failed to remove %s, continuing' % skyline_app_loglock)
    else:
        logger.info('bin/%s.d log management done' % skyline_app)

    while True:
        now = time()

        # Make sure Redis is up, reconnecting and retrying if it is not
        try:
            self.redis_conn.ping()
            if ENABLE_PANORAMA_DEBUG:
                logger.info('debug :: connected to Redis')
        except Exception as e:
            logger.error('error :: cannot connect to redis at socket path %s - %s' % (
                settings.REDIS_SOCKET_PATH, e))
            sleep(30)
            # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
            if settings.REDIS_PASSWORD:
                self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
            else:
                self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
            continue

        # Report app up
        try:
            self.redis_conn.setex(skyline_app, 120, now)
            logger.info('updated Redis key for %s up' % skyline_app)
        except Exception:
            logger.error('error :: failed to update Redis key for %s up' % skyline_app)

        if ENABLE_PANORAMA_DEBUG:
            # Make sure mysql is available, retrying every 30 seconds
            mysql_down = True
            while mysql_down:
                results = self.mysql_select('SHOW TABLES')
                if results:
                    mysql_down = False
                    logger.info('debug :: tested database query - OK')
                else:
                    logger.error('error :: failed to query database')
                    sleep(30)

            try:
                result = self.mysql_select('SELECT id, test FROM test')
                logger.info('debug :: tested mysql SELECT query - OK')
                logger.info('debug :: result: %s' % str(result))
                # e.g. result: [(1, u'test1')] - log each row the test table
                # actually returned rather than assuming a second row exists
                for row_index, row in enumerate(result or []):
                    logger.info('debug :: result[%s]: %s' % (str(row_index), str(row)))
            except Exception:
                # format_exc returns the traceback text; print_exc only
                # prints it and returns None
                logger.error(
                    'error :: mysql error - %s' %
                    traceback.format_exc())
                logger.error('error :: failed to SELECT')

        # What is my host id in the Skyline panorama DB?
        host_id = False
        # @modified 20170913 - Task #2160: Test skyline with bandit
        # Added nosec to exclude from bandit tests - this_host comes from
        # os.uname() at module load, not from external input
        query = 'select id FROM hosts WHERE host=\'%s\'' % this_host  # nosec
        results = self.mysql_select(query)
        if results:
            host_id = results[0][0]
            logger.info('host_id: %s' % str(host_id))
        else:
            logger.info('failed to determine host id of %s' % this_host)

        # - if not known - INSERT hostname INTO hosts
        if not host_id:
            logger.info('inserting %s into hosts table' % this_host)
            # @modified 20170913 - Task #2160: Test skyline with bandit
            # Added nosec to exclude from bandit tests
            query = 'insert into hosts (host) VALUES (\'%s\')' % this_host  # nosec
            host_id = self.mysql_insert(query)
            if host_id:
                logger.info('new host_id: %s' % str(host_id))

        if not host_id:
            logger.error(
                'error :: failed to determine populate %s into the hosts table' %
                this_host)
            sleep(30)
            continue

        # Make sure the check dir exists and has not been removed.
        # os.path.exists never raises, so the result must be tested
        # explicitly rather than relying on an exception to trigger mkdir.
        if ENABLE_PANORAMA_DEBUG:
            logger.info('debug :: checking check dir exists - %s' % settings.PANORAMA_CHECK_PATH)
        if not os.path.exists(settings.PANORAMA_CHECK_PATH):
            logger.error('error :: check dir did not exist - %s' % settings.PANORAMA_CHECK_PATH)
            try:
                mkdir_p(settings.PANORAMA_CHECK_PATH)
                logger.info('check dir created - %s' % settings.PANORAMA_CHECK_PATH)
            except Exception:
                logger.error('error :: failed to create check dir - %s' % settings.PANORAMA_CHECK_PATH)
                logger.info(traceback.format_exc())

        # Wait until at least one check file is available, polling every
        # 20 seconds
        while True:
            try:
                metric_var_files = [
                    f for f in listdir(settings.PANORAMA_CHECK_PATH)
                    if isfile(join(settings.PANORAMA_CHECK_PATH, f))]
            except Exception:
                metric_var_files = []
                logger.error('error :: failed to list files in check dir')
                logger.info(traceback.format_exc())
            if metric_var_files:
                break
            logger.info('sleeping 20 no metric check files')
            sleep(20)
            # The app up key expires after 120 seconds, so refresh it while
            # idle or Panorama is reported as down when there is no work
            try:
                self.redis_conn.setex(skyline_app, 120, time())
            except Exception:
                logger.error('error :: failed to update Redis key for %s up' % skyline_app)

        # First file in sorted filename order (filenames start with a
        # timestamp, so presumably the oldest check)
        first_check_file = min(metric_var_files)
        metric_check_file = '%s/%s' % (settings.PANORAMA_CHECK_PATH, str(first_check_file))
        logger.info('assigning anomaly for insertion - %s' % str(first_check_file))

        # Spawn processes
        pids = []
        pid_count = 0
        for i in range(1, settings.PANORAMA_PROCESSES + 1):
            try:
                p = Process(target=self.spin_process, args=(i, metric_check_file))
                pid_count += 1
                logger.info('starting %s of %s spin_process/es' % (str(pid_count), str(settings.PANORAMA_PROCESSES)))
                p.start()
                # Only track processes that actually started, terminate()
                # on a never-started Process fails
                pids.append(p)
            except Exception:
                logger.error('error :: to start spin_process')
                logger.info(traceback.format_exc())

        # Self monitor processes and terminate them all if any
        # spin_process has run for longer than 20 seconds
        p_starts = time()
        while time() - p_starts <= 20:
            if any(p.is_alive() for p in pids):
                # Just to avoid hogging the CPU
                sleep(.1)
            else:
                # All the processes are done, break now.
                time_to_run = time() - p_starts
                logger.info(
                    '%s :: %s spin_process/es completed in %.2f seconds' % (
                        skyline_app, str(settings.PANORAMA_PROCESSES),
                        time_to_run))
                break
        else:
            # We only enter this if we didn't 'break' above.
            logger.info('%s :: timed out, killing all spin_process processes' % (skyline_app))
            for p in pids:
                p.terminate()

            # Check file names are <timestamp>.<metric>.txt; the failed
            # check goes to <failed_checks_dir>/<metric/as/dirs>/<timestamp>
            check_file_name = os.path.basename(str(metric_check_file))
            check_file_timestamp, _, check_file_metricname_txt = check_file_name.partition('.')
            # Strip only the trailing extension, not every '.txt' in the name
            check_file_metricname = check_file_metricname_txt
            if check_file_metricname.endswith('.txt'):
                check_file_metricname = check_file_metricname[:-len('.txt')]
            check_file_metricname_dir = check_file_metricname.replace('.', '/')
            if ENABLE_PANORAMA_DEBUG:
                logger.info('debug :: check_file_name - %s' % check_file_name)
                logger.info('debug :: check_file_timestamp - %s' % str(check_file_timestamp))
                logger.info('debug :: check_file_metricname_txt - %s' % check_file_metricname_txt)
                logger.info('debug :: check_file_metricname - %s' % check_file_metricname)
                logger.info('debug :: check_file_metricname_dir - %s' % check_file_metricname_dir)
            metric_failed_check_dir = '%s/%s/%s' % (failed_checks_dir, check_file_metricname_dir, check_file_timestamp)
            fail_check(skyline_app, metric_failed_check_dir, str(metric_check_file))

        for p in pids:
            if p.is_alive():
                logger.info('%s :: stopping spin_process - %s' % (skyline_app, str(p.is_alive())))
                p.join()