Source code for luminosity.cloudbursts

import logging
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
from os import kill, getpid
import traceback
from sqlalchemy.sql import select
# @added 20220722 - Task #4624: Change all dict copy to deepcopy
import copy

import settings
from skyline_functions import (
    get_redis_conn, get_redis_conn_decoded)
from database import get_engine, cloudburst_table_meta
# from functions.database.queries.base_name_from_metric_id import base_name_from_metric_id
from functions.metrics.get_base_name_from_metric_id import get_base_name_from_metric_id

skyline_app = 'luminosity'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

this_host = str(os.uname()[1])

# Optional per-server suffix for the Graphite namespace, e.g. '.<name>'.
# It is '' when settings.SERVER_METRICS_NAME is not declared or is empty.
# Only AttributeError is caught (the setting is optional); a bare except
# here would also swallow KeyboardInterrupt/SystemExit during import.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
except AttributeError:
    SERVER_METRIC_PATH = ''
if SERVER_METRIC_PATH == '.':
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)

full_uniques = '%sunique_metrics' % settings.FULL_NAMESPACE
LOCAL_DEBUG = False

# Target interval in seconds between the starts of successive cloudbursts
# runs; Cloudbursts.run sleeps off whatever remains of it after each run.
run_every = 30
current_path = os.path.dirname(__file__)
root_path = os.path.dirname(current_path)


# @added 20210805 - Feature #4164: luminosity - cloudbursts
class Cloudbursts(Thread):
    """
    The Cloudbursts class which controls the luminosity/cloudbursts thread and
    spawned processes. luminosity/cloudbursts finds metrics related to each
    cloudburst that have been identified by significant changepoints using the
    m66 algorithm.
    """

    #### NOT USED ####

    def __init__(self, parent_pid):
        """
        Initialize Cloudbursts

        :param parent_pid: pid of the parent process, checked by
            :meth:`check_if_parent_is_alive`.
        """
        super(Cloudbursts, self).__init__()
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()

    def check_if_parent_is_alive(self):
        """
        Exit if either this process or the parent process can no longer be
        signalled.

        ``kill(pid, 0)`` sends no signal; it only performs the existence and
        permission check, raising OSError when the check fails.

        :raises SystemExit: with code 0 if either pid check fails.
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except OSError:
            # Raise SystemExit directly rather than calling the site exit()
            # helper, which also closes sys.stdin and is absent under -S.
            raise SystemExit(0)

    def _wait_for_log_management(self):
        """
        Wait up to 5 seconds while the log lock file exists so that the
        bin/<skyline_app>.d script can manage the log before writing starts.
        """
        wait_until = time() + 5
        while time() < wait_until and os.path.isfile(skyline_app_loglock):
            sleep(.1)

    def _redis_is_up(self):
        """
        Return True if Redis answers a ping.

        Otherwise log the error, wait 10 seconds, try to recreate both Redis
        connections and return False so the caller re-checks on its next
        iteration.
        """
        try:
            self.redis_conn.ping()
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: cloudbursts cannot connect to redis at socket path %s - %s' % (
                settings.REDIS_SOCKET_PATH, e))
        else:
            return True

        sleep(10)
        try:
            self.redis_conn = get_redis_conn(skyline_app)
            self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: cloudbursts cannot connect to get_redis_conn - %s' % e)
        return False

    def _run_find_related_process(self, max_runtime=3600):
        """
        Spawn one find_related process and wait for it to finish, terminating
        it if it is still running after ``max_runtime`` seconds.

        :param max_runtime: seconds to allow the process before terminating it.
        """
        processes = []
        try:
            # NOTE(review): find_related is not visible in this section of the
            # module - confirm it is defined on this class.
            p = Process(target=self.find_related, args=(1,))
            logger.info('cloudbursts :: starting find_related process')
            p.start()
            # Only track a process that actually started; terminate() on an
            # unstarted Process raises.
            processes.append(p)
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: cloudbursts :: failed to spawn find_related process - %s' % e)

        # Self monitor the process and terminate it if it has run for longer
        # than max_runtime seconds.
        started = time()
        while time() - started <= max_runtime:
            if not any(proc.is_alive() for proc in processes):
                # All the processes are done, break now.
                logger.info('cloudbursts :: find_related process completed in %.2f seconds' % (
                    time() - started))
                break
            # Just to avoid hogging the CPU
            sleep(.1)
        else:
            # Only reached when the loop did not 'break', i.e. timed out.
            logger.info('cloudbursts :: timed out, killing find_related process')
            for proc in processes:
                logger.info('cloudbursts :: killing find_related process')
                proc.terminate()
                logger.info('cloudbursts :: killed find_related process')

        for proc in processes:
            if proc.is_alive():
                try:
                    logger.info('cloudbursts :: stopping find_related - %s' % (str(proc.is_alive())))
                    proc.terminate()
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: cloudbursts :: failed to stop find_related - %s' % e)

    def run(self):
        """
        - Called when the process initializes.
        - Determine if Redis is up
        - Determine new cloudbursts to find related metrics on
        - Wait for the process to finish.
        - run_every 30 seconds
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        self._wait_for_log_management()

        logger.info('luminosity/cloudbursts :: starting')

        # Only the warning matters here; the module-level SERVER_METRIC_PATH
        # is the value actually used.
        try:
            getattr(settings, 'SERVER_METRICS_NAME')
        except AttributeError as e:
            logger.warning('warning :: luminosity/cloudbursts :: settings.SERVER_METRICS_NAME is not declared in settings.py, defaults to \'\' - %s' % e)

        while True:
            now = time()

            # Make sure Redis is up, re-checking after any reconnect attempt
            if not self._redis_is_up():
                continue

            # Report app up
            try:
                self.redis_conn.setex('luminosity.cloudbursts', 120, now)
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: cloudbursts :: could not update the Redis luminosity.cloudbursts key - %s' % e)

            self._run_find_related_process()

            # Sleep off the remainder of run_every.  Computed once and only
            # applied when positive: sleep() raises ValueError on a negative
            # duration.
            sleep_for = run_every - (time() - now)
            if sleep_for > 0:
                logger.info('cloudbursts :: sleeping for %.2f seconds due to low run time...' % sleep_for)
                sleep(sleep_for)