Source code for thunder.thunder_alert

import logging
import traceback
from time import time, sleep
from threading import Thread
from multiprocessing import Process

import os
from os import kill, getpid
import os.path
from ast import literal_eval

import settings
from skyline_functions import get_redis_conn_decoded

from analyzer.alerters import trigger_alert as analyzer_trigger_alert
from mirage.mirage_alerters import trigger_alert as mirage_trigger_alert

from functions.graphite.send_graphite_metric import send_graphite_metric

# Application identity and the log file paths derived from it.
skyline_app = 'thunder'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Node name as reported by uname.
this_host = str(os.uname()[1])

# Optional per-server suffix for the Graphite namespace.  Falls back to no
# suffix when the setting is empty or cannot be read.  Only ordinary errors
# are caught so that SystemExit and KeyboardInterrupt still propagate.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)

LOCAL_DEBUG = False

# @added 20240304 - Feature #5302: thunder.thunder_alert
#                   Task #5300: Decouple alerting from analysis
class ThunderAlert(Thread):
    """
    The ThunderAlert class which controls the thunder_alert thread and spawned
    processes.

    ThunderAlert is alerting for analyzer and mirage.  Alerts are read from
    the ``thunder.thunder_alert.alerts`` Redis hash, each one is handed to a
    :func:`spin_thunder_alert` process and its hash field is removed once it
    has been handled.
    """

    def __init__(self, parent_pid):
        """
        Initialize the ThunderAlert

        Create the :obj:`self.redis_conn_decoded` connection

        :param parent_pid: pid of the process that started this thread
        """
        super(ThunderAlert, self).__init__()
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()

    def check_if_parent_is_alive(self):
        """
        Exit with status 0 if this process or the parent process cannot be
        signalled.

        Signal 0 is sent to both pids, which delivers nothing but fails when
        the target cannot be signalled.

        :raises SystemExit: when either ``kill`` call fails
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except Exception:
            logger.warning('warning :: parent or current process dead')
            # The exit() helper is injected by the site module and is not
            # guaranteed to exist, raise SystemExit directly instead.
            raise SystemExit(0)

    def remove_thunder_alert_key(self, redis_thunder_alert_key):
        """
        Remove a field from the ``thunder.thunder_alert.alerts`` Redis hash.

        :param redis_thunder_alert_key: the hash field to delete
        :return: whatever ``hdel`` returned, or 0 if the call raised
        """
        deleted_key = 0
        try:
            deleted_key = self.redis_conn_decoded.hdel('thunder.thunder_alert.alerts', redis_thunder_alert_key)
        except Exception as err:
            logger.error('error :: thunder_alert :: remove_thunder_alert_key failed to remove %s, err: %s' % (
                redis_thunder_alert_key, err))
        if deleted_key:
            logger.info('thunder_alert :: remove_thunder_alert_key removed %s' % (
                redis_thunder_alert_key))
        return deleted_key

    def _fail_thunder_alert(self, redis_thunder_alert_key):
        """
        Drop an alert that could not be processed and log which one failed.

        :param redis_thunder_alert_key: the hash field of the failed alert
        :return: always False, so callers can ``return`` the result directly
        """
        self.remove_thunder_alert_key(redis_thunder_alert_key)
        logger.error('error :: spin_thunder_alert :: failed on %s' % (
            redis_thunder_alert_key))
        return False

    def spin_thunder_alert(self, i, redis_thunder_alert_key):
        """
        Send an alert.

        The alert data is read from the ``thunder.thunder_alert.alerts`` hash
        field, parsed with ``literal_eval`` and passed to the analyzer or
        mirage ``trigger_alert`` function according to its ``app`` value.  The
        hash field is removed whether the alert was sent or not, so a failed
        alert is not retried.

        :param i: process number, only used in the log message
        :param redis_thunder_alert_key: the hash field holding the alert data
        :return: False if the alert could not be read, parsed or sent,
            otherwise None
        """
        spin_start = time()
        logger.info('spin_thunder_alert :: process %s - alerting for %s' % (str(i), redis_thunder_alert_key))

        alert_data_str = None
        try:
            alert_data_str = self.redis_conn_decoded.hget('thunder.thunder_alert.alerts', redis_thunder_alert_key)
        except Exception as err:
            logger.error('error :: spin_thunder_alert :: hget failed on %s, err: %s' % (
                redis_thunder_alert_key, err))
            self.remove_thunder_alert_key(redis_thunder_alert_key)
            return False

        # literal_eval only accepts Python literals, so a missing field (None)
        # or a malformed payload ends up in the except branch.
        alert_dict = {}
        try:
            alert_dict = literal_eval(alert_data_str)
        except Exception as err:
            logger.error('error :: spin_thunder_alert :: literal_eval failed on data from %s, alert_data_str: %s, err: %s' % (
                redis_thunder_alert_key, str(alert_data_str), err))
            self.remove_thunder_alert_key(redis_thunder_alert_key)
            return False
        logger.info('spin_thunder_alert :: alert_dict: %s' % str(alert_dict))

        # Values every alert must carry.
        required = {}
        for key in ('app', 'alert', 'metric', 'context'):
            try:
                required[key] = alert_dict[key]
            except Exception as err:
                logger.error('error :: spin_thunder_alert :: failed to determine %s, err: %s' % (
                    key, err))
                return self._fail_thunder_alert(redis_thunder_alert_key)
        alert_app = required['app']
        alert = required['alert']
        metric = required['metric']
        context = required['context']

        # Values that are only passed to the mirage alerter.
        mirage_args = {}
        if alert_app == 'mirage':
            for key in ('second_order_resolution_seconds', 'triggered_algorithms'):
                try:
                    mirage_args[key] = alert_dict[key]
                except Exception as err:
                    logger.error('error :: spin_thunder_alert :: failed to determine %s, err: %s' % (
                        key, err))
                    return self._fail_thunder_alert(redis_thunder_alert_key)

        mock_alert = alert_dict.get('mock_alert', False)
        if mock_alert:
            # NOTE(review): despite the message the alert is still dispatched
            # below, because the override of alert_app was disabled in the
            # original code.  Confirm whether a mock alert should be skipped.
            logger.info('spin_thunder_alert :: mock_alert, doing nothing')

        if alert_app == 'analyzer':
            try:
                analyzer_trigger_alert(alert, metric, context)
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: spin_thunder_alert :: analyzer_trigger_alert failed, err: %s' % (
                    err))
                return self._fail_thunder_alert(redis_thunder_alert_key)

        if alert_app == 'mirage':
            try:
                mirage_trigger_alert(
                    alert, metric,
                    mirage_args['second_order_resolution_seconds'],
                    context, mirage_args['triggered_algorithms'])
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: spin_thunder_alert :: mirage_trigger_alert failed, err: %s' % (
                    err))
                return self._fail_thunder_alert(redis_thunder_alert_key)

        self.remove_thunder_alert_key(redis_thunder_alert_key)

        spin_end = time() - spin_start
        logger.info('spin_thunder_alert took %.2f seconds' % spin_end)
        return

    def run(self):
        """
        - Called when the thread is started.
        - Wait up to 5 seconds for bin/<skyline_app>.d to finish log
          management.
        - Make sure Redis is up, reconnecting if the ping fails.
        - Report the app and Redis as up via the ``thunder.thunder_alert`` and
          ``redis`` keys.
        - Read the fields of the ``thunder.thunder_alert.alerts`` hash and
          spawn up to 2 :func:`spin_thunder_alert` processes at a time,
          terminating any batch that is still running after 30 seconds.
        - Send the alerts.sent count to Graphite every 60 seconds.
        - Repeat.
        """

        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('starting %s thunder_alert' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os.remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        # Only logged here, the namespace itself was built at import time.
        try:
            server_metric_path = '.%s' % settings.SERVER_METRICS_NAME
            if server_metric_path == '.':
                server_metric_path = ''
            logger.info('SERVER_METRIC_PATH is set from settings.py to %s' % str(server_metric_path))
        except Exception:
            logger.info('warning :: SERVER_METRIC_PATH is not declared in settings.py, defaults to \'\'')
        logger.info('skyline_app_graphite_namespace is set to %s' % str(skyline_app_graphite_namespace))

        last_sent_to_graphite = int(time())
        thunder_alerts_sent = 0
        # Maximum number of spin_thunder_alert processes per batch
        thunder_processes = 2

        while True:
            # Make sure Redis is up
            try:
                self.redis_conn_decoded.ping()
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: thunder_alert :: not connected via get_redis_conn_decoded - %s' % err)
                sleep(10)
                try:
                    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
                    logger.info('thunder_alert :: connected via get_redis_conn_decoded')
                except Exception as reconnect_err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: thunder_alert :: cannot connect via get_redis_conn_decoded - %s' % reconnect_err)

            # Determine if any alerts have been added to process
            thunder_last_run = int(time())
            while True:
                current_timestamp = int(time())

                if current_timestamp >= (last_sent_to_graphite + 60):
                    logger.info('thunder_alert :: sending Graphite metrics')
                    logger.info('thunder_alert :: alerts.sent :: %s' % str(thunder_alerts_sent))
                    send_metric_name = '%s.thunder_alert.alerts.sent' % skyline_app_graphite_namespace
                    send_graphite_metric(self, skyline_app, send_metric_name, str(thunder_alerts_sent))
                    last_sent_to_graphite = int(time())
                    thunder_alerts_sent = 0

                # Do not poll Redis more often than every 3 seconds
                if (current_timestamp - thunder_last_run) < 3:
                    sleep(3)
                thunder_last_run = int(time())

                # Report app AND Redis as up
                redis_is_up = False
                try:
                    redis_is_up = self.redis_conn_decoded.setex('thunder.thunder_alert', 120, current_timestamp)
                    if redis_is_up:
                        try:
                            logger.info('thunder_alert :: set thunder Redis key')
                            self.redis_conn_decoded.setex('redis', 120, current_timestamp)
                        except Exception as err:
                            logger.error(traceback.format_exc())
                            logger.error('error :: thunder_alert :: could not update the Redis redis key - %s' % (
                                err))
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: thunder_alert :: could not update the Redis %s key - %s' % (
                        skyline_app, err))

                if not settings.THUNDER_ENABLED:
                    logger.info('thunder_alert :: sleeping 59 seconds settings.THUNDER_ENABLED not set')
                    sleep(59)
                    logger.info('thunder_alert :: breaking after 59 seconds settings.THUNDER_ENABLED not set')
                    break

                # Determine alerts to process from the Redis hash
                thunder_alerts = []
                thunder_alerts_exists = False
                try:
                    thunder_alerts_exists = self.redis_conn_decoded.exists('thunder.thunder_alert.alerts')
                except Exception as err:
                    logger.error('error :: thunder_alert :: exists thunder.thunder_alert.alerts failed, err: %s' % err)
                if thunder_alerts_exists:
                    try:
                        thunder_alerts = self.redis_conn_decoded.hkeys('thunder.thunder_alert.alerts')
                    except Exception as err:
                        logger.error('error :: thunder_alert :: hkeys thunder.thunder_alert.alerts failed, err: %s' % err)

                # Nothing queued, go back to the Redis check in the main loop
                if thunder_alerts:
                    logger.info('%s entries in thunder.thunder_alert.alerts' % str(len(thunder_alerts)))
                else:
                    logger.info('no entries in thunder.thunder_alert.alerts')
                    break

                redis_thunder_alert_keys = sorted(thunder_alerts)
                logger.info('%s alerts to send' % str(len(redis_thunder_alert_keys)))

                while redis_thunder_alert_keys:
                    # Stop batching when the Graphite metrics are due
                    if int(time()) >= (last_sent_to_graphite + 60):
                        break

                    # Spawn processes
                    pids = []
                    spawned_pids = []
                    for i in range(1, thunder_processes + 1):
                        if not redis_thunder_alert_keys:
                            break
                        # Always consume the key so that an empty field name
                        # cannot stall the batch loop
                        redis_thunder_alert_key = redis_thunder_alert_keys.pop(0)
                        if not redis_thunder_alert_key:
                            continue
                        p = Process(target=self.spin_thunder_alert, args=(i, redis_thunder_alert_key))
                        pids.append(p)
                        logger.info('starting spin_thunder_process')
                        p.start()
                        spawned_pids.append(p.pid)
                        thunder_alerts_sent += 1

                    # Self monitor processes and terminate if any spin_thunder_process
                    # that has run for longer than 30 seconds
                    p_starts = time()
                    while time() - p_starts <= 30:
                        if any(p.is_alive() for p in pids):
                            # Just to avoid hogging the CPU
                            sleep(.1)
                        else:
                            # All the processes are done, break now.
                            time_to_run = time() - p_starts
                            logger.info('%s spin_thunder_process process completed in %.2f seconds' % (
                                str(len(spawned_pids)), time_to_run))
                            break
                    else:
                        # We only enter this if we didn't 'break' above.
                        logger.info('timed out, killing all spin_thunder_process processes')
                        for p in pids:
                            p.terminate()

                    # Terminate anything that is still running
                    for p in pids:
                        if p.is_alive():
                            logger.info('stopping spin_thunder_process - %s' % (str(p.is_alive())))
                            killing_pid = p.pid
                            logger.info('kill spin_thunder_process with pid: %s' % (str(killing_pid)))
                            p.terminate()
                            logger.info('killed spin_thunder_process process with pid: %s' % (str(killing_pid)))

                if int(time()) >= (last_sent_to_graphite + 60):
                    logger.info('sending Graphite metrics')
                    logger.info('alerts.sent :: %s' % str(thunder_alerts_sent))
                    send_metric_name = '%s.thunder_alert.alerts.sent' % skyline_app_graphite_namespace
                    send_graphite_metric(self, skyline_app, send_metric_name, str(thunder_alerts_sent))
                    last_sent_to_graphite = int(time())
                    thunder_alerts_sent = 0