from __future__ import division
import logging
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
from sys import exit
from os import kill, getpid
import traceback
import settings
from skyline_functions import (
get_redis_conn, get_redis_conn_decoded, send_graphite_metric)
from functions.thunder.checks.app.up import thunder_check_app
from functions.thunder.checks.analyzer.run_time import thunder_check_analyzer_run_time
from functions.thunder.checks.horizon.metrics_received import thunder_check_horizon_metrics_received
# from functions.thunder.stale_metrics import thunder_stale_metrics
skyline_app = 'thunder'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
this_host = str(os.uname()[1])
try:
SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
if SERVER_METRIC_PATH == '.':
SERVER_METRIC_PATH = ''
except:
SERVER_METRIC_PATH = ''
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
full_uniques = '%sunique_metrics' % settings.FULL_NAMESPACE
LOCAL_DEBUG = False
[docs]class RollingThunder(Thread):
"""
The RollingThunder class which controls the thunder/rolling thread and
spawned processes. thunder/rolling carries out internal and external checks
and sends any events to thunder for various Skyline app operations and
dependencies, such as Redis, mariadb, memcache and Graphite.
In a distributed Skyline set up, thunder/rolling can alert on other Skyline
apps in the cluster (TBD)
"""
def __init__(self, parent_pid):
"""
Initialize Rolling
"""
super(RollingThunder, self).__init__()
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
exit(0)
[docs] def rolling_process(self, i):
"""
Create and manage the required lists and Redis sets
"""
spin_start = time()
logger.info('thunder/rolling :: rolling_process started')
redis_available = False
last_run_timestamp = 0
thunder_rolling_last_timestamp_key = 'thunder.rolling.last_run_timestamp'
try:
last_run_timestamp = self.redis_conn_decoded.get(thunder_rolling_last_timestamp_key)
redis_available = True
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: failed to timestamp from %s Redis key - %s' % (
thunder_rolling_last_timestamp_key, err))
last_run_timestamp = 0
if last_run_timestamp:
logger.info('thunder/rolling :: %s Redis key has not expired, not running' % (
thunder_rolling_last_timestamp_key))
return
# Check apps
thunder_apps = list(settings.THUNDER_CHECKS.keys())
check_apps = []
for thunder_app in thunder_apps:
if settings.THUNDER_CHECKS[thunder_app]['up']['run']:
check_apps.append(thunder_app)
apps_up = 0
check_apps_up = []
for check_app in check_apps:
if not redis_available:
logger.warning('warning :: thunder/rolling :: redis not available not check %s' % (
check_app))
continue
try:
success = thunder_check_app(self, check_app)
if success:
apps_up += 1
logger.info('thunder/rolling :: %s is reporting UP' % (
check_app))
check_apps_up.append(check_app)
else:
logger.warning('warning :: thunder/rolling :: %s is NOT reporting UP' % (
check_app))
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: thunder_check_app errored for %s - %s' % (
check_app, err))
if check_apps:
logger.info('thunder/rolling :: %s Skyline apps checked and %s apps reporting UP' % (
str(len(check_apps)), str(apps_up)))
# Analyzer checks
if 'analyzer' in thunder_apps:
# run_time check
run_time_check = False
try:
run_time_check = settings.THUNDER_CHECKS['analyzer']['run_time']['run']
except Exception as e:
logger.error('error :: thunder/rolling :: failed to determine if analyzer run_time check should be run - %s' % (
e))
if not redis_available:
logger.warning('warning :: thunder/rolling :: redis not available not checking analyzer run_time')
run_time_check = False
if run_time_check:
try:
success = thunder_check_analyzer_run_time(self)
if success:
logger.info('thunder/rolling :: analyzer run_time OK')
else:
logger.warning('warning :: thunder/rolling :: analyzer run_time overruning')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: thunder_check_analyzer_run_time errored - %s' % (
err))
# Horizon checks
if 'horizon' in thunder_apps:
# metrics_received check
metrics_received_check = False
try:
metrics_received_check = settings.THUNDER_CHECKS['horizon']['worker.metrics_received']['run']
except Exception as e:
logger.error('error :: thunder/rolling :: failed to determine if horizon metrics_received check should be run - %s' % (
e))
if not redis_available:
logger.warning('warning :: thunder/rolling :: redis not available not checking horizon metrics_received')
metrics_received_check = False
if metrics_received_check:
try:
success = thunder_check_horizon_metrics_received(self)
if success:
logger.info('thunder/rolling :: horizon metrics_received OK')
else:
logger.warning('warning :: thunder/rolling :: horizon metrics_received has significantly changed')
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: tthunder_check_horizon_metrics_recieved errored - %s' % (
e))
else:
logger.warning('warning :: thunder/rolling :: did not check horizon metrics_recieved as thunder_check_horizon_metrics_received failed to import')
if not redis_available:
for check_app in check_apps:
check_apps_up.append(check_app)
# @added 20220328 - Feature #4018: thunder - skyline.errors
# Consume the app RedisErrorLogHandler Redis key which is a count for
# every error logged and create the
# skyline.<hostname>.<skyline_app>.logged_errors metrics
if 'thunder' not in check_apps_up:
check_apps.append('thunder')
flux_done = False
vista_done = False
for check_app in check_apps_up:
# Do not check Redis
if check_app == 'redis':
continue
check_app_name = str(check_app)
error_count = 0
error_count_key = '%s.log.errors.per_minute' % check_app
# Only check flux once, although there are flux.listen and
# flux.worker check app Redis keys, both submit errors to the same
# Redis error key
if check_app.startswith('flux'):
check_app_name = 'flux'
error_count_key = 'flux.log.errors.per_minute'
if flux_done:
continue
flux_done = True
check_app = 'flux'
# Only check vista once, although there are vista and vista.worker
# check app Redis keys, both submit errors to the same Redis error
# key
if check_app.startswith('vista'):
check_app_name = 'vista'
error_count_key = 'vista.log.errors.per_minute'
if vista_done:
continue
vista_done = True
check_app = 'vista'
check_log_errors_file = False
try:
error_count_str = None
error_count_str = self.redis_conn_decoded.getset(error_count_key, 0)
if error_count_str:
error_count = int(error_count_str)
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: failed to getset Redis key %s - %s' % (
error_count_key, err))
check_log_errors_file = True
if check_log_errors_file:
error_count = 0
log_errors_file = '%s/%s.log_errors.txt' % (
settings.SKYLINE_TMP_DIR, check_app_name)
error_count_array = []
if os.path.isfile(log_errors_file):
logger.info('thunder/rolling :: using %s' % (
log_errors_file))
try:
with open(log_errors_file, 'r') as f:
for line in f:
value_string = line.replace('\n', '')
unquoted_value_string = value_string.replace("'", '')
float_value = float(unquoted_value_string)
error_count_array.append(float_value)
except:
error_count_array = []
try:
os.remove(log_errors_file)
except:
pass
if error_count_array:
error_count = len(error_count_array)
check_app_graphite_namespace = 'skyline.%s%s' % (check_app, SERVER_METRIC_PATH)
send_metric_name = '%s.logged_errors' % check_app_graphite_namespace
logger.info('thunder/rolling :: %s :: %s' % (send_metric_name, str(error_count)))
send_graphite_metric(skyline_app, send_metric_name, error_count)
spin_end = time() - spin_start
logger.info('thunder/rolling :: checks took %.2f seconds' % spin_end)
return
[docs] def run(self):
"""
- Called when the process intializes.
- Determine if Redis is up
- Spawn a rolling process to do checks
- Wait for the process to finish.
- run_every 60 seconds
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('thunder/rolling :: starting %s/rolling' % skyline_app)
run_every = 60
while 1:
now = time()
# Make sure Redis is up
try:
self.redis_conn.ping()
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling cannot connect to redis at socket path %s - %s' % (
settings.REDIS_SOCKET_PATH, e))
sleep(10)
try:
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as e:
logger.info(traceback.format_exc())
logger.error('error :: thunder/rolling cannot connect to get_redis_conn - %s' % e)
# continue
# Report app up
try:
self.redis_conn.setex('thunder.rolling', 120, now)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: could not update the Redis analyzer.thunder/rolling key - %s' % e)
# Spawn processes
pids = []
spawned_pids = []
pid_count = 0
try:
p = Process(target=self.rolling_process, args=(0,))
pids.append(p)
pid_count += 1
logger.info('thunder/rolling :: starting rolling_process')
p.start()
spawned_pids.append(p.pid)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: failed to spawn process - %s' % e)
# Self monitor processes and terminate if any rolling_process that
# has run for longer than 180 seconds
p_starts = time()
while time() - p_starts <= run_every:
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('thunder/rolling :: rolling_process completed in %.2f seconds' % (time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('thunder/rolling :: timed out, killing rolling_process process')
for p in pids:
logger.info('thunder/rolling :: killing rolling_process process')
p.terminate()
logger.info('thunder/rolling :: killed rolling_process process')
for p in pids:
if p.is_alive():
try:
logger.info('thunder/rolling :: stopping rolling_process - %s' % (str(p.is_alive())))
p.terminate()
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: thunder/rolling :: failed to stop rolling_process - %s' % e)
process_runtime = time() - now
if process_runtime < run_every:
sleep_for = (run_every - process_runtime)
process_runtime_now = time() - now
sleep_for = (run_every - process_runtime_now)
logger.info('thunder/rolling :: sleeping for %.2f seconds due to low run time...' % sleep_for)
sleep(sleep_for)
try:
del sleep_for
except Exception as e:
logger.error('error :: thunder/rolling :: failed to del sleep_for - %s' % e)
try:
del process_runtime
except Exception as e:
logger.error('error :: thunder/rolling :: failed to del process_runtime - %s' % e)