# Source code for luminosity.related_metrics
import logging
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
import sys
from os import kill, getpid
import traceback
from ast import literal_eval
import settings
from skyline_functions import get_redis_conn, get_redis_conn_decoded
from functions.metrics.get_metric_latest_anomaly import get_metric_latest_anomaly
from functions.database.queries.get_metric_group_info import get_metric_group_info
from functions.luminosity.get_cross_correlation_relationships import get_cross_correlation_relationships
from functions.luminosity.update_metric_group import update_metric_group
# Application identity and log-file paths used by the luminosity
# related_metrics service.
skyline_app = 'luminosity'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# Lock and wait sentinel files used by bin/<skyline_app>.d to coordinate
# log rotation with the running process (checked in RelatedMetrics.run).
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

this_host = str(os.uname()[1])

# Optional per-server Graphite metric suffix.  settings.SERVER_METRICS_NAME
# may be undeclared (AttributeError) or empty, in which case no suffix is
# used.  Was a bare except; only a missing settings attribute can raise here.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except AttributeError:
    SERVER_METRIC_PATH = ''

# Maximum 5 minute load average threshold, defaulting to 3 when it is not
# declared in settings.
try:
    LUMINOSITY_RELATED_METRICS_MAX_5MIN_LOADAVG = settings.LUMINOSITY_RELATED_METRICS_MAX_5MIN_LOADAVG
except AttributeError:
    LUMINOSITY_RELATED_METRICS_MAX_5MIN_LOADAVG = 3

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
class RelatedMetrics(Thread):
    """
    The RelatedMetrics class controls the luminosity/related_metrics thread and
    spawned processes. luminosity/related_metrics analyses the results of
    luminosity cross_correlations and related metrics to create and maintain
    metric groups.
    """
def __init__(self, parent_pid):
"""
Initialize RelatedMetrics
"""
super(RelatedMetrics, self).__init__()
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
self.daemon = True
self.parent_pid = parent_pid
self.current_pid = getpid()
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
sys.exit(0)
[docs] def run(self):
"""
- Called when the process intializes.
- Determine if Redis is up
- Spawn a process_metric process to do analysis
- Wait for the process to finish.
- run_every 300 seconds
"""
# Log management to prevent overwriting
# Allow the bin/<skyline_app>.d to manage the log
now = time()
log_wait_for = now + 5
while now < log_wait_for:
if os.path.isfile(skyline_app_loglock):
sleep(.1)
now = time()
else:
now = log_wait_for + 1
logger.info('related_metrics :: starting')
while 1:
now = time()
# Make sure Redis is up
try:
self.redis_conn.ping()
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: related_metrics cannot connect to redis at socket path %s - %s' % (
settings.REDIS_SOCKET_PATH, e))
sleep(10)
try:
self.redis_conn = get_redis_conn(skyline_app)
self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
except Exception as e:
logger.info(traceback.format_exc())
logger.error('error :: related_metrics cannot connect to get_redis_conn - %s' % e)
continue
# Report app up
try:
self.redis_conn.setex('luminosity.related_metrics', 120, now)
logger.info('related_metrics :: set luminosity.related_metrics Redis key')
except Exception as err:
logger.error(traceback.format_exc())
logger.error('error :: related_metrics :: could not update the Redis luminosity.related_metrics key - %s' % str(err))
now_timestamp = int(time())
# Spawn process
pids = []
spawned_pids = []
pid_count = 0
for i in range(1, 1 + 1):
try:
p = Process(target=self.find_related, args=(i,))
pids.append(p)
pid_count += 1
logger.info('related_metrics starting %s of 1 find_related processes' % (str(pid_count)))
p.start()
spawned_pids.append(p.pid)
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: related_metrics :: failed to spawn find_related_metrics process - %s' % e)
# Self monitor processes and terminate if any find_related
# has run for longer than run_every - 10
p_starts = time()
while time() - p_starts <= (120 - 10):
if any(p.is_alive() for p in pids):
# Just to avoid hogging the CPU
sleep(.1)
else:
# All the processes are done, break now.
time_to_run = time() - p_starts
logger.info('related_metrics :: find_related process completed in %.2f seconds' % (
time_to_run))
break
else:
# We only enter this if we didn't 'break' above.
logger.info('related_metrics :: timed out, killing find_related process')
for p in pids:
logger.info('related_metrics :: killing find_related process')
p.terminate()
logger.info('related_metrics :: killed find_related process')
for p in pids:
if p.is_alive():
try:
logger.info('related_metrics :: stopping find_related - %s' % (str(p.is_alive())))
p.terminate()
except Exception as e:
logger.error(traceback.format_exc())
logger.error('error :: related_metrics :: failed to stop find_related - %s' % e)
run_every = 60
process_runtime = time() - now
if process_runtime < run_every:
sleep_for = (run_every - process_runtime)
process_runtime_now = time() - now
sleep_for = (run_every - process_runtime_now)
logger.info('related_metrics :: sleeping for %.2f seconds due to low run time...' % sleep_for)
sleep(sleep_for)
try:
del sleep_for
except Exception as e:
logger.error('error :: related_metrics :: failed to del sleep_for - %s' % e)
try:
del process_runtime
except Exception as e:
logger.error('error :: related_metrics :: failed to del process_runtime - %s' % e)