Source code for luminosity.related_metrics

import logging
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
import sys
from os import kill, getpid
import traceback
from ast import literal_eval

import settings
from skyline_functions import get_redis_conn, get_redis_conn_decoded
from functions.metrics.get_metric_latest_anomaly import get_metric_latest_anomaly
from functions.database.queries.get_metric_group_info import get_metric_group_info
from functions.luminosity.get_cross_correlation_relationships import get_cross_correlation_relationships
from functions.luminosity.update_metric_group import update_metric_group

skyline_app = 'luminosity'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

this_host = str(os.uname()[1])

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

try:
    LUMINOSITY_RELATED_METRICS_MAX_5MIN_LOADAVG = settings.LUMINOSITY_RELATED_METRICS_MAX_5MIN_LOADAVG
except:
    LUMINOSITY_RELATED_METRICS_MAX_5MIN_LOADAVG = 3

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)
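
# Illustrative note, not from the upstream source: with skyline_app set to
# 'luminosity' this resolves to 'skyline.luminosity', or to
# 'skyline.luminosity.<SERVER_METRICS_NAME>' when settings.SERVER_METRICS_NAME
# is set, because SERVER_METRIC_PATH carries a leading '.'.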


class RelatedMetrics(Thread):
    """
    The RelatedMetrics class controls the luminosity/related_metrics thread
    and spawned processes. luminosity/related_metrics analyses the results of
    luminosity cross_correlations and related_metrics to create and maintain
    metric groups.
    """

    def __init__(self, parent_pid):
        """
        Initialize RelatedMetrics
        """
        super(RelatedMetrics, self).__init__()
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()
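
    # The two connections above appear to follow the Skyline convention that
    # get_redis_conn() returns raw (bytes) responses while
    # get_redis_conn_decoded() returns str responses (decode_responses=True);
    # this is inferred from the helper names rather than stated in this module.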
    def check_if_parent_is_alive(self):
        """
        Check that both the current and the parent process are still alive
        and exit if either is not.
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except:
            sys.exit(0)
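
    # kill(<pid>, 0) sends no signal: signal 0 only performs an existence and
    # permission check and raises OSError (e.g. ProcessLookupError) if the
    # process is gone, which is what makes the bare except above work as a
    # liveness probe. A minimal standalone sketch of the same idiom, with a
    # hypothetical pid for illustration:
    #
    #     try:
    #         kill(pid, 0)
    #         alive = True
    #     except OSError:
    #         alive = False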
    def run(self):
        """
        - Called when the process initializes.
        - Determine if Redis is up
        - Spawn a find_related process to do analysis
        - Wait for the process to finish.
        - run_every 60 seconds
        """

        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('related_metrics :: starting')

        while 1:
            now = time()

            # Make sure Redis is up
            try:
                self.redis_conn.ping()
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: related_metrics cannot connect to redis at socket path %s - %s' % (
                    settings.REDIS_SOCKET_PATH, e))
                sleep(10)
                try:
                    self.redis_conn = get_redis_conn(skyline_app)
                    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
                except Exception as e:
                    logger.info(traceback.format_exc())
                    logger.error('error :: related_metrics cannot connect to get_redis_conn - %s' % e)
                continue

            # Report app up
            try:
                self.redis_conn.setex('luminosity.related_metrics', 120, now)
                logger.info('related_metrics :: set luminosity.related_metrics Redis key')
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: related_metrics :: could not update the Redis luminosity.related_metrics key - %s' % str(err))

            now_timestamp = int(time())

            # Spawn process
            pids = []
            spawned_pids = []
            pid_count = 0
            for i in range(1, 1 + 1):
                try:
                    p = Process(target=self.find_related, args=(i,))
                    pids.append(p)
                    pid_count += 1
                    logger.info('related_metrics starting %s of 1 find_related processes' % (str(pid_count)))
                    p.start()
                    spawned_pids.append(p.pid)
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: related_metrics :: failed to spawn find_related_metrics process - %s' % e)

            # Self monitor processes and terminate if any find_related
            # has run for longer than 110 seconds (120 - 10)
            p_starts = time()
            while time() - p_starts <= (120 - 10):
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                else:
                    # All the processes are done, break now.
                    time_to_run = time() - p_starts
                    logger.info('related_metrics :: find_related process completed in %.2f seconds' % (
                        time_to_run))
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('related_metrics :: timed out, killing find_related process')
                for p in pids:
                    logger.info('related_metrics :: killing find_related process')
                    p.terminate()
                    logger.info('related_metrics :: killed find_related process')

            for p in pids:
                if p.is_alive():
                    try:
                        logger.info('related_metrics :: stopping find_related - %s' % (str(p.is_alive())))
                        p.terminate()
                    except Exception as e:
                        logger.error(traceback.format_exc())
                        logger.error('error :: related_metrics :: failed to stop find_related - %s' % e)

            run_every = 60
            process_runtime = time() - now
            if process_runtime < run_every:
                sleep_for = (run_every - process_runtime)
                process_runtime_now = time() - now
                sleep_for = (run_every - process_runtime_now)
                logger.info('related_metrics :: sleeping for %.2f seconds due to low run time...' % sleep_for)
                sleep(sleep_for)
                try:
                    del sleep_for
                except Exception as e:
                    logger.error('error :: related_metrics :: failed to del sleep_for - %s' % e)
                try:
                    del process_runtime
                except Exception as e:
                    logger.error('error :: related_metrics :: failed to del process_runtime - %s' % e)
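

# A minimal usage sketch, not taken from the Skyline source: RelatedMetrics is
# normally started by the luminosity service (bin/luminosity.d), but as a
# daemon Thread it can be exercised directly along these lines, assuming the
# find_related method defined elsewhere in this module is available.
if __name__ == '__main__':
    related_metrics = RelatedMetrics(getpid())
    related_metrics.start()
    # The run loop never returns; keep the parent process alive because a
    # daemon thread is stopped when the main thread exits.
    while True:
        sleep(10)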