Source code for thunder.thunder

import logging
import traceback
from time import time, sleep
from threading import Thread
from multiprocessing import Process

import os
from os import kill, getpid
import os.path
from ast import literal_eval

import settings
from skyline_functions import (
    mkdir_p, get_redis_conn, get_redis_conn_decoded, send_graphite_metric,
    write_data_to_file)
from thunder_alerters import thunder_alert
from functions.redis.update_set import update_redis_set
from functions.filesystem.remove_file import remove_file
from functions.thunder.check_thunder_failover_key import check_thunder_failover_key
from functions.thunder.alert_on_stale_metrics import alert_on_stale_metrics
from functions.thunder.alert_on_no_data import alert_on_no_data
# @added 20220202 - Feature #4412: flux - quota - thunder alert
from functions.thunder.alert_on_quota_exceeded import alert_on_quota_exceeded

skyline_app = 'thunder'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# Lock and wait files used to coordinate log management with bin/thunder.d
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# The host name (the nodename field of os.uname())
this_host = str(os.uname()[1])
# Redis set names
thunder_redis_set = 'thunder.events'
thunder_done_redis_set = 'thunder.events.done'

# Graphite metric path suffix for this server: '.<SERVER_METRICS_NAME>', or
# '' when settings.SERVER_METRICS_NAME is empty or cannot be read.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except Exception:
    # Typically an AttributeError when SERVER_METRICS_NAME is not declared in
    # settings.py.  Not a bare except, so KeyboardInterrupt/SystemExit raised
    # during import are no longer swallowed.
    SERVER_METRIC_PATH = ''

# The required THUNDER directories which are failed over to and
# used in the event that Redis is down
THUNDER_EVENTS_DIR = '%s/thunder/events' % settings.SKYLINE_TMP_DIR
THUNDER_KEYS_DIR = '%s/thunder/keys' % settings.SKYLINE_TMP_DIR

# Graphite namespace for thunder's own metrics, e.g. skyline.thunder.<server>
skyline_app_graphite_namespace = 'skyline.%s%s' % (skyline_app, SERVER_METRIC_PATH)

LOCAL_DEBUG = False


class Thunder(Thread):
    """
    The Thunder class which controls the thunder thread and spawned
    processes.  Thunder is ONLY for alerting on Skyline operations.  Thunder
    checks on the other hand are carried out by analyzer/metrics_manager and
    other Skyline apps, which send events to thunder.  thunder/rolling
    carries out internal and external checks and sends any events to thunder.
    """

    def __init__(self, parent_pid):
        """
        Set up the Thunder thread.

        The thread is flagged as a daemon thread.  The parent pid and this
        process's pid are stored so that :meth:`check_if_parent_is_alive` can
        signal them.  Two Redis connections are opened: :obj:`self.redis_conn`
        (via ``get_redis_conn``) and :obj:`self.redis_conn_decoded` (via
        ``get_redis_conn_decoded``).

        :param parent_pid: pid of the parent process
        """
        super().__init__()
        my_pid = getpid()
        self.current_pid = my_pid
        self.parent_pid = parent_pid
        self.daemon = True
        # The plain connection is opened first, then the decoded one.
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
[docs] def check_if_parent_is_alive(self): """ Self explanatory """ try: kill(self.current_pid, 0) kill(self.parent_pid, 0) except: logger.warning('warning :: parent or current process dead') exit(0)
[docs] def spin_thunder_process(self, i, validated_event_details, redis_item, event_file): """ Roll some thunder. :param i: python process id :return: anomalous :rtype: boolean """ def create_alert_cache_key(cache_key, expiry, timestamp): try: set_alert_cache_key = self.redis_conn.setex(cache_key, expiry, timestamp) if set_alert_cache_key: logger.info('set Redis key %s with %s TTL' % ( cache_key, str(expiry))) except Exception as e: logger.error('error :: set_alert_cache_key failed setting key - %s - %s' % ( cache_key, e)) # Add a key file thunder_keys_file = '%s/%s' % (THUNDER_KEYS_DIR, cache_key) thunder_keys_file_data = {'timestamp': timestamp, 'expiry': expiry} try: write_data_to_file( skyline_app, thunder_keys_file, 'w', str(thunder_keys_file_data)) logger.info('added Redis failover thunder_keys_file %s' % (thunder_keys_file)) except Exception as e: logger.error('error :: failed to add Redis failover thunder_keys_file - %s - %s' % (thunder_keys_file, e)) def remove_event(redis_item, event_file): if redis_item: # Delete the item from the Redis set try: # removed_item = update_redis_set( # @added 20220303 - Feature #4412: flux - quota - thunder alert do_log = True try: redis_item_dict = literal_eval(redis_item) if isinstance(redis_item_dict, dict): if redis_item_dict['event_type'] == 'metric_quota_exceeded': do_log = False logger.info('not logging total event string as metric_quota_exceeded, sampling with data removed') try: del redis_item_dict['data'] except Exception as err: logger.error('error :: failed to removed data key from dict - %s' % err) logger.info('sample: %s' % str(redis_item_dict)) except Exception as err: logger.error('error :: failed literal_eval the redis_item - %s' % err) update_redis_set( skyline_app, thunder_redis_set, redis_item, # @modified 20220303 - Feature #4412: flux - quota - thunder alert # 'remove', log=True) 'remove', do_log) # if removed_item: # logger.error('error :: could not determine event_details from %s Redis set entry 
(removed) - %s' % ( # thunder_redis_set, str(redis_item))) except Exception as e: logger.error('error :: could not remove item from Redis set %s - %s' % ( thunder_redis_set, e)) if event_file: # Delete the bad event_file removed_file = False try: removed_file = remove_file(skyline_app, event_file) except Exception as e: logger.error('error :: could not remove event_file %s - %s' % ( event_file, e)) if removed_file: logger.info('event_file removed - %s' % ( str(event_file))) spin_start = time() spin_thunder_process_pid = os.getpid() # @added 20220303 - Feature #4412: flux - quota - thunder alert # Do not log out metric_quota_exceeded as has too much data log_validated_event_details = True event_type = None try: event_type = str(validated_event_details['event_type']) if event_type == 'metric_quota_exceeded': log_validated_event_details = False except Exception as err: logger.error('error :: failed literal_eval the redis_item - %s' % err) if log_validated_event_details: logger.info('spin_thunder_process - %s, processing check - %s' % ( str(spin_thunder_process_pid), str(validated_event_details))) else: logger.info('spin_thunder_process - %s, processing check for %s (not logging validated_event_details)' % ( str(spin_thunder_process_pid), str(event_type))) try: level = str(validated_event_details['level']) event_type = str(validated_event_details['event_type']) message = str(validated_event_details['message']) app = str(validated_event_details['app']) metric = str(validated_event_details['metric']) source = str(validated_event_details['source']) expiry = int(validated_event_details['expiry']) timestamp = validated_event_details['timestamp'] alert_vias = validated_event_details['alert_vias'] data = validated_event_details['data'] event_file = validated_event_details['event_file'] except Exception as e: logger.error('error :: spin_thunder_process :: failed to determine variables from event_details - %s' % ( e)) # return # Handle thunder/rolling alerts first, these are 
defined by source being # thunder, thunder/rolling does not assign alert_vias per alert the # defaults are used if source == 'thunder' and alert_vias == ['default']: logger.info('spin_thunder_process - thunder rolling event') alert_vias = [] alert_via_smtp = True alert_via_slack = False alert_via_pagerduty = False try: alert_via_smtp = settings.THUNDER_CHECKS[app][event_type]['alert_via_smtp'] if alert_via_smtp: logger.info('spin_thunder_process - alert_via_smtp: %s' % str(alert_via_smtp)) except KeyError: alert_via_smtp = True except Exception as e: logger.error('error :: failed to determine alert_via_smtp for %s.%s check - %s' % ( app, event_type, e)) if alert_via_smtp: alert_vias.append('alert_via_smtp') logger.info('spin_thunder_process - alert_via_smtp appended to alert_vias') try: alert_via_slack = settings.THUNDER_CHECKS[app][event_type]['alert_via_slack'] logger.info('spin_thunder_process - alert_via_slack: %s' % str(alert_via_slack)) except KeyError: logger.error(traceback.format_exc()) logger.error('spin_thunder_process - alert_via_slack KeyError') alert_via_slack = False except Exception as e: logger.error('error :: failed to determine alert_via_slack for %s.%s check - %s' % ( app, event_type, e)) if alert_via_slack: alert_vias.append('alert_via_slack') logger.info('spin_thunder_process - alert_via_slack appended to alert_vias') try: alert_via_pagerduty = settings.THUNDER_CHECKS[app][event_type]['alert_via_pagerduty'] if alert_via_pagerduty: logger.info('spin_thunder_process - alert_via_pagerduty: %s' % str(alert_via_pagerduty)) except KeyError: alert_via_pagerduty = False except Exception as e: logger.error('error :: failed to determine alert_via_smtp for %s.%s check - %s' % ( app, event_type, e)) if alert_via_pagerduty: alert_vias.append('alert_via_pagerduty') logger.info('spin_thunder_process - alert_via_pagerduty appended to alert_vias') subject = message body = str(data) alerts_sent = 0 logger.info('spin_thunder_process - thunder rolling event 
alert_vias: %s' % str(alert_vias)) for alert_via in alert_vias: alert_sent = False try: if alert_via == 'alert_via_slack': title = 'Skyline Thunder - %s' % level.upper() with_subject = subject.replace(level, '') title = title + with_subject alert_sent = thunder_alert(alert_via, title, body) if alert_via == 'alert_via_smtp': title = 'Skyline Thunder - %s' % level.upper() with_subject = subject.replace(level, '') final_subject = title + with_subject alert_sent = thunder_alert(alert_via, final_subject, data['status']) if alert_via == 'alert_via_pagerduty': alert_sent = thunder_alert(alert_via, subject, str(body)) if alert_sent: logger.info('sent thunder_alert(%s, %s' % ( str(alert_via), str(subject))) alerts_sent += 1 except Exception as e: logger.error('error :: failed to alert_via %s for %s.%s check - %s' % ( alert_via, app, event_type, e)) cache_key = 'thunder.alert.%s.%s' % (app, event_type) if alerts_sent: if level == 'alert': create_alert_cache_key(cache_key, expiry, timestamp) remove_event(redis_item, event_file) logger.info('%s alerts sent for the %s alert_vias' % ( str(alerts_sent), str(len(alert_vias)))) # stale metric alerts if source == 'analyzer' and event_type == 'stale_metrics': alerts_sent_dict = {} try: parent_namespace = data['namespace'] stale_metrics = data['stale_metrics'] alerts_sent_dict = alert_on_stale_metrics(self, level, message, parent_namespace, stale_metrics, data) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: alert_on_stale_metrics failed for %s - %s' % ( parent_namespace, e)) all_sent = False if alerts_sent_dict: all_sent = alerts_sent_dict['all_sent'] logger.info('%s alerts of %s sent for stale_metrics on %s' % ( str(alerts_sent_dict['to_send']), str(alerts_sent_dict['sent']), parent_namespace)) if not all_sent: logger.warning('warning :: all alerts were not sent - %s' % ( str(alerts_sent_dict))) if all_sent: if level == 'alert': cache_key = 'thunder.alert.%s.%s.%s.%s' % ( app, event_type, level, 
str(timestamp)) create_alert_cache_key(cache_key, expiry, timestamp) remove_event(redis_item, event_file) # no_data alerts if source == 'analyzer' and event_type == 'no_data': alerts_sent_dict = {} parent_namespace = None try: parent_namespace = data['namespace'] except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: could not determine parent_namespace from data %s - %s' % ( str(data), e)) remove_event(redis_item, event_file) send_no_data_alert = True if parent_namespace: if level == 'alert': thunder_no_data_alert_key = 'thunder.alert.no_data.%s' % parent_namespace thunder_no_data_alert_key_exists = False try: thunder_no_data_alert_key_exists = self.redis_conn_decoded.get(thunder_no_data_alert_key) if thunder_no_data_alert_key_exists: send_no_data_alert = False logger.info('Redis key %s exists, not send no_data alert for %s' % ( thunder_no_data_alert_key, parent_namespace)) remove_event(redis_item, event_file) except Exception as e: logger.error('error :: failed Redis key %s - %s' % ( thunder_no_data_alert_key, e)) if parent_namespace and send_no_data_alert: try: alerts_sent_dict = alert_on_no_data(self, level, message, parent_namespace, data) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: could not remove event_file %s - %s' % ( event_file, e)) all_sent = False if alerts_sent_dict: all_sent = alerts_sent_dict['all_sent'] logger.info('%s alerts of %s sent for no_data on %s' % ( str(alerts_sent_dict['to_send']), str(alerts_sent_dict['sent']), parent_namespace)) if not all_sent: logger.warning('warning :: all alerts were not sent - %s' % ( str(alerts_sent_dict))) if all_sent: remove_event(redis_item, event_file) # @added 20220202 - Feature #4412: flux - quota - thunder alert # metrics over quota alerts if source == 'flux' and event_type == 'metric_quota_exceeded': alerts_sent_dict = {} try: parent_namespace = data['namespace'] rejected_metrics = data['rejected_metrics'] alerts_sent_dict = 
alert_on_quota_exceeded(self, level, message, parent_namespace, expiry, rejected_metrics, data) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: alert_on_quota_exceeded failed for %s - %s' % ( parent_namespace, err)) all_sent = False if alerts_sent_dict: all_sent = alerts_sent_dict['all_sent'] logger.info('%s alerts of %s sent for metric_quota_exceeded on %s' % ( str(alerts_sent_dict['to_send']), str(alerts_sent_dict['sent']), parent_namespace)) if not all_sent: logger.warning('warning :: all alerts were not sent - %s' % ( str(alerts_sent_dict))) if all_sent: cache_key = 'thunder.alert.%s.%s.%s.%s.%s' % ( app, event_type, level, parent_namespace, str(timestamp)) create_alert_cache_key(cache_key, expiry, str(validated_event_details)) remove_event(redis_item, event_file) spin_end = time() - spin_start logger.info('spin_thunder_process took %.2f seconds' % spin_end) return
[docs] def run(self): """ - Called when the process intializes. - Determine if Redis is up and discover checks to run. - If Redis is down and discover checks to run from the filesystem. - Process event. - Wait for the processes to finish. - Repeat. """ # Log management to prevent overwriting # Allow the bin/<skyline_app>.d to manage the log if os.path.isfile(skyline_app_logwait): try: os.remove(skyline_app_logwait) except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_logwait) now = time() log_wait_for = now + 5 while now < log_wait_for: if os.path.isfile(skyline_app_loglock): sleep(.1) now = time() else: now = log_wait_for + 1 logger.info('starting %s run' % skyline_app) if os.path.isfile(skyline_app_loglock): logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app) try: os.remove(skyline_app_loglock) logger.info('log lock file removed') except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_loglock) else: logger.info('bin/%s.d log management done' % skyline_app) try: SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME if SERVER_METRIC_PATH == '.': SERVER_METRIC_PATH = '' logger.info('SERVER_METRIC_PATH is set from settings.py to %s' % str(SERVER_METRIC_PATH)) except: SERVER_METRIC_PATH = '' logger.info('warning :: SERVER_METRIC_PATH is not declared in settings.py, defaults to \'\'') logger.info('skyline_app_graphite_namespace is set to %s' % str(skyline_app_graphite_namespace)) if not os.path.exists(settings.SKYLINE_TMP_DIR): try: mkdir_p(settings.SKYLINE_TMP_DIR) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: failed to create %s - %s' % (settings.SKYLINE_TMP_DIR, e)) # Create the required THUNDER directories which are failed over to and # used in the event that Redis is down if not os.path.exists(THUNDER_EVENTS_DIR): try: mkdir_p(THUNDER_EVENTS_DIR) except Exception as e: logger.error(traceback.format_exc()) 
logger.error('error :: failed to create %s - %s' % (THUNDER_EVENTS_DIR, e)) if not os.path.exists(THUNDER_KEYS_DIR): try: mkdir_p(THUNDER_KEYS_DIR) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: failed to create %s - %s' % (THUNDER_KEYS_DIR, e)) last_sent_to_graphite = int(time()) thunder_alerts_sent = 0 last_check_for_events_on_filesystem = int(last_sent_to_graphite) while True: now = time() # Make sure Redis is up try: self.redis_conn.ping() logger.info('Redis ping OK') except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: cannot connect to redis at socket path %s - %s' % ( settings.REDIS_SOCKET_PATH, e)) sleep(10) try: self.redis_conn = get_redis_conn(skyline_app) logger.info('connected via get_redis_conn') except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: not connected via get_redis_conn - %s' % e) # continue try: self.redis_conn_decoded.ping() except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: not connected via get_redis_conn_decoded - %s' % e) sleep(10) try: self.redis_conn_decoded = get_redis_conn_decoded(skyline_app) logger.info('connected via get_redis_conn_decoded') except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: cannot connect via get_redis_conn_decoded - %s' % e) # continue # Determine if any metric has been added to process thunder_last_run = int(time()) total_thunder_events_item_count = 0 while True: validated_event_details = {} current_timestamp = int(time()) if total_thunder_events_item_count == 0: if (current_timestamp - thunder_last_run) < 3: sleep(2) thunder_last_run = int(current_timestamp) # Report app AND Redis as up redis_is_up = False try: redis_is_up = self.redis_conn.setex(skyline_app, 120, current_timestamp) if redis_is_up: try: logger.info('set thunder Redis key') self.redis_conn.setex('redis', 120, current_timestamp) except Exception as e: 
logger.error(traceback.format_exc()) logger.error('error :: could not update the Redis redis key - %s' % ( e)) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: could not update the Redis %s key - %s' % ( skyline_app, e)) if not settings.THUNDER_ENABLED: logger.info('sleeping 59 seconds settings.THUNDER_ENABLED not set') sleep(59) logger.info('breaking after 59 seconds settings.THUNDER_ENABLED not set') break # break to send metrics if int(time()) >= (last_sent_to_graphite + 60): logger.info('breaking as last_sent_to_graphite was > 59 seconds ago') break # Determine events to process from the Redis set thunder_events = [] if redis_is_up: try: # @modified 20220110 - Bug #4364: Prune old thunder.events # Branch #1444: thunder # thunder_events = self.redis_conn_decoded.smembers(thunder_redis_set) thunder_events = list(self.redis_conn_decoded.smembers(thunder_redis_set)) except Exception as e: logger.error('error :: could not query Redis for set %s - %s' % (thunder_redis_set, e)) # If no data was returned from Redis ensure thunder_events is # a set so that any event_files can be added to the set if thunder_events: logger.info('%s entries in thunder.events Redis set' % str(len(thunder_events))) if not isinstance(thunder_events, set): thunder_events = set(thunder_events) else: logger.info('no entries in thunder.events Redis set') thunder_events = [] # Check the filesystem for failover event files filesystem_check_timestamp = int(time()) if (last_check_for_events_on_filesystem + 60) >= filesystem_check_timestamp: last_check_for_events_on_filesystem = filesystem_check_timestamp logger.info('checking for failover event files in %s' % THUNDER_EVENTS_DIR) thunder_event_files_count = 0 for root, dirs, files in os.walk(THUNDER_EVENTS_DIR): if files: for file in files: event_file = '%s/%s' % (root, file) try: data_dict = None try: with open(event_file, 'r') as f: data_dict_str = f.read() except Exception as e: logger.error('error :: failed to 
open event_file: %s - %s' % (event_file, e)) try: data_dict = literal_eval(data_dict_str) if data_dict: data_dict['event_file'] = event_file thunder_events.add(str(data_dict)) except Exception as e: logger.error('error :: failed to literal_eval event_file: %s - %s' % (event_file, e)) except Exception as e: logger.error('failed evaluate event_file %s - %s' % ( event_file, e)) logger.info('%s thunder failover event files found' % str(thunder_event_files_count)) # Check the filesystem for failover key files logger.info('checking for failover keys in %s' % THUNDER_KEYS_DIR) thunder_key_files_count = 0 for root, dirs, files in os.walk(THUNDER_KEYS_DIR): if files: for file in files: thunder_key_file = '%s/%s' % (root, file) try: key_dict = None try: with open(thunder_key_file, 'r') as f: key_dict_str = f.read() key_dict = literal_eval(key_dict_str) thunder_key_files_count += 1 except Exception as e: logger.error('error :: failed to open thunder_key_file: %s - %s' % ( thunder_key_file, e)) timestamp = 0 if key_dict: try: timestamp = key_dict['timestamp'] expiry = int(key_dict['expiry']) except Exception as e: logger.error('error :: failed to determine timestamp and expiry from key_dict created from thunder_key_file: %s - %s' % ( thunder_key_file, e)) if timestamp: now = int(time()) if (timestamp + expiry) >= now: expiry = 0 try: removed_file = remove_file(thunder_key_file) if removed_file: logger.info('removed expired thunder_key_file: %s' % ( thunder_key_file)) except Exception as e: logger.error('error :: failed to remove %s, continuing - %s' % ( thunder_key_file, e)) if (timestamp + expiry) <= now: expiry = now - (timestamp + expiry) except Exception as e: logger.error('failed evaluate thunder_key_file: %s - %s' % ( thunder_key_file, e)) logger.info('%s thunder failover key files found' % str(thunder_key_files_count)) # @added 20220222 -Branch #1444: thunder # Create a list for redis_items redis_items = [] total_thunder_events_item_count = len(thunder_events) 
validated_event_details = {} if thunder_events: logger.info('getting a thunder event to process from the %s events' % str(total_thunder_events_item_count)) for index, event_item in enumerate(thunder_events): # if validated_event_details: # break try: remove_item = False redis_item = event_item try: event_details = literal_eval(event_item) except Exception as e: remove_item = True event_details = None logger.error('error :: could not determine event_details from %s Redis set entry - %s' % ( thunder_redis_set, e)) missing_required_keys = False if event_details: try: event_type = str(event_details['event_type']) except KeyError: event_type = str(event_details['type']) except Exception as e: logger.error('error :: failed to determine type from event_details dict - %s' % ( e)) event_type = False if event_type != 'metric_quota_exceeded': logger.info('validating thunder event_details: %s' % str(event_details)) else: logger.info('validating thunder event_details for %s' % str(event_type)) try: level = str(event_details['level']) except KeyError: level = 'alert' except Exception as e: logger.error('error :: failed to determine level from event_details dict set to alert - %s' % ( e)) level = 'alert' validated_event_details['level'] = level try: event_type = str(event_details['event_type']) except KeyError: event_type = str(event_details['type']) except Exception as e: logger.error('error :: failed to determine type from event_details dict - %s' % ( e)) event_type = False validated_event_details['event_type'] = event_type try: message = str(event_details['message']) except KeyError: message = False except Exception as e: logger.error('error :: failed to determine message from event_details dict - %s' % ( e)) message = False validated_event_details['message'] = message try: app = str(event_details['app']) except KeyError: app = False except Exception as e: logger.error('error :: failed to determine app from event_details dict - %s' % ( e)) app = False 
validated_event_details['app'] = app try: metric = str(event_details['metric']) except KeyError: metric = False except Exception as e: logger.error('error :: failed to determine metric from event_details dict - %s' % ( e)) metric = False validated_event_details['metric'] = metric try: source = str(event_details['source']) except KeyError: source = False except Exception as e: logger.error('error :: failed to determine source from event_details dict - %s' % ( e)) source = False validated_event_details['source'] = source try: expiry = int(event_details['expiry']) except KeyError: expiry = 900 except Exception as e: logger.error('error :: failed to determine expiry from event_details dict - %s' % ( e)) expiry = 900 validated_event_details['expiry'] = expiry try: timestamp = event_details['timestamp'] except KeyError: timestamp = int(time()) except Exception as e: logger.error('error :: failed to determine timestamp from event_details dict - %s' % ( e)) timestamp = int(time()) validated_event_details['timestamp'] = timestamp try: alert_vias = event_details['alert_vias'] except KeyError: alert_vias = [] except Exception as e: logger.error('error :: failed to determine alert_vias from event_details dict - %s' % ( e)) alert_vias = [] validated_event_details['alert_vias'] = alert_vias if source == 'thunder': validated_event_details['alert_vias'] = ['default'] try: data = event_details['data'] except Exception as e: logger.error('error :: failed to determine data from event_details dict - %s' % ( e)) data = {'status': None} validated_event_details['data'] = data # Add the event_file, this is related to files used # for events and keys where a Redis failure is # experienced try: event_file = event_details['event_file'] except KeyError: event_file = None except Exception as e: logger.error('error :: failed to determine event_file from event_details dict - %s' % ( e)) event_file = None validated_event_details['event_file'] = event_file if not event_type: missing_required_keys 
= True if not app: missing_required_keys = True if not message: missing_required_keys = True if missing_required_keys or remove_item: logger.info('invalidating thunder event_details, missing_required_keys: %s' % str(missing_required_keys)) validated_event_details = {} if not event_file: # Delete the bad item in the Redis set try: # @modified 20210907 - Bug #4258: cleanup thunder.events # removed_item = update_redis_set( update_redis_set( skyline_app, thunder_redis_set, event_item, 'remove', True) # if removed_item: # logger.error('error :: could not determine event_details from %s Redis set entry (removed) - %s' % ( # thunder_redis_set, str(event_item))) except Exception as e: logger.error('error :: could not remove bad item from Redis set %s - %s' % ( thunder_redis_set, e)) else: # Delete the bad event_file removed_file = False try: removed_file = remove_file(skyline_app, event_file) except Exception as e: logger.error('error :: could not remove bad event_file %s - %s' % ( event_file, e)) if removed_file: logger.error('error :: could not determine event_details from the event_file (removed) - %s' % ( str(event_file))) continue except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: validating and checking event - %s' % ( e)) if validated_event_details: logger.info('thunder event_details validated') if validated_event_details['source'] == 'thunder': validated_event_details['alert_vias'] = ['default'] logger.info('thunder event_details validated') # @added 20220222 -Branch #1444: thunder # Add to list for redis_items redis_items.append(redis_item) # Check if an alert has gone out if so removed the item if validated_event_details and level == 'alert': alert_cache_key = 'thunder.alert.%s.%s.%s.%s' % ( app, event_type, level, str(timestamp)) alerted = None try: alerted = self.redis_conn_decoded.get(alert_cache_key) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: failed to get %s Redis key - %s' % ( 
alert_cache_key, e)) if not alerted: alerted = check_thunder_failover_key(self, alert_cache_key) if alerted: logger.info('alert already sent for %s, removing event item' % alert_cache_key) validated_event_details = {} if redis_item: # Delete the item from the Redis set try: # @added 20220303 - Feature #4412: flux - quota - thunder alert do_log = True try: redis_item_dict = literal_eval(redis_item) if isinstance(redis_item_dict, dict): if redis_item_dict['event_type'] == 'metric_quota_exceeded': do_log = False logger.info('not logging total event string as metric_quota_exceeded, sampling with data removed') try: del redis_item_dict['data'] except Exception as err: logger.error('error :: failed to removed data key from dict - %s' % err) logger.info('sample: %s' % str(redis_item_dict)) except Exception as err: logger.error('error :: failed literal_eval the redis_item - %s' % err) # @modified 20210907 - Bug #4258: cleanup thunder.events # removed_item = update_redis_set( update_redis_set( skyline_app, thunder_redis_set, redis_item, # @modified 20220303 - Feature #4412: flux - quota - thunder alert # 'remove', log=True) 'remove', do_log) # if removed_item: # logger.info('alert key exists, removed event_details from %s Redis set entry - %s' % ( # thunder_redis_set, str(redis_item))) except Exception as e: logger.error('error :: could not remove item from Redis set %s - %s' % ( thunder_redis_set, e)) if event_file: # Delete the bad event_file removed_file = False try: removed_file = remove_file(skyline_app, event_file) except Exception as e: logger.error('error :: could not remove event_file %s - %s' % ( event_file, e)) if removed_file: logger.info('alert key exists, event_file removed - %s' % ( str(event_file))) continue if validated_event_details: # Check if the event has been actioned in the # current run, if so skip. 
# until the key expires current_event_cache_key = 'thunder.current.%s.%s.%s.%s' % ( app, event_type, level, str(timestamp)) current_event = None try: current_event = self.redis_conn_decoded.get(current_event_cache_key) if current_event: logger.info('current_event_cache_key exist in Redis %s for this event, skipping' % current_event_cache_key) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: failed to get %s Redis key - %s' % ( current_event_cache_key, e)) if not current_event: current_event = check_thunder_failover_key(self, current_event_cache_key) if current_event: logger.info('current_event_cache_key exist %s as a check_thunder_failover_key for this event, skipping' % current_event_cache_key) if current_event: validated_event_details = {} logger.info('current_event_cache_key exist %s for this event, skipping' % current_event_cache_key) # continue if validated_event_details: try: self.redis_conn_decoded.setex(current_event_cache_key, 59, int(time())) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: failed to setex %s Redis key - %s' % ( current_event_cache_key, e)) try: failover_key_file = '%s/%s' % (THUNDER_KEYS_DIR, current_event_cache_key) failover_key_data = {'timestamp': int(time()), 'expiry': 59} write_data_to_file( skyline_app, failover_key_file, 'w', str(failover_key_data)) logger.info('added Redis failover - failover_key_file - %s' % (failover_key_file)) except Exception as e: logger.error('error :: failed to add Redis failover failover_key_file - %s - %s' % (failover_key_file, e)) # @modified 20220110 - Bug #4364: Prune old thunder.events # Branch #1444: thunder # redis_item = event_item break if not validated_event_details: sleep_for = 30 right_now = int(time()) next_send_to_graphite = last_sent_to_graphite + 60 if right_now >= next_send_to_graphite: sleep_for = 0.1 if (next_send_to_graphite - right_now) < sleep_for: sleep_for = next_send_to_graphite - right_now logger.info('no 
validated_event_details sleeping for %s seconds' % str(sleep_for)) sleep(sleep_for) if int(time()) >= (last_sent_to_graphite + 60): logger.info('breaking to sending Graphite metrics') break if validated_event_details: logger.info('processing 1 event of %s thunder events to process' % str(total_thunder_events_item_count)) # Spawn processes pids = [] spawned_pids = [] pid_count = 0 THUNDER_PROCESSES = 1 for i in range(1, THUNDER_PROCESSES + 1): p = Process(target=self.spin_thunder_process, args=(i, validated_event_details, redis_item, event_file)) pids.append(p) pid_count += 1 logger.info('starting spin_thunder_process') p.start() spawned_pids.append(p.pid) thunder_alerts_sent += 1 # Send wait signal to zombie processes # for p in pids: # p.join() # Self monitor processes and terminate if any spin_thunder_process # that has run for longer than 58 seconds p_starts = time() while time() - p_starts <= 58: if any(p.is_alive() for p in pids): # Just to avoid hogging the CPU sleep(.1) else: # All the processes are done, break now. time_to_run = time() - p_starts logger.info('1 spin_thunder_process completed in %.2f seconds' % (time_to_run)) break else: # We only enter this if we didn't 'break' above. 
logger.info('timed out, killing all spin_thunder_process processes') for p in pids: p.terminate() # p.join() for p in pids: if p.is_alive(): logger.info('stopping spin_thunder_process - %s' % (str(p.is_alive()))) p.join() # @added 20210907 - Bug #4258: cleanup thunder.events # Remove event # @modified 20220222 -Branch #1444: thunder # Iterate redis_items list, rather than assuming a single # redis_item for redis_item in redis_items: try: # @added 20220303 - Feature #4412: flux - quota - thunder alert do_log = True try: redis_item_dict = literal_eval(redis_item) if isinstance(redis_item_dict, dict): if redis_item_dict['event_type'] == 'metric_quota_exceeded': do_log = False logger.info('not logging total event string as metric_quota_exceeded, sampling with data removed') try: del redis_item_dict['data'] except Exception as err: logger.error('error :: failed to removed data key from dict - %s' % err) logger.info('sample: %s' % str(redis_item_dict)) except Exception as err: logger.error('error :: failed literal_eval the redis_item - %s' % err) update_redis_set( skyline_app, thunder_redis_set, redis_item, 'remove', do_log) except Exception as e: logger.error('error :: could not remove item from Redis set %s - %s' % ( thunder_redis_set, e)) if int(time()) >= (last_sent_to_graphite + 60): logger.info('sending Graphite metrics') logger.info('alerts.sent :: %s' % str(thunder_alerts_sent)) send_metric_name = '%s.alerts.sent' % skyline_app_graphite_namespace send_graphite_metric(skyline_app, send_metric_name, str(thunder_alerts_sent)) last_sent_to_graphite = int(time()) thunder_alerts_sent = 0 # @modified 20210909 - Bug #4258: cleanup thunder.events # Not required here # try: # thunder_events = self.redis_conn_decoded.smembers(thunder_redis_set) # except Exception as e: # logger.error('error :: could not query Redis for set %s - %s' % (thunder_redis_set, e)) # @added 20210907 - Bug #4258: cleanup thunder.events # The original version of thunder never removed the # 
thunder.events after processing, the event was only # removed if it was bad. Therefore no stale, no_data or # recovered events were removed from the thunder.events # Redis set. # This feature works to be able to clean up the # thunder.events of any big thunder.events sets and manages # the set going forward. thunder_events_list = [] logger.info('managing thunder.events Redis set and removing any items older than 86400') try: thunder_events_list = list(self.redis_conn_decoded.smembers(thunder_redis_set)) except Exception as e: logger.error('error :: could not query Redis for set %s - %s' % (thunder_redis_set, e)) if not thunder_events_list: logger.info('managed thunder.events Redis set, no items in set') if thunder_events_list: logger.info('managing %s items in thunder.events Redis set' % str(len(thunder_events_list))) for thunder_event_str in thunder_events_list: try: thunder_event = None remove_item = False try: thunder_event = literal_eval(thunder_event_str) except Exception as err: logger.error('error :: could not literal_eval(thunder_events_str) - %s' % str(err)) thunder_event_timestamp = 0 if thunder_event: try: thunder_event_timestamp = int(thunder_event['timestamp']) except KeyError: # No timestamp, remove event remove_item = thunder_event_str thunder_event_timestamp = None if thunder_event_timestamp: # @modified 20220307 - # if thunder_event_timestamp > (last_sent_to_graphite - 86400): if (current_timestamp - 86400) > thunder_event_timestamp: remove_item = thunder_event_str if remove_item: # Remove event # @added 20220303 - Feature #4412: flux - quota - thunder alert do_log = True try: redis_item_dict = literal_eval(thunder_event_str) if isinstance(redis_item_dict, dict): if redis_item_dict['event_type'] == 'metric_quota_exceeded': do_log = False logger.info('removing event from thunder.events Redis set as the event is older than 86400 seconds') logger.info('not logging total event string as metric_quota_exceeded, sampling with data removed') try: del 
redis_item_dict['data'] except Exception as err: logger.error('error :: failed to removed data key from dict - %s' % err) logger.info('sample: %s' % str(redis_item_dict)) except Exception as err: logger.error('error :: failed literal_eval the redis_item - %s' % err) if do_log: logger.info('removing event from thunder.events Redis set as the event is older than 86400 seconds. event: %s' % ( str(thunder_event))) try: update_redis_set( # @modified 20220222 -Branch #1444: thunder # Use thunder_event_str rather than redis_item # skyline_app, thunder_redis_set, redis_item, skyline_app, thunder_redis_set, thunder_event_str, 'remove', do_log) except Exception as e: logger.error('error :: could not remove item from Redis set %s - %s' % ( thunder_redis_set, e)) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: failed to manage a thunder_event in the thunder_events_list - %s' % err)