Source code for horizon.worker

"""
worker.py
"""
from __future__ import division
# from os import kill, system
from os import kill
from multiprocessing import Process
try:
    from Queue import Empty
except ImportError:
    from queue import Empty
from time import time, sleep

import traceback
import logging
# import socket

# import sys
import os.path
from os import remove as os_remove

# @added 20190130 - Task #2690: Test Skyline on Python-3.6.7
#                   Branch #3262: py3
from sys import version_info
from sys import exit as sys_exit

from msgpack import packb
from redis import StrictRedis, WatchError

import settings
# @modified 20220216 - Feature #4446: Optimise horizon worker in_skip_list
# Added get_redis_conn_decoded
from skyline_functions import send_graphite_metric, get_redis_conn_decoded

parent_skyline_app = 'horizon'
child_skyline_app = 'worker'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s.%s' % (
    parent_skyline_app, SERVER_METRIC_PATH, child_skyline_app)

LOCAL_DEBUG = False

# @added 20170319 - Feature #1978: worker - DO_NOT_SKIP_LIST
try:
    # @modified 20200606 - Bug #3572: Apply list to settings import
    DO_NOT_SKIP_LIST = list(settings.DO_NOT_SKIP_LIST)
except:
    DO_NOT_SKIP_LIST = []

# @added 20201103 - Feature #3820: HORIZON_SHARDS
try:
    HORIZON_SHARDS = settings.HORIZON_SHARDS.copy()
except:
    HORIZON_SHARDS = {}
try:
    HORIZON_SHARD_DEBUG = settings.HORIZON_SHARD_DEBUG
except:
    HORIZON_SHARD_DEBUG = True

number_of_horizon_shards = 0
this_host = str(os.uname()[1])
HORIZON_SHARD = 0
if HORIZON_SHARDS:
    import zlib
    number_of_horizon_shards = len(HORIZON_SHARDS)
    HORIZON_SHARD = HORIZON_SHARDS[this_host]

# @added 20190130 - Task #2690: Test Skyline on Python-3.6.7
#                   Branch #3262: py3
python_version = int(version_info[0])


[docs]class Worker(Process): """ The worker processes chunks from the queue and appends the latest datapoints to their respective timesteps in Redis. """ # @modified 20201017 - Feature #3788: snab_flux_load_test # Feature #3680: horizon.worker.datapoints_sent_to_redis # Added worker_number def __init__(self, queue, parent_pid, skip_mini, worker_number, canary=False): super(Worker, self).__init__() # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: # @modified 20191014 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH, charset='utf-8', decode_responses=True) else: # self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH, charset='utf-8', decode_responses=True) self.q = queue self.parent_pid = parent_pid self.daemon = True self.canary = canary self.skip_mini = skip_mini # @added 20201017 - Feature #3788: snab_flux_load_test # Feature #3680: horizon.worker.datapoints_sent_to_redis # Added worker_number self.worker_number = worker_number # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list # Added get_redis_conn_decoded self.redis_conn_decoded = get_redis_conn_decoded(parent_skyline_app)
[docs] def check_if_parent_is_alive(self): """ Self explanatory. """ try: kill(self.parent_pid, 0) except: # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics # Log warning logger.warning('warning :: parent process is dead') sys_exit(0)
[docs] def in_skip_list(self, metric_name): """ Check if the metric is in SKIP_LIST. # @added 20170319 - Feature #1978: worker - DO_NOT_SKIP_LIST The SKIP_LIST allows for a string match or a match on dotted elements within the metric namespace. """ # @modified 20170319 - Feature #1978: worker - DO_NOT_SKIP_LIST # Allow for dotted element matches and DO_NOT_SKIP_LIST # for to_skip in settings.SKIP_LIST: # if to_skip in metric_name: # return True # return False # @added 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings if python_version == 3: str_metric_name = str(metric_name) metric_name = str_metric_name metric_namespace_elements = metric_name.split('.') process_metric = True for to_skip in settings.SKIP_LIST: if to_skip in metric_name: process_metric = False break to_skip_namespace_elements = to_skip.split('.') elements_matched = set(metric_namespace_elements) & set(to_skip_namespace_elements) if len(elements_matched) == len(to_skip_namespace_elements): process_metric = False break if not process_metric: for do_not_skip in DO_NOT_SKIP_LIST: if do_not_skip in metric_name: process_metric = True break do_not_skip_namespace_elements = do_not_skip.split('.') elements_matched = set(metric_namespace_elements) & set(do_not_skip_namespace_elements) if len(elements_matched) == len(do_not_skip_namespace_elements): process_metric = True break if not process_metric: # skip return True return False
# @added 20201103 - Feature #3820: HORIZON_SHARDS
[docs] def in_shard(self, metric_name): """ Check if the metric belongs to the Horizon instance shard. """ if not HORIZON_SHARDS: return True metric_as_bytes = str(metric_name).encode() value = zlib.adler32(metric_as_bytes) modulo_result = value % number_of_horizon_shards if modulo_result == HORIZON_SHARD: return True else: return False return True
[docs] def run(self): """ Called when the process intializes. """ # Log management to prevent overwriting # Allow the bin/<skyline_app>.d to manage the log # @modified 20201017 - Feature #3788: snab_flux_load_test # Feature #3680: horizon.worker.datapoints_sent_to_redis # Only do log management with the canary worker if self.canary: if os.path.isfile(skyline_app_logwait): try: os_remove(skyline_app_logwait) except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_logwait) now = time() log_wait_for = now + 5 while now < log_wait_for: if os.path.isfile(skyline_app_loglock): sleep(.1) now = time() else: now = log_wait_for + 1 logger.info('starting %s worker number %s run' % (skyline_app, self.worker_number)) # @modified 20201017 - Feature #3788: snab_flux_load_test # Feature #3680: horizon.worker.datapoints_sent_to_redis if self.canary: if os.path.isfile(skyline_app_loglock): logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app) try: os_remove(skyline_app_loglock) logger.info('log lock file removed') except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_loglock) else: logger.info('bin/%s.d log management done' % skyline_app) logger.info('%s :: started worker %s' % (skyline_app, str(self.worker_number))) # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list skip_metrics_dict = {} do_not_skip_metrics_dict = {} skip_metrics_list = [] do_not_skip_metrics_list = [] checked_for_skip_count = 0 checked_through_skip_list = [] # @added 20201103 - Feature #3820: HORIZON_SHARDS if HORIZON_SHARDS: logger.info('%s :: HORIZON_SHARDS declared, this horizon instance is assigned shard %s' % (skyline_app, str(HORIZON_SHARD))) FULL_NAMESPACE = settings.FULL_NAMESPACE MINI_NAMESPACE = settings.MINI_NAMESPACE MAX_RESOLUTION = settings.MAX_RESOLUTION full_uniques = '%sunique_metrics' % FULL_NAMESPACE mini_uniques = '%sunique_metrics' % MINI_NAMESPACE pipe = self.redis_conn.pipeline() last_send_to_graphite = time() queue_sizes = [] # @added 20210416 - Task #4020: Add horizon.queue_size_60s_avg metric last_log_queue_size = time() queue_sizes_60_seconds = [] # @added 20200815 - Feature #3680: horizon.worker.datapoints_sent_to_redis datapoints_sent_to_redis = 0 last_datapoints_to_redis = int(time()) # @added 20201103 - Feature #3820: HORIZON_SHARDS if HORIZON_SHARDS: horizon_shard_assigned_metrics = [] horizon_shard_dropped_metrics = [] # @added 20201120 - Feature #3820: HORIZON_SHARDS metrics_received = [] # python-2.x and python3.x handle while 1 and while True differently # while 1: running = True while running: # Make sure Redis is up try: self.redis_conn.ping() except: logger.error('%s :: cannot connect to Redis at socket path %s' % (skyline_app, settings.REDIS_SOCKET_PATH)) sleep(10) # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow if settings.REDIS_PASSWORD: # @modified 20191014 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH, charset='utf-8', decode_responses=True) else: # self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH, charset='utf-8', decode_responses=True) pipe = self.redis_conn.pipeline() continue # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list if not skip_metrics_dict: try: skip_metrics_dict = self.redis_conn_decoded.hgetall('horizon.skip_metrics') logger.info('%s :: got %s metrics from horizon.skip_metrics from Redis hash as not set' % ( skyline_app, str(len(skip_metrics_dict)))) except Exception as err: logger.error('%s :: error on hgetall horizon.skip_metrics: %s' % (skyline_app, str(err))) if not do_not_skip_metrics_dict: try: do_not_skip_metrics_dict = self.redis_conn_decoded.hgetall('horizon.do_not_skip_metrics') logger.info('%s :: got %s metrics from horizon.do_not_skip_metrics from Redis hash as not set' % ( skyline_app, str(len(do_not_skip_metrics_dict)))) except Exception as err: logger.error('%s :: error on hgetall horizon.do_not_skip_metrics: %s' % (skyline_app, str(err))) try: # Get a chunk from the queue with a 15 second timeout chunk = self.q.get(True, 15) # @modified 20170317 - Feature #1978: worker - DO_NOT_SKIP_LIST # now = time() now = int(time()) for metric in chunk: # @added 20201120 - Feature #3820: HORIZON_SHARDS try: metrics_received.append(metric[0]) except: pass # @added 20201103 - Feature #3820: HORIZON_SHARDS # If a metric does not map to the HORIZON_SHARD, drop it # and continue if HORIZON_SHARDS and number_of_horizon_shards: if not self.in_shard(metric[0]): horizon_shard_dropped_metrics.append(metric[0]) continue horizon_shard_assigned_metrics.append(metric[0]) # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list found_in_do_not_skip_dict = False if do_not_skip_metrics_dict: try: found_in_do_not_skip_dict = do_not_skip_metrics_dict[metric[0]] except KeyError: found_in_do_not_skip_dict = False if found_in_do_not_skip_dict: do_not_skip_metrics_list.append(metric[0]) found_in_skip_dict = False if not found_in_do_not_skip_dict and skip_metrics_dict: try: found_in_skip_dict = skip_metrics_dict[metric[0]] except KeyError: found_in_skip_dict = False if found_in_skip_dict: skip_metrics_list.append(metric[0]) continue # @modified 20220216 - Feature #4446: Optimise horizon worker in_skip_list if not found_in_do_not_skip_dict: checked_for_skip_count += 1 checked_through_skip_list.append(metric[0]) # Check if we should skip it if self.in_skip_list(metric[0]): # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list skip_metrics_list.append(metric[0]) continue # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list if not found_in_do_not_skip_dict: do_not_skip_metrics_list.append(metric[0]) # Bad data coming in # @modified 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings # if metric[1][0] < now - MAX_RESOLUTION: # continue try: if int(metric[1][0]) < now - MAX_RESOLUTION: continue except: pass # Append to messagepack main namespace # @modified 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings # key = ''.join((FULL_NAMESPACE, metric[0])) key = ''.join((FULL_NAMESPACE, str(metric[0]))) if LOCAL_DEBUG: logger.info('debug :: worker :: adding metric Redis key - %s' % str(key)) # @modified 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings # pipe.append(key, packb(metric[1])) # pipe.sadd(full_uniques, key) try: pipe.append(str(key), packb(metric[1])) # @added 20200815 - Feature #3680: horizon.worker.datapoints_sent_to_redis datapoints_sent_to_redis += 1 except Exception as err: logger.error('error :: worker :: error on pipe.append to %s: %s' % ( str(key), str(err))) try: # pipe.sadd(full_uniques, key) pipe.sadd(full_uniques, str(key)) except Exception as e: logger.error('%s :: error on pipe.sadd: %s' % (skyline_app, str(e))) if not self.skip_mini: # Append to mini namespace # @modified 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings # mini_key = ''.join((MINI_NAMESPACE, metric[0])) mini_key = ''.join((MINI_NAMESPACE, str(metric[0]))) pipe.append(mini_key, packb(metric[1])) pipe.sadd(mini_uniques, mini_key) # @modified 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings # pipe.execute() try: pipe.execute() except Exception as e: logger.error('%s :: error on pipe.execute: %s' % (skyline_app, str(e))) except Empty: logger.info('%s :: worker queue is empty and timed out' % skyline_app) except WatchError: logger.error('%s :: WatchError - %s' % (skyline_app, str(key))) except NotImplementedError: pass except Exception as e: # @added 20190130 - Task #2690: Test Skyline on Python-3.6.7 # Branch #3262: py3 # Bug #3266: py3 Redis binary objects not strings # Added traceback logger.error(traceback.format_exc()) logger.error('%s :: error: %s' % (skyline_app, str(e))) # Log progress if self.canary: # @modified 20210511 # Do not logg queue size for every chunk # logger.info('%s :: queue size at %d' % (skyline_app, self.q.qsize())) queue_sizes.append(self.q.qsize()) # Only send average queue mertics to graphite once per 10 seconds now = time() # @modified 20210416 - Task #4020: Add horizon.queue_size_60s_avg metric # last_sent_graphite = now - last_send_to_graphite # if last_sent_graphite > 10: last_logged_queue_size = now - last_log_queue_size if last_logged_queue_size > 10: number_queue_sizes = len(queue_sizes) total_of_queue_sizes = sum(queue_sizes) if total_of_queue_sizes > 0: average_queue_size = total_of_queue_sizes / number_queue_sizes else: average_queue_size = 0 logger.info('%s :: total queue size for the last 10 seconds - %s' % (skyline_app, str(total_of_queue_sizes))) logger.info('%s :: total queue values known for the last 10 seconds - %s' % (skyline_app, str(number_queue_sizes))) logger.info('%s :: average queue size for the last 10 seconds - %s' % (skyline_app, str(average_queue_size))) # self.send_graphite_metric('skyline.horizon.' + SERVER_METRIC_PATH + 'queue_size', self.q.qsize()) # self.send_graphite_metric('queue_size', average_queue_size) send_metric_name = '%s.queue_size' % skyline_app_graphite_namespace send_graphite_metric(skyline_app, send_metric_name, average_queue_size) # @added 20210416 - Task #4020: Add horizon.queue_size_60s_avg metric for size_value in queue_sizes: queue_sizes_60_seconds.append(size_value) # reset queue_sizes queue_sizes = [] # last_send_to_graphite = time() last_log_queue_size = time() # @added 20210416 - Task #4020: Add horizon.queue_size_60s_avg metric # Moved from the 10 second block above to its own 60 seconds block last_sent_graphite = now - last_send_to_graphite if last_sent_graphite >= 60: try: number_queue_sizes = len(queue_sizes_60_seconds) total_of_queue_sizes = sum(queue_sizes_60_seconds) if total_of_queue_sizes > 0: average_queue_size = total_of_queue_sizes / number_queue_sizes else: average_queue_size = 0 logger.info('%s :: total queue size for the last 60 seconds - %s' % (skyline_app, str(total_of_queue_sizes))) logger.info('%s :: total queue values known for the last 60 seconds - %s' % (skyline_app, str(number_queue_sizes))) logger.info('%s :: average queue size for the last 60 seconds - %s' % (skyline_app, str(average_queue_size))) send_metric_name = '%s.queue_size_60s_avg' % skyline_app_graphite_namespace send_graphite_metric(skyline_app, send_metric_name, average_queue_size) # reset queue_sizes_60_seconds and last_sent_graphite queue_sizes_60_seconds = [] last_send_to_graphite = time() except Exception as e: logger.error(traceback.format_exc()) logger.error('%s :: error: %s' % (skyline_app, str(e))) # @added 20210520 - Branch #1444: thunder # Added Redis hash keys for thunder to monitor the horizon # listen state, inferred via the # horizon.worker.average_queue_size, without incurring the # addition of Redis to horizon/listen. This is used by # Thunder notify on degradation of service and operational # changes. cache_key = 'horizon.worker.average_queue_size' try: self.redis_conn.hset(cache_key, 'value', int(average_queue_size)) self.redis_conn.hset(cache_key, 'timestamp', int(now)) except Exception as e: logger.error('error :: horizon :: worker :: could not update the Redis %s key - %s' % ( cache_key, e)) # @added 20200815 - Feature #3680: horizon.worker.datapoints_sent_to_redis # @modified 20201017 - Feature #3788: snab_flux_load_test # Feature #3680: horizon.worker.datapoints_sent_to_redis # Send for each worker last_datapoints_count_to_redis = now - last_datapoints_to_redis if last_datapoints_count_to_redis >= 60: logger.info('%s :: datapoints_sent_to_redis in last 60 seconds - %s' % (skyline_app, str(datapoints_sent_to_redis))) if self.canary: send_metric_name = '%s.datapoints_sent_to_redis' % ( skyline_app_graphite_namespace) else: send_metric_name = '%s.datapoints_sent_to_redis_%s' % ( skyline_app_graphite_namespace, str(self.worker_number)) send_graphite_metric(skyline_app, send_metric_name, datapoints_sent_to_redis) datapoints_sent_to_redis = 0 last_datapoints_to_redis = int(time()) # @added 20201120 - Feature #3820: HORIZON_SHARDS if HORIZON_SHARDS: try: self.redis_conn.sadd('horizon.shards.metrics_assigned', *set(horizon_shard_assigned_metrics)) except Exception as e: logger.error(traceback.format_exc()) logger.error('%s :: error adding horizon.shard.metrics_assigned: %s' % (skyline_app, str(e))) try: self.redis_conn.sadd('horizon.shard.metrics_dropped', *set(horizon_shard_dropped_metrics)) except Exception as e: logger.error(traceback.format_exc()) logger.error('%s :: error adding horizon.shard.metrics_dropped: %s' % (skyline_app, str(e))) # @added 20201103 - Feature #3820: HORIZON_SHARDS if HORIZON_SHARDS and HORIZON_SHARD_DEBUG: horizon_shard_assigned_metrics_count = len(horizon_shard_assigned_metrics) horizon_shard_dropped_metrics_count = len(horizon_shard_dropped_metrics) logger.info('%s :: %s assigned metrics to HORIZON_SHARD %s and dropped %s metrics in last 60 seconds' % ( skyline_app, str(horizon_shard_assigned_metrics_count), str(HORIZON_SHARD), str(horizon_shard_dropped_metrics_count))) if LOCAL_DEBUG: logger.info('%s :: horizon_shard_assigned_metrics - %s' % ( skyline_app, str(horizon_shard_assigned_metrics))) logger.info('%s :: horizon_shard_dropped_metrics - %s' % ( skyline_app, str(horizon_shard_dropped_metrics))) horizon_shard_assigned_metrics = [] horizon_shard_dropped_metrics = [] # @added 20201120 - Feature #3820: HORIZON_SHARDS if metrics_received: try: self.redis_conn.sadd('horizon.metrics_received', *set(metrics_received)) except: logger.error(traceback.format_exc()) logger.error('%s :: error adding horizon.metrics_received' % (skyline_app)) if self.canary: try: logger.info('%s :: renaming key horizon.metrics_received to aet.horizon.metrics_received' % skyline_app) self.redis_conn.rename('horizon.metrics_received', 'aet.horizon.metrics_received') except Exception as e: logger.info(traceback.format_exc()) logger.error('error :: %s failed to rename Redis key horizon.metrics_received to aet.horizon.metrics_received - %s' % ( skyline_app, e)) metrics_received_count = 0 try: metrics_received_count = self.redis_conn.scard('aet.horizon.metrics_received') except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: %s :: running redis_conn.scard(\'aet.horizon.metrics_received\') - %s' % ( skyline_app, e)) cache_key = 'thunder.horizon.worker.metrics_received' try: self.redis_conn.hset(cache_key, 'value', int(metrics_received_count)) self.redis_conn.hset(cache_key, 'timestamp', int(now)) except Exception as e: logger.error('error :: horizon :: worker :: could not update the Redis %s key - %s' % ( cache_key, e)) send_metric_name = '%s.metrics_received' % ( skyline_app_graphite_namespace) send_graphite_metric(skyline_app, send_metric_name, metrics_received_count) # @added 20210520 - Branch #1444: thunder # Added Redis hash keys for thunder to monitor the horizon # listen state, inferred via the # horizon.worker.average_queue_size, without incurring the # addition of Redis to horizon/listen. This is used by # Thunder notify on degradation of service and operational # changes. cache_key = 'horizon' try: self.redis_conn.setex(cache_key, 120, int(now)) except Exception as e: logger.error('error :: horizon :: worker :: could not update the Redis %s key - %s' % ( cache_key, e)) metrics_received = [] # @added 20220216 - Feature #4446: Optimise horizon worker in_skip_list if skip_metrics_list: update_skip_metrics = {} for metric in list(set(skip_metrics_list)): update_entry = False if skip_metrics_dict: try: # Only update skip metrics every hour to save on # Redis overhead last_update_timestamp_str = skip_metrics_dict[metric] if last_update_timestamp_str: last_update_timestamp = int(float(last_update_timestamp_str)) if (now - last_update_timestamp) >= 3600: update_entry = True except KeyError: update_entry = True except: update_entry = True else: update_entry = True if update_entry: update_skip_metrics[metric] = int(now) if update_skip_metrics: try: self.redis_conn_decoded.hset('horizon.skip_metrics', mapping=update_skip_metrics) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: listen :: failed to update horizon.skip_metrics Redis hash - %s' % ( err)) if do_not_skip_metrics_list: update_metrics = {} for metric in list(set(do_not_skip_metrics_list)): update_entry = False if do_not_skip_metrics_dict: try: # Only update skip metrics every hour to save on # Redis overhead last_update_timestamp_str = skip_metrics_dict[metric] if last_update_timestamp_str: last_update_timestamp = int(float(last_update_timestamp_str)) if (now - last_update_timestamp) >= 3600: update_entry = True except KeyError: update_entry = True except: update_entry = True else: update_entry = True if update_entry: update_metrics[metric] = int(now) if update_metrics: try: self.redis_conn_decoded.hset('horizon.do_not_skip_metrics', mapping=update_metrics) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: listen :: failed to update horizon.do_not_skip_metrics Redis hash - %s' % ( err)) try: skip_metrics_dict = self.redis_conn_decoded.hgetall('horizon.skip_metrics') logger.info('%s :: got %s metrics from horizon.skip_metrics from Redis hash on minutely run' % ( skyline_app, str(len(skip_metrics_dict)))) except Exception as err: logger.error('%s :: error on hgetall horizon.skip_metrics: %s' % (skyline_app, str(err))) try: do_not_skip_metrics_dict = self.redis_conn_decoded.hgetall('horizon.do_not_skip_metrics') logger.info('%s :: got %s metrics from horizon.do_not_skip_metrics from Redis hash on minutely run' % ( skyline_app, str(len(do_not_skip_metrics_dict)))) except Exception as err: logger.error('%s :: error on hgetall horizon.do_not_skip_metrics: %s' % (skyline_app, str(err))) logger.info('%s :: checked %s metrics through in_skip_list' % ( skyline_app, str(checked_for_skip_count))) if checked_through_skip_list: try: self.redis_conn_decoded.sadd('horizon.checked_through_skip_list', *set(checked_through_skip_list)) self.redis_conn_decoded.expire('horizon.checked_through_skip_list', 86400) logger.info('%s :: updated horizon.checked_through_skip_list Redis set on minutely run' % ( skyline_app)) except Exception as err: logger.error('%s :: sadd failed on horizon.checked_through_skip_list - %s' % (skyline_app, str(err))) checked_through_skip_list = [] skip_metrics_list = [] do_not_skip_metrics_list = []