import logging
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
from os import kill, getpid
import traceback
from sqlalchemy.sql import select
# @added 20220722 - Task #4624: Change all dict copy to deepcopy
import copy
import settings
from skyline_functions import (
get_redis_conn, get_redis_conn_decoded)
from database import get_engine, cloudburst_table_meta
# from functions.database.queries.base_name_from_metric_id import base_name_from_metric_id
from functions.metrics.get_base_name_from_metric_id import get_base_name_from_metric_id
# Name of the Skyline app this module belongs to, used to derive the logger
# name, the log file paths and the Graphite namespace below.
skyline_app = 'luminosity'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# Lock and wait marker files that sit alongside the app log file.
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# The node name reported by uname
this_host = str(os.uname()[1])

# Optional '.<SERVER_METRICS_NAME>' suffix for the Graphite namespace.  An
# empty or undeclared settings.SERVER_METRICS_NAME results in no suffix.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    # Exception rather than a bare except so that KeyboardInterrupt and
    # SystemExit are not swallowed during import.
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)

full_uniques = '%sunique_metrics' % settings.FULL_NAMESPACE

LOCAL_DEBUG = False

# The minimum number of seconds between the start of successive cycles
run_every = 30

# The directory containing this file and its parent directory
current_path = os.path.dirname(__file__)
root_path = os.path.dirname(current_path)
# @added 20210805 - Feature #4164: luminosity - cloudbursts
class Cloudbursts(Thread):
    """
    The Cloudbursts class which controls the luminosity/cloudbursts thread and
    spawned processes.  luminosity/cloudbursts finds metrics related to each
    cloudburst that have been identified by significant changepoints using the
    m66 algorithm.
    """

    #### NOT USED ####

    def __init__(self, parent_pid):
        """
        Initialize Cloudbursts

        :param parent_pid: the pid of the parent process, it is probed by
            check_if_parent_is_alive
        """
        super(Cloudbursts, self).__init__()
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()

    def check_if_parent_is_alive(self):
        """
        Exit with status 0 if either the current process or the parent
        process cannot be signalled.

        :raises SystemExit: with code 0 when either kill call fails
        """
        try:
            # Signal 0 delivers nothing, the call only fails if the pid
            # cannot be signalled
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except Exception:
            # SystemExit is raised directly rather than via the exit()
            # helper, which is only present when the site module is loaded
            raise SystemExit(0)

    def run(self):
        """
        - Called when the thread is started.
        - Wait for up to 5 seconds for the log lock file to be removed
        - Then loop forever, on each cycle:

          - Determine if Redis is up, reconnect and retry if it is not
          - Set the luminosity.cloudbursts Redis key to report the app is up
          - Spawn a find_related process
          - Wait for the process to finish, terminate it if it is still
            running after 3600 seconds
          - Sleep so that each cycle takes at least run_every seconds

        NOTE(review): self.find_related is not defined in the part of the
        class visible here - confirm it is provided before this thread is
        started, otherwise each cycle only logs a failed to spawn error.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        log_wait_for = time() + 5
        while time() < log_wait_for and os.path.isfile(skyline_app_loglock):
            sleep(.1)

        logger.info('luminosity/cloudbursts :: starting')

        # The value is only formatted so that a missing setting is reported
        # in the log, nothing in this method uses the result.
        try:
            _ = '.%s' % settings.SERVER_METRICS_NAME
        except Exception as e:
            logger.warning(
                'warning :: luminosity/cloudbursts :: settings.SERVER_METRICS_NAME is not declared in settings.py, defaults to \'\' - %s', e)

        # The maximum number of seconds a find_related process is allowed to
        # run for before it is terminated
        max_runtime = 3600

        while True:
            now = time()

            # Make sure Redis is up
            try:
                self.redis_conn.ping()
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error(
                    'error :: cloudbursts cannot connect to redis at socket path %s - %s',
                    settings.REDIS_SOCKET_PATH, e)
                sleep(10)
                try:
                    self.redis_conn = get_redis_conn(skyline_app)
                    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: cloudbursts cannot connect to get_redis_conn - %s', err)
                # Start the cycle again so that the new connection is pinged
                continue

            # Report app up
            try:
                self.redis_conn.setex('luminosity.cloudbursts', 120, now)
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: cloudbursts :: could not update the Redis luminosity.cloudbursts key - %s', e)

            # Spawn process
            pids = []
            try:
                p = Process(target=self.find_related, args=(1,))
                pids.append(p)
                logger.info('cloudbursts :: starting find_related process')
                p.start()
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: cloudbursts :: failed to spawn find_related process - %s', e)

            # Self monitor processes and terminate if any find_related
            # has run for longer than max_runtime
            p_starts = time()
            while time() - p_starts <= max_runtime:
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                else:
                    # All the processes are done, break now.
                    logger.info(
                        'cloudbursts :: find_related process completed in %.2f seconds',
                        time() - p_starts)
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('cloudbursts :: timed out, killing find_related process')
                for p in pids:
                    logger.info('cloudbursts :: killing find_related process')
                    p.terminate()
                    logger.info('cloudbursts :: killed find_related process')

            # Stop anything that is still running
            for p in pids:
                if p.is_alive():
                    try:
                        logger.info('cloudbursts :: stopping find_related - %s', p.is_alive())
                        p.terminate()
                    except Exception as e:
                        logger.error(traceback.format_exc())
                        logger.error('error :: cloudbursts :: failed to stop find_related - %s', e)

            # Pad the cycle out to run_every seconds.  sleep_for is always
            # positive here because of the guard, sleep() raises on a
            # negative value.
            process_runtime = time() - now
            if process_runtime < run_every:
                sleep_for = run_every - process_runtime
                logger.info('cloudbursts :: sleeping for %.2f seconds due to low run time...', sleep_for)
                sleep(sleep_for)