Source code for vista.fetcher

from __future__ import division
import logging
from time import time, sleep
from datetime import datetime, timedelta
from threading import Thread
from multiprocessing import Process
import os
# @modified 20191115 - Branch #3262: py3
# from os import kill, getpid
from os import kill
import traceback
import re
from sys import version_info
import os.path
from ast import literal_eval
try:
    from urllib.parse import quote
except:
    from urllib import quote

# @modified 20191115 - Branch #3262: py3
# from redis import StrictRedis
import requests

import settings
python_version = int(version_info[0])

if True:
    from skyline_functions import (
        # @modified 20220726 - Task #2732: Prometheus to Skyline
        #                      Branch #4300: prometheus
        # Moved send_graphite_metric
        # send_graphite_metric, filesafe_metricname,
        filesafe_metricname,
        # @added 20191111 - Bug #3266: py3 Redis binary objects not strings
        #                   Branch #3262: py3
        get_redis_conn, get_redis_conn_decoded,
        # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
        #                   Bug #3778: Handle single encoded forward slash requests to Graphite
        sanitise_graphite_url)
    # @added 20220429 - Feature #4536: Handle Redis failure
    from functions.flux.get_last_metric_data import get_last_metric_data
    # @added 20220726 - Task #2732: Prometheus to Skyline
    #                   Branch #4300: prometheus
    from functions.graphite.send_graphite_metric import send_graphite_metric

# Application identity.  The fetcher is a child of the vista app: it logs
# through the parent's logger and to the parent's log file.
parent_skyline_app = 'vista'
child_skyline_app = 'fetcher'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
# Lock and wait files used for the log management hand-over in run()
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Optional per-server element of the Graphite namespace.  Only the failures
# a missing or unformattable setting can cause are handled, rather than
# swallowing every exception (including SystemExit/KeyboardInterrupt) with a
# bare except.
try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
except (AttributeError, TypeError):
    SERVER_METRIC_PATH = ''
if SERVER_METRIC_PATH == '.':
    # An empty SERVER_METRICS_NAME must not leave a dangling dot
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s.fetcher' % (
    parent_skyline_app, SERVER_METRIC_PATH)

# python_version is already assigned directly after the imports, it is not
# recomputed here.

this_host = str(os.uname()[1])

# Vista is disabled unless settings explicitly declares VISTA_ENABLED
VISTA_ENABLED = getattr(settings, 'VISTA_ENABLED', False)

# @added 20210512 - Feature #4064: VERBOSE_LOGGING
VERBOSE_LOGGING = getattr(settings, 'VISTA_VERBOSE_LOGGING', False)

# Hard-coded switches: USE_FLUX forces pre-population via Flux in run() and
# LOCAL_DEBUG enables the additional debug log lines
USE_FLUX = False
LOCAL_DEBUG = False


class Fetcher(Thread):
    """
    The fetcher thread retrieves the latest data points for metrics from
    multiple remote endpoints and submits the data to the Redis set,
    vista.fetcher.metrics.json, for the worker to process.

    NOTE(review): earlier documentation described the retrieval as being done
    with asyncio and aiohttp, the fetch code in this class issues its HTTP
    requests with ``requests`` - confirm before relying on either description.
    """

    def __init__(self, parent_pid):
        """
        Initialise the thread as a daemon and create its Redis connections.

        :param parent_pid: the pid of the parent process, used by
            check_if_parent_is_alive
        """
        super(Fetcher, self).__init__()
        self.daemon = True
        self.parent_pid = parent_pid
        # @added 20191111 - Bug #3266: py3 Redis binary objects not strings
        #                   Branch #3262: py3
        # Single functions deal with the Redis connection and the
        # charset='utf-8', decode_responses=True arguments required in py3,
        # one connection returns raw responses and one decoded responses
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
def check_if_parent_is_alive(self):
    """
    Exit if the parent process can no longer be signalled.

    Signal 0 is sent to ``self.parent_pid``, which delivers nothing but
    fails when the process cannot be signalled.  On any failure a warning
    is logged and SystemExit(0) is raised.

    :return: None when the parent process is alive
    :raises SystemExit: when the parent process cannot be signalled
    """
    try:
        kill(self.parent_pid, 0)
    except Exception:
        # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
        # Log warning
        logger.warning('warning :: parent process is dead')
        # Raise SystemExit directly, the exit() helper is injected by the
        # site module for interactive use and is not meant for programs
        raise SystemExit(0)
def fetch_process(self, i, metrics_to_fetch):
    """
    Fetch the latest data points for the given metrics and queue them in
    Redis for the worker.

    - Remove entries from the Redis set vista.fetcher.metrics.json whose
      newest data point is more than 86400 seconds older than the start of
      this run.
    - Group the graphite URLs that share a remote host and ``from``
      parameter into batch requests of up to
      settings.VISTA_GRAPHITE_BATCH_SIZE targets (20 if not declared) and
      fetch the batches.
    - For each metric use its batch response if there is one, otherwise
      request its URL individually, drop null values and data points that
      are not newer than the last timestamp known to Flux, add the payload
      to the Redis set vista.fetcher.metrics.json and record the fetch in
      the Redis set vista.fetcher.metrics.fetched.

    :param i: not used in this method (presumably the process number
        assigned by the caller - TODO confirm)
    :param metrics_to_fetch: a list of 11 item tuples of
        (remote_host_type, frequency, remote_target, graphite_target,
        metric, url, namespace_prefix, api_key, token, user, password)
    :return: None
    """
    fetch_process_start = time()
    logger.info('fetcher :: fetch_process started')
    metrics_to_fetch_count = len(metrics_to_fetch)
    logger.info('fetcher :: fetch_process to fetch %s metrics' % str(metrics_to_fetch_count))
    if LOCAL_DEBUG:
        logger.info('fetcher :: metrics_to_fetch - %s' % str(metrics_to_fetch))

    # @added 20191127 - Feature #3338: Vista - batch Graphite requests
    def param_value_from_url(url, param_name):
        # Return the text after the last '=' of the last query string
        # element that starts with param_name=, or None if there is none
        param_value = None
        for param in url.split('?', 1)[-1].split('&'):
            if param.startswith(param_name + '='):
                param_value = param.split('=')[-1]
        return param_value

    # @added 20220111 - Feature #4370: Manage vista.fetcher.metrics.json Redis set
    logger.info('fetcher :: managing old entries in Redis set vista.fetcher.metrics.json')
    vista_list = []
    try:
        vista_list = list(self.redis_conn_decoded.smembers('vista.fetcher.metrics.json'))
    except Exception as err:
        logger.error('error :: fetcher :: failed to get Redis set vista.fetcher.metrics.json to manage - %s' % (
            err))
        vista_list = []
    # Only the last management error is kept and reported after the loop
    vista_list_manage_error = None
    vista_list_manage_removed = 0
    for item in vista_list:
        last_metric_ts = 0
        try:
            data_list = literal_eval(item)
            # The datapoints are stored as a quoted string of a list, hence
            # the double literal_eval
            last_metric_ts = int(literal_eval(literal_eval(data_list[0]['datapoints']))[-1][0])
        except Exception as err:
            vista_list_manage_error = 'error :: fetcher :: failed to determine timestamp from vista.fetcher.metrics.json data - %s - %s' % (
                str(item), err)
        if last_metric_ts:
            if last_metric_ts < (int(fetch_process_start) - 86400):
                try:
                    self.redis_conn_decoded.srem('vista.fetcher.metrics.json', item)
                    vista_list_manage_removed += 1
                except Exception as err:
                    vista_list_manage_error = 'error :: fetcher :: failed to remove data from vista.fetcher.metrics.json data - %s - %s' % (
                        str(item), err)
    logger.info('fetcher :: manage Redis set vista.fetcher.metrics.json had %s entries, removed %s entries' % (
        str(len(vista_list)), str(vista_list_manage_removed)))
    if vista_list_manage_error:
        logger.error('error :: fetcher :: manage Redis set vista.fetcher.metrics.json had errors')
        logger.error('%s' % vista_list_manage_error)

    # @added 20191127 - Feature #3338: Vista - batch Graphite requests
    # Fetch metrics from the same Graphite host that have the same from
    # parameter in batches
    try:
        graphite_batch_target_count = settings.VISTA_GRAPHITE_BATCH_SIZE
    except Exception:
        graphite_batch_target_count = 20
    graphite_batches = []  # [batch_no, remote_host, from_timestamp, url]
    in_batch_responses = []
    batch_number = 0
    for remote_host_type, frequency, remote_target, graphite_target, metric, url, namespace_prefix, api_key, token, user, password in metrics_to_fetch:
        if remote_host_type != 'graphite':
            continue
        try:
            url_elements = url.split('/')
            remote_host = url_elements[2]
        except Exception as e:
            logger.error('error :: fetcher :: failed to determine the remote_host from the url - %s - %s' % (
                str(url), e))
            # Without a remote_host the metric cannot be batched, carrying
            # on would use an unbound remote_host or the stale one from the
            # previous metric.  The metric is still requested individually
            # in the main loop below.
            continue
        from_timestamp = None
        try:
            from_timestamp_str = param_value_from_url(url, 'from')
            try:
                from_timestamp = int(from_timestamp_str)
            except Exception:
                from_timestamp = None
        except Exception as e:
            logger.error('error :: fetcher :: failed to determine the timestamp from the from url parameter - %s - %s' % (
                str(url), e))
        if not from_timestamp:
            # Only URLs with an integer from parameter are batched
            continue
        target = None
        try:
            target = param_value_from_url(url, 'target')
        except Exception as e:
            logger.error('error :: fetcher :: failed to determine the metric from the target url parameter - %s - %s' % (
                str(url), e))
        added_to_batch = False
        add_to_batch = False
        # Find an existing batch for the same host and from timestamp that
        # still has room, the last matching batch wins.  batch_number is
        # deliberately the loop variable, after the loop it holds the
        # number of the last batch.
        for batch_number, batch_remote_host, batch_from_timestamp, batch_url in graphite_batches:
            if added_to_batch:
                continue
            try:
                if remote_host == batch_remote_host:
                    if str(from_timestamp) == str(batch_from_timestamp):
                        try:
                            if batch_url.count('target') < graphite_batch_target_count:
                                add_to_batch = int(batch_number)
                        except Exception as e:
                            logger.error('error :: fetcher :: failed to determine whether to add to batch - %s' % e)
                            continue
            except Exception:
                logger.error('error :: fetcher :: failed to determine whether to add to batch')
        if not add_to_batch:
            # Start a new batch with this URL
            batch_number += 1
            batch_data = [batch_number, remote_host, from_timestamp_str, url]
            graphite_batches.append(batch_data)
            added_to_batch = batch_number
        if add_to_batch:
            # Rebuild the batches list with the target appended to the URL
            # of the selected batch
            new_graphite_batches = []
            for batch_number, batch_remote_host, batch_from_timestamp, batch_url in graphite_batches:
                if batch_number == add_to_batch:
                    new_end = '&target=%s&format=json' % remote_target
                    batch_url = batch_url.replace('&format=json', new_end)
                    added_to_batch = batch_number
                batch_data = [batch_number, batch_remote_host, batch_from_timestamp, batch_url]
                new_graphite_batches.append(batch_data)
            graphite_batches = new_graphite_batches
        if added_to_batch:
            in_batch_responses.append(target)

    batch_responses = []
    start_batch_fetches = int(time())
    for batch_number, remote_host, from_timestamp_str, url in graphite_batches:
        # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
        #                   Bug #3778: Handle single encoded forward slash requests to Graphite
        _, url = sanitise_graphite_url(skyline_app, url)
        try:
            batch_response = requests.get(url)
            batch_js = batch_response.json()
            batch_responses.append(batch_js)
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: fetcher :: failed to get valid response for batch request %s - %s' % (
                str(url), e))
    if batch_responses:
        end_batch_fetches = int(time())
        time_to_fetch_batches = end_batch_fetches - start_batch_fetches
        logger.info('fetcher :: %s metric batch requests of %s metrics per batch were fetched in %s seconds' % (
            str(len(batch_responses)), str(graphite_batch_target_count), str(time_to_fetch_batches)))
    if in_batch_responses:
        logger.info('fetcher :: %s metrics were fetched in batch requests' % str(len(in_batch_responses)))

    # Substrings of the exception message that identify a problem with the
    # remote end rather than with the fetcher
    remote_error_strings = (
        'Failed to establish a new connection',
        'Errno 111',
        'Connection refused',
        'Connection aborted',
        'RemoteDisconnected',
        'Remote end closed connection without response',
    )

    for remote_host_type, frequency, remote_target, graphite_target, metric, url, namespace_prefix, api_key, token, user, password in metrics_to_fetch:
        success = False

        # @added 20191127 - Feature #3338: Vista - batch Graphite requests
        # Get the metric timeseries from the batch responses and if it is
        # not found no js variable will be set and the metric will be
        # requested individually as per the default behaviour
        js = None
        batched_response = False
        if remote_target in in_batch_responses:
            try:
                for responses in batch_responses:
                    for batch_item in responses:
                        if str(batch_item['target']) == remote_target:
                            js = batch_item
                            batched_response = True
                            success = True
                            if VERBOSE_LOGGING:
                                logger.info('fetcher :: data for %s was fetched in a batch' % str(remote_target))
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: failed to detemine if %s was in batch_responses - %s' % (
                    str(remote_target), e))

        # @added 20201009 - Feature #3780: skyline_functions - sanitise_graphite_url
        #                   Bug #3778: Handle single encoded forward slash requests to Graphite
        _, url = sanitise_graphite_url(skyline_app, url)

        # @modified 20191127 - Feature #3338: Vista - batch Graphite requests
        # Wrapped in if not success
        response = None
        if not success:
            try:
                # @modified 20191011 - Task #3258: Reduce vista logging
                if LOCAL_DEBUG:
                    logger.info('fetcher :: getting data from %s' % str(url))
                response = requests.get(url)
                if response.status_code == 200:
                    success = True
            except Exception as err:
                # @modified 2023024 - Task #4824: vista - warn on remote_error
                # Substring membership is tested for every known message,
                # the first message was previously compared with "is" which
                # is an identity test that never matched
                err_str = str(err)
                remote_error = any(
                    error_string in err_str
                    for error_string in remote_error_strings)
                if remote_error:
                    logger.warning('warning :: fetcher :: failed to get data from %s - %s' % (
                        str(url), err))
                else:
                    logger.error(traceback.format_exc())
                    # @modified 20191115 - Branch #3262: py3
                    # Do not report last response data
                    logger.error('error :: fetcher :: failed to get data from %s - %s' % (
                        str(url), err))
        if not success:
            continue

        # @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
        # And set flux.last key is the returned value from the remote is
        # null so that time series that are mostly null do not keep on
        # getting added to flux populate_metric by Vista
        raw_timeseries = []
        datapoints = None
        try:
            # @modified 20191127 - Feature #3338: Vista - batch Graphite requests
            if not js:
                js = response.json()
            else:
                if VERBOSE_LOGGING:
                    logger.info('fetcher :: data for %s was fetched in a batch' % str(remote_target))
            if remote_host_type == 'graphite':
                # @modified 20191127 - Feature #3338: Vista - batch Graphite requests
                # An individual response is a list of targets, a batched
                # response item is the target itself
                if not batched_response:
                    datapoints = js[0]['datapoints']
                else:
                    datapoints = js['datapoints']
            if remote_host_type == 'prometheus':
                # TODO:
                # Maybe iterate through multiple metrics if response has more than one metric
                # for some public lab metrics
                datapoints = js['data']['result'][0]['values']
            datapoints_fetched = len(datapoints)
            # @modified 20191011 - Task #3258: Reduce vista logging
            if LOCAL_DEBUG:
                logger.info('fetcher :: retrieved %s data points from %s' % (
                    str(datapoints_fetched), str(url)))
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: fetcher :: failed to get data from %s - %s' % (
                str(url), e))

        # Example
        # datapoints[0]
        # [7.3, 1556817000]
        # Add each data point and timestamp to the timeseries list so
        # they can be sent to Graphite
        if not datapoints:
            logger.info('fetcher :: failed to get any data from %s' % str(url))
            continue

        # @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
        # raw_timeseries records every data point as [timestamp, value],
        # nulls included.  valid_datapoints keeps the order of the remote,
        # [value, timestamp] for graphite and [timestamp, value] for
        # prometheus.
        valid_datapoints = []
        for datapoint in datapoints:
            if remote_host_type == 'graphite':
                try:
                    raw_timeseries.append([datapoint[1], datapoint[0]])
                    if datapoint[0] is None:
                        continue
                    value = float(datapoint[0])
                    timestamp = int(datapoint[1])
                    valid_datapoints.append([value, timestamp])
                except Exception:
                    continue
            if remote_host_type == 'prometheus':
                try:
                    raw_timeseries.append([datapoint[0], datapoint[1]])
                    if datapoint[1] is None:
                        continue
                    timestamp = int(datapoint[0])
                    value = float(datapoint[1])
                except Exception:
                    continue
                valid_datapoints.append([timestamp, value])
        datapoints = valid_datapoints

        # Order the data points.  Note that graphite data points are
        # [value, timestamp] so this orders them by value, the final
        # timeseries list is ordered by timestamp before it is submitted.
        datapoints.sort()

        # However check if a metric is known to Flux and if so do not
        # use all resolutions just from the last.flux known timestamp
        # for he metric
        last_flux_timestamp = None
        redis_last_flux_metric_data = None
        cache_key = 'flux.last.%s' % metric

        # @added 20220429 - Feature #4536: Handle Redis failure
        # Swap to using a Redis hash instead of the
        # flux.last.<metric> keys
        use_old_timestamp_keys = True
        redis_last_metric_data_dict = {}
        try:
            redis_last_metric_data_dict = get_last_metric_data(skyline_app, metric)
        except Exception as err:
            logger.error('error :: populate_metric_worker :: get_last_metric_data failed - %s' % (
                err))
        if redis_last_metric_data_dict:
            try:
                last_flux_timestamp = redis_last_metric_data_dict['timestamp']
                use_old_timestamp_keys = False
            except KeyError:
                last_flux_timestamp = None
            except Exception as err:
                logger.error('error :: populate_metric_worker :: failed to get timestamp from - %s - %s' % (
                    str(redis_last_metric_data_dict), err))
                last_flux_timestamp = None

        # @modified 20220429 - Feature #4536: Handle Redis failure
        # Fall back to the flux.last.<metric> key if the hash had no
        # timestamp for the metric
        if use_old_timestamp_keys:
            try:
                redis_last_flux_metric_data = self.redis_conn_decoded.get(cache_key)
                if LOCAL_DEBUG:
                    if redis_last_flux_metric_data:
                        logger.info('fetcher :: Redis key %s is present' % str(cache_key))
                    else:
                        logger.info('fetcher :: Redis key %s is not present' % str(cache_key))
            except AttributeError:
                logger.info('fetcher :: Redis key %s is not present' % str(cache_key))
                last_flux_timestamp = False
                redis_last_flux_metric_data = False
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: retrieving Redis key %s data - %s' % (
                    str(cache_key), str(e)))
                redis_last_flux_metric_data = False
            if redis_last_flux_metric_data:
                try:
                    last_flux_metric_data = literal_eval(redis_last_flux_metric_data)
                    last_flux_timestamp = int(last_flux_metric_data[0])
                    if LOCAL_DEBUG:
                        if last_flux_timestamp:
                            logger.info('fetcher :: Redis key %s last_flux_timestamp %s' % (str(cache_key), str(last_flux_timestamp)))
                        else:
                            logger.info('fetcher :: Redis key %s last_flux_timestamp unknown' % (str(cache_key)))
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetch :: failed determining last_flux_timestamp - %s' % e)
                    last_flux_timestamp = False

        # Build the [timestamp, value] timeseries of the data points that
        # are newer than the last timestamp known to Flux
        value = None
        timestamp = None
        timeseries = []
        for datapoint in datapoints:
            try:
                if remote_host_type == 'graphite':
                    if datapoint[0] is None:
                        continue
                    value = float(datapoint[0])
                    timestamp = int(datapoint[1])
                if remote_host_type == 'prometheus':
                    timestamp = int(datapoint[0])
                    try:
                        value = float(datapoint[1])
                    except Exception:
                        continue
                submit_data = True
                if last_flux_timestamp:
                    if timestamp <= last_flux_timestamp:
                        submit_data = False
                if submit_data:
                    timeseries.append([timestamp, value])
            # Data points that cannot be interpreted are skipped
            except Exception:  # nosec
                continue

        # @added 20191111 - Bug #3312: flux - populate_metric_worker - handle None in datapoints
        # And set flux.last key is the returned value from the remote is
        # null so that time series that are mostly null do not keep on
        # getting added to flux populate_metric by Vista
        if not timeseries:
            set_flux_key = False
            last_ts = None
            try:
                sorted_raw_timeseries = sorted(raw_timeseries, key=lambda x: x[0])
                last_ts = sorted_raw_timeseries[-1][0]
                # Only when the most recent data point is from the last 120
                # seconds and it is null
                if int(last_ts) > (fetch_process_start - 120):
                    if sorted_raw_timeseries[-1][1] is None:
                        set_flux_key = True
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: failed to determine if last value was null - %s' % e)
            if set_flux_key:
                cache_key = 'flux.last.%s' % metric
                try:
                    # Update Redis flux key
                    metric_data = [int(last_ts), None]
                    self.redis_conn.set(cache_key, str(metric_data))
                    if VERBOSE_LOGGING:
                        logger.info('fetcher :: even though no data points so as to not loop round on this metric, set the metric Redis key - %s - %s' % (
                            cache_key, str(metric_data)))
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: even though no data points, failed to set Redis key - %s - %s' % (
                        cache_key, e))
                # @added 20220429 - Feature #4536: Handle Redis failure
                # Swap to using a Redis hash instead of the
                # flux.last.<metric> keys
                metric_data_dict = {'timestamp': int(last_ts), 'value': None}
                try:
                    self.redis_conn.hset('flux.last.metric_data', metric, str(metric_data_dict))
                except Exception as err:
                    logger.error('error :: fetcher :: failed to set flux.last.metric_data Redis key - %s' % str(err))
                # Adding to the vista.fetcher.unique_metrics Redis set
                redis_set = 'vista.fetcher.unique_metrics'
                data = str(metric)
                try:
                    self.redis_conn.sadd(redis_set, data)
                    if VERBOSE_LOGGING:
                        logger.info('fetcher :: even though no data points, added %s to Redis set %s' % (
                            remote_target, redis_set))
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: even though no data points, failed to add %s to Redis set %s - %s' % (
                        str(data), str(redis_set), e))
                continue

        if not timeseries:
            if VERBOSE_LOGGING:
                logger.info('fetcher :: no new data in the timeseries list for the time series for %s' % metric)
            continue

        # Order the time series by timestamp as the tuple can shift
        # order resulting in more recent data being added before older
        # data
        timeseries.sort()
        timeseries_length = len(timeseries)
        if VERBOSE_LOGGING:
            logger.info('fetcher :: %s data points to add to vista.fetcher.metrics.json for %s' % (
                str(timeseries_length), metric))
        payload = None
        # The datapoints are submitted as a quoted string of the list
        timeseries_str = '"%s"' % timeseries
        try:
            payload = [{
                'remote_host_type': remote_host_type,
                'remote_target': remote_target,
                'graphite_target': graphite_target,
                'metric': metric,
                'namespace_prefix': namespace_prefix,
                'key': settings.FLUX_SELF_API_KEY,
                'token': token,
                'user': user,
                'password': password,
                'datapoints': timeseries_str
            }]
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: fetcher :: could not build the payload json - %s' % e)
        if payload is None:
            # Do not add the string 'None' to the Redis set when the
            # payload could not be built
            continue
        redis_set = 'vista.fetcher.metrics.json'
        data = str(payload)
        try:
            self.redis_conn.sadd(redis_set, data)
            if LOCAL_DEBUG:
                logger.info('fetcher :: added data from %s to Redis set %s' % (
                    str(url), redis_set))
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to add %s to Redis set %s - %s' % (
                str(data), str(redis_set), e))
        # Record that the metric was fetched and when
        redis_set = 'vista.fetcher.metrics.fetched'
        time_now = int(time())
        data = [str(remote_target), time_now]
        try:
            self.redis_conn.sadd(redis_set, str(data))
        except Exception as e:
            logger.error(traceback.format_exc())
            logger.error('error :: failed to add %s to Redis set %s - %s' % (
                str(data), str(redis_set), e))

    fetch_process_end = time()
    fetch_time = fetch_process_end - fetch_process_start
    logger.info('fetcher :: metrics fetched in %s seconds' % str(fetch_time))
    return
[docs] def run(self): """ - Called when the process intializes. - Determine if Redis is up and discover the number of `unique metrics`. - Wait for the processes to finish. - Send skyline.vista metrics to `GRAPHITE_HOST` """ # Log management to prevent overwriting # Allow the bin/<skyline_app>.d to manage the log if os.path.isfile(skyline_app_logwait): try: os.remove(skyline_app_logwait) except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_logwait) # pass now = time() log_wait_for = now + 5 while now < log_wait_for: if os.path.isfile(skyline_app_loglock): sleep(.1) now = time() else: now = log_wait_for + 1 logger.info('starting %s run' % skyline_app) if os.path.isfile(skyline_app_loglock): logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app) try: os.remove(skyline_app_loglock) logger.info('log lock file removed') except OSError: logger.error('error - failed to remove %s, continuing' % skyline_app_loglock) # pass else: logger.info('bin/%s.d log management done' % skyline_app) try: SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME if SERVER_METRIC_PATH == '.': SERVER_METRIC_PATH = '' logger.info('SERVER_METRIC_PATH is set from settings.py to %s' % str(SERVER_METRIC_PATH)) except: SERVER_METRIC_PATH = '' logger.info('warning :: SERVER_METRIC_PATH is not declared in settings.py, defaults to \'\'') logger.info('skyline_app_graphite_namespace is set to %s' % str(skyline_app_graphite_namespace)) try: VISTA_ENABLED = settings.VISTA_ENABLED logger.info('VISTA_ENABLED is set to %s' % str(VISTA_ENABLED)) except: VISTA_ENABLED = True logger.info('warning :: VISTA_ENABLED is not declared in settings.py, defaults to True') try: ASYNCIO_LIMIT = settings.VISTA_ASYNCIO_FETCHER_LIMIT logger.info('fetcher :: settings.VISTA_ASYNCIO_FETCHER_LIMIT is set to %s' % str(ASYNCIO_LIMIT)) except: ASYNCIO_LIMIT = 2 logger.info('fetcher :: warning :: VISTA_ASYNCIO_FETCHER_LIMIT is not declared in settings.py, 
defaults to 2') running = True while running: begin_fetcher_run = int(time()) # Make sure Redis is up redis_up = False while not redis_up: try: redis_up = self.redis_conn.ping() except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH) sleep(2) try: # @modified 20191111 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # if settings.REDIS_PASSWORD: # self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH) # else: # self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH) self.redis_conn = get_redis_conn(skyline_app) self.redis_conn_decoded = get_redis_conn_decoded(skyline_app) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH) continue # Report app up try: self.redis_conn.setex(skyline_app, 120, begin_fetcher_run) logger.info('fetcher :: set Redis %s key, to report UP' % skyline_app) except: logger.error('error :: fetcher :: could not update the Redis %s key' % skyline_app) logger.error(traceback.format_exc()) # Known fetcher metrics that are known to have already been fetched, # metrics in this set are named as follows namespace_prefix.metric vista_fetcher_unique_metrics = [] redis_set = 'vista.fetcher.unique_metrics' try: # @modified 20191111 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # vista_fetcher_unique_metrics = list(self.redis_conn.smembers(redis_set)) vista_fetcher_unique_metrics = list(self.redis_conn_decoded.smembers(redis_set)) except: logger.error('error :: fetcher :: could not determine vista_fetcher_unique_metrics from the Redis set %s' % redis_set) vista_fetcher_unique_metrics = [] vista_unique_metrics = [] if vista_fetcher_unique_metrics: for metric in vista_fetcher_unique_metrics: # @modified 20191111 - Bug #3266: py3 Redis binary objects 
not strings # Branch #3262: py3 # metric_str = metric.decode('utf-8') metric_str = str(metric) vista_unique_metrics.append(metric_str) # Determine metrics to fetch metrics_to_fetch = [] fetcher_sent_to_flux = 0 if LOCAL_DEBUG: try: number_of_metrics = len(settings.VISTA_FETCH_METRICS) logger.info('fetcher :: %s metrics to retrieve' % str(number_of_metrics)) except: pass end_timestamp = int(time()) start_timestamp = end_timestamp - 300 # Refer to settings.VISTA_FETCH_METRICS tuple to determine the # format of the fetch_tuple # for target, graphite_target, fetch_tuple in metrics: for remote_host, remote_host_type, frequency, remote_target, graphite_target, uri, namespace_prefix, api_key, token, user, password, populate_at_resolutions in settings.VISTA_FETCH_METRICS: try: # remote_host_type = fetch_tuple[1] valid_remote_host_type = False if remote_host_type in ['graphite', 'prometheus']: valid_remote_host_type = True if not valid_remote_host_type: logger.error('error :: invalid remote_host_type for %s in %s' % ( remote_target, str(remote_host_type))) continue if LOCAL_DEBUG: logger.info('fetcher :: processing %s remote_target %s' % ( str(remote_host_type), str(remote_target))) remote_graphite_host = None if remote_host_type == 'graphite': remote_graphite_host = remote_host url = '%s%s%s' % (remote_graphite_host, uri, str(remote_target)) if LOCAL_DEBUG: logger.info('fetcher :: with url %s' % str(url)) default_prometheus_uri = False remote_prometheus_host = None if remote_host_type == 'prometheus': remote_prometheus_host = remote_host # Hardcode the Prometheus api uri # uri = str(fetch_tuple[3]) # uri = '/api/v1/query?query=%s[5m]' % str(remote_target) # url encode the Prometheus metric query to handle # labels and query chars in the URI urlencoded_remote_target = quote(remote_target) if uri == 'default': default_prometheus_uri = True uri = '/api/v1/query_range?query=%s&start=%s&end=%s&step=60s' % ( str(urlencoded_remote_target), str(start_timestamp), 
str(end_timestamp)) url = '%s%s' % (remote_prometheus_host, uri) if LOCAL_DEBUG: logger.info('fetcher :: with url %s' % str(url)) frequency = int(frequency) if LOCAL_DEBUG: logger.info('fetcher :: with frequency %s' % str(frequency)) if LOCAL_DEBUG: logger.info('fetcher :: with namespace_prefix %s' % str(namespace_prefix)) if namespace_prefix != '': metric = '%s.%s' % (namespace_prefix, graphite_target) metric = filesafe_metricname(metric) else: metric = graphite_target metric = filesafe_metricname(metric) if LOCAL_DEBUG: logger.info('fetcher :: with metric %s' % str(metric)) api_key = str(api_key) if LOCAL_DEBUG: # @modified 20210421 - Task #4030: refactoring # semgrep - python-logger-credential-disclosure # logger.info('fetcher :: with api_key %s' % str(api_key)) logger.info('fetcher :: with api_key (no disclosure)') token = str(token) if LOCAL_DEBUG: # @modified 20210421 - Task #4030: refactoring # semgrep - python-logger-credential-disclosure # logger.info('fetcher :: with token %s' % str(token)) logger.info('fetcher :: with token (no disclosure)') user = str(user) if LOCAL_DEBUG: logger.info('fetcher :: with user %s' % str(user)) password = str(password) if LOCAL_DEBUG: # @modified 20210421 - Task #4030: refactoring # semgrep - python-logger-credential-disclosure # logger.info('fetcher :: with password %s' % str(password)) logger.info('fetcher :: with password (no disclosure)') populate_at_resolutions_str = str(populate_at_resolutions) if LOCAL_DEBUG: logger.info('fetcher :: with populate_at_resolutions %s' % populate_at_resolutions_str) # Handle if the user passes (None) instead of None if populate_at_resolutions == (): populate_at_resolutions = None except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not determine the required values in VISTA_FETCH_METRICS tuple for - %s - %s' % ( str(remote_target), e)) continue # If the metric is not known to Vista and the metric # has a populate_at_resolutions set, send to 
Flux to # pre-populate Graphite pre_populate_graphite_metric = False if remote_target not in vista_unique_metrics: if remote_host_type == 'graphite' and populate_at_resolutions: pre_populate_graphite_metric = True logger.info('fetcher :: attempting to pre-populate Graphite metric - %s' % ( metric)) if remote_host_type == 'prometheus' and populate_at_resolutions: pre_populate_graphite_metric = True logger.info('fetcher :: attempting to pre-populate Prometheus metric - %s' % ( metric)) else: if LOCAL_DEBUG: logger.info('fetcher :: remote_target %s is present in vista_unique_metrics' % str(remote_target)) # However check if a metric is known to Flux and if so do not # use all resolutions just from the last.flux known timestamp # for he metric last_flux_timestamp = None redis_last_flux_metric_data = None # @added 20220429 - Feature #4536: Handle Redis failure # Swap to using a Redis hash instead of the # flux.last.<metric> keys use_old_timestamp_keys = True redis_last_metric_data_dict = {} try: redis_last_metric_data_dict = get_last_metric_data(skyline_app, metric) except Exception as err: logger.error('error :: populate_metric_worker :: get_last_metric_data failed - %s' % ( err)) if redis_last_metric_data_dict: try: last_flux_timestamp = redis_last_metric_data_dict['timestamp'] use_old_timestamp_keys = False except KeyError: last_flux_timestamp = None except Exception as err: logger.error('error :: populate_metric_worker :: failed to get timestamp from - %s - %s' % ( str(redis_last_metric_data_dict), err)) last_flux_timestamp = None # @modified 20220429 - Feature #4536: Handle Redis failure # Swap to using a Redis hash instead of the # flux.last.<metric> keys if use_old_timestamp_keys: cache_key = None try: cache_key = 'flux.last.%s' % metric # @modified 20191111 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # @modified 20191128 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # if python_version == 3: # 
redis_last_flux_metric_data = self.redis_conn.get(cache_key).decode('utf-8') # else: # redis_last_flux_metric_data = self.redis_conn.get(cache_key) redis_last_flux_metric_data = self.redis_conn_decoded.get(cache_key) if LOCAL_DEBUG: if redis_last_flux_metric_data: logger.info('fetcher :: Redis key %s is present' % str(cache_key)) else: logger.info('fetcher :: Redis key %s is not present' % str(cache_key)) except AttributeError: logger.info('fetcher :: Redis key %s is not present' % str(cache_key)) last_flux_timestamp = False redis_last_flux_metric_data = False except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: retrieving Redis key %s data - %s' % ( str(cache_key), str(e))) redis_last_flux_metric_data = False if redis_last_flux_metric_data: try: last_flux_metric_data = literal_eval(redis_last_flux_metric_data) last_flux_timestamp = int(last_flux_metric_data[0]) if LOCAL_DEBUG: if last_flux_timestamp: logger.info('fetcher :: Redis key %s last_flux_timestamp %s' % (str(cache_key), str(last_flux_timestamp))) else: logger.info('fetcher :: Redis key %s last_flux_timestamp unknown' % (str(cache_key))) except: logger.error(traceback.format_exc()) logger.error('error :: fetch :: failed determining last_flux_timestamp') last_flux_timestamp = False time_now = int(time()) if last_flux_timestamp: last_fetch = time_now - last_flux_timestamp if last_fetch < frequency: if LOCAL_DEBUG: logger.info('fetcher :: last fetch was %s seconds ago, less than frequency %s seconds, not fetching' % (str(last_fetch), str(frequency))) continue # @added 20200107 - Task #3376: Enable vista and flux to deal with lower frequency data # Determine the last known resolution of the metric last_vista_metric_resolution = frequency try: cache_key = 'vista.last.resolution.%s' % metric last_vista_metric_resolution_data = self.redis_conn_decoded.get(cache_key) if last_vista_metric_resolution_data is None: last_vista_metric_resolution_int = 
last_vista_metric_resolution else: last_vista_metric_resolution_int = int(last_vista_metric_resolution_data) if last_vista_metric_resolution_int > 0: last_vista_metric_resolution = last_vista_metric_resolution_int if LOCAL_DEBUG: if last_vista_metric_resolution: logger.info('fetcher :: Redis key %s is present' % str(cache_key)) else: logger.info('fetcher :: Redis key %s is not present' % str(cache_key)) except AttributeError: logger.info('fetcher :: Redis key %s is not present' % str(cache_key)) last_vista_metric_resolution = False except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: retrieving Redis key %s data - %s' % ( str(cache_key), str(e))) last_vista_metric_resolution = False if remote_target in vista_unique_metrics and last_flux_timestamp: last_expected_fetch_time = time_now - (frequency + 420) if last_flux_timestamp < last_expected_fetch_time: # @added 20200107 - Task #3376: Enable vista and flux to deal with lower frequency data # Added older_than_resolution older_than_resolution = True if last_vista_metric_resolution: try: last_expected_data_time = time_now - (frequency + 420 + last_vista_metric_resolution) if last_flux_timestamp > last_expected_data_time: older_than_resolution = False except: logger.error(traceback.format_exc()) logger.error('error :: fetch :: failed determining last_expected_data_time') # @modified 20200107 - Task #3376: Enable vista and flux to deal with lower frequency data # Added older_than_resolution # if populate_at_resolutions: if populate_at_resolutions and older_than_resolution: if remote_host_type in ['graphite', 'prometheus']: pre_populate_graphite_metric = True behind_by_seconds = time_now - last_flux_timestamp logger.info('fetcher :: last_flux_timestamp is behind by %s seconds, attempting to pre-populate %s' % ( str(behind_by_seconds), metric)) if remote_target in vista_unique_metrics: if not last_flux_timestamp: if populate_at_resolutions: if remote_host_type in ['graphite', 
'prometheus']: pre_populate_graphite_metric = True # Problem with asyncio so using Flux directly if remote_target in vista_unique_metrics and last_flux_timestamp and USE_FLUX: if populate_at_resolutions: if remote_host_type in ['graphite', 'prometheus']: pre_populate_graphite_metric = True if pre_populate_graphite_metric: logger.info('fetcher :: attempting to build the pre-populate Graphite metric urls - %s' % ( metric)) if LOCAL_DEBUG: logger.info('fetcher :: pre_populate_graphite_metric - %s - %s' % ( str(pre_populate_graphite_metric), metric)) fetch_resolution_urls = [] # Build remote Graphite URLs if remote_host_type == 'graphite' and pre_populate_graphite_metric: logger.info('fetcher :: building the pre-populate Graphite metric urls - %s' % ( metric)) try: # Build URLs to submit to flux/HttpPopulateMetric resolutions = [] for resolution in populate_at_resolutions: resolutions.append(resolution) number_of_resolutions = len(resolutions) current_resolution_count = 0 for resolution in resolutions: append_url = True if current_resolution_count < (number_of_resolutions - 1): resolution_url = None if current_resolution_count == 0: next_resolution_count = 1 else: next_resolution_count = current_resolution_count + 1 next_resolution = resolutions[next_resolution_count] # If there is a known last_flux_timestamp only get data # from that time period until now if last_flux_timestamp: if 'days' in resolution: resolution_days = resolution.strip('days') resolution_hours = int(resolution_days) * 24 d = datetime.today() - timedelta(hours=resolution_hours) if 'hours' in resolution: resolution_hours = int(resolution.strip('hours')) d = datetime.today() - timedelta(hours=resolution_hours) resolution_timestamp = int(d.strftime('%s')) if resolution_timestamp < last_flux_timestamp: append_url = False else: append_url = True # If the last_flux_timestamp falls within # the range of the resolution period, append # otherwise the fill will leave an airgap in # the data if 'days' in 
next_resolution: next_resolution_days = next_resolution.strip('days') next_resolution_hours = int(next_resolution_days) * 24 d = datetime.today() - timedelta(hours=next_resolution_hours) if 'hours' in next_resolution: next_resolution_hours = int(next_resolution.strip('hours')) d = datetime.today() - timedelta(hours=next_resolution_hours) next_resolution_timestamp = int(d.strftime('%s')) if last_flux_timestamp in range(resolution_timestamp, next_resolution_timestamp): append_url = True resolution_url = '%s/render/?from=-%s&until=-%s&format=json&target=%s' % ( str(remote_graphite_host), str(resolution), str(next_resolution), str(remote_target)) if LOCAL_DEBUG: logger.info('fetcher :: resolution_url - %s - %s' % ( str(resolution_url), metric)) else: resolution_url = '%s/render/?from=-%s&until=-%s&format=json&target=%s' % ( str(remote_graphite_host), str(resolution), str(next_resolution), str(remote_target)) if LOCAL_DEBUG: logger.info('fetcher :: resolution_url - %s - %s' % ( str(resolution_url), metric)) current_resolution_count += 1 else: if last_flux_timestamp: if 'days' in resolution: resolution_days = resolution.strip('days') resolution_hours = int(resolution_days) * 24 d = datetime.today() - timedelta(hours=resolution_hours) if 'hours' in resolution: resolution_hours = int(resolution.strip('hours')) d = datetime.today() - timedelta(hours=resolution_hours) resolution_timestamp = int(d.strftime('%s')) if last_flux_timestamp > resolution_timestamp: append_url = True fetch_from_timestamp = last_flux_timestamp - 600 else: append_url = True fetch_from_timestamp = resolution_timestamp resolution_url = '%s/render/?from=%s&format=json&target=%s' % ( str(remote_graphite_host), str(fetch_from_timestamp), str(remote_target)) if LOCAL_DEBUG: logger.info('fetcher :: resolution_url - %s - %s' % ( str(resolution_url), metric)) else: resolution_url = '%s/render/?from=-%s&format=json&target=%s' % ( str(remote_graphite_host), str(resolution), str(remote_target)) if LOCAL_DEBUG: 
logger.info('fetcher :: resolution_url - %s - %s' % ( str(resolution_url), metric)) if append_url: fetch_resolution_urls.append(resolution_url) if LOCAL_DEBUG: logger.info('fetcher :: appended resolution_url - %s - %s' % ( str(resolution_url), metric)) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not determine the required resolutions for values in VISTA_FETCH_METRICS tuple for - %s - %s' % ( str(remote_target), e)) # @added 20200108 - Task #3376: Enable vista and flux to deal with lower frequency data # Prometheus metrics that use a custom uri cannot be pre populated if remote_host_type == 'prometheus' and pre_populate_graphite_metric: if not default_prometheus_uri: logger.info('fetcher :: cannot pre populate Prometheus metric %s as it uses a custom uri' % ( metric)) pre_populate_graphite_metric = False # Build remote Prometheus URLs if remote_host_type == 'prometheus' and pre_populate_graphite_metric: # Assuming Prometheus only has a single retention (resolution) try: start_seconds_ago = 1296000 # default to 15 days for resolution in populate_at_resolutions: # Build URL to submit to flux/HttpPopulateMetric # url encode the Prometheus metric query to handle # labels and query chars in the URI if 'm' in resolution: resolution_int = resolution.strip('m') start_seconds_ago = int(resolution_int) * 60 if 'h' in resolution: resolution_int = resolution.strip('h') start_seconds_ago = (int(resolution_int) * 60) * 60 if 'd' in resolution: resolution_int = resolution.strip('d') start_seconds_ago = ((int(resolution_int) * 24) * 60) * 60 if 'w' in resolution: resolution_int = resolution.strip('w') start_seconds_ago = (((int(resolution_int) * 7) * 24) * 60) * 60 pop_start_timestamp = end_timestamp - int(start_seconds_ago) urlencoded_remote_target = quote(remote_target) # The query_range query does not return more than # 11000 data points as it is limited as per # 
https://github.com/prometheus/prometheus/issues/2253#issuecomment-346288842 # so resample needed to be reintroduced after being # deleted as the Prometheus query_range was switched # to uri = '/api/v1/query_range?query=%s&start=%s&end=%s&step=60s' % ( str(urlencoded_remote_target), str(pop_start_timestamp), str(end_timestamp)) # Use query endpoint for more than 11000 data points uri = '/api/v1/query?query=%s[%s]' % ( str(urlencoded_remote_target), str(resolution)) resolution_url = '%s%s' % ( str(remote_prometheus_host), uri) fetch_resolution_urls.append(resolution_url) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not determine the required pre-populate URI for values in VISTA_FETCH_METRICS tuple for - %s - %s' % ( str(populate_at_resolutions), e)) if fetch_resolution_urls: set_fetch_resolution_urls = set(fetch_resolution_urls) fetch_resolution_urls = list(set_fetch_resolution_urls) flux_url = None try: # Submit to flux/populate_metric protocol = 'http://' flux_url = '%s%s:%s/populate_metric' % ( protocol, str(settings.FLUX_IP), str(settings.FLUX_PORT)) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not build the flux URL - %s' % e) payload = None fetch_resolution_urls_str = '"%s"' % fetch_resolution_urls if fetch_resolution_urls and pre_populate_graphite_metric: try: payload = { 'remote_host_type': remote_host_type, 'remote_target': remote_target, 'metric': metric, 'namespace_prefix': namespace_prefix, 'key': settings.FLUX_SELF_API_KEY, 'token': token, 'user': user, 'password': password, 'fetch_resolution_urls': fetch_resolution_urls_str } except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not build the payload json - %s' % e) if flux_url and payload: try: # @modified 20191011 - Task #3258: Reduce vista logging if LOCAL_DEBUG: logger.info('fetcher :: calling %s with payload - %s' % ( flux_url, str(payload))) 
response = requests.post(flux_url, json=payload) # @modified 20191011 - Task #3258: Reduce vista logging if LOCAL_DEBUG: logger.info('fetcher :: flux /populate_metric response code - %s' % ( str(response.status_code))) # @added 20191011 - Task #3258: Reduce vista logging good_response = False if response.status_code == 200: fetcher_sent_to_flux += 1 # @added 20191011 - Task #3258: Reduce vista logging good_response = True if response.status_code == 204: fetcher_sent_to_flux += 1 # @added 20191011 - Task #3258: Reduce vista logging good_response = True # @added 20191011 - Task #3258: Reduce vista logging if not good_response: logger.error('fetcher :: flux /populate_metric did not respond with 200 or 204, status code - %s for %s' % ( str(response.status_code), flux_url)) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not post data to flux URL - %s, data - %s - %s' % ( str(flux_url), str(payload), e)) if not pre_populate_graphite_metric: if last_flux_timestamp and remote_host_type == 'graphite': try: # Best effort to backfill any missing data url_from = re.sub(r'^.*from=[-]', '', url) url_period = re.sub(r'&.*', '', url_from) if 'days' in url_period: resolution_days = int(url_period.strip('days')) d = datetime.today() - timedelta(days=resolution_days) if 'hours' in url_period: resolution_hours = int(url_period.strip('hours')) d = datetime.today() - timedelta(hours=resolution_hours) if 'minutes' in url_period: resolution_minutes = int(url_period.strip('minutes')) d = datetime.today() - timedelta(minutes=resolution_minutes) from_resolution_timestamp = int(d.strftime('%s')) if from_resolution_timestamp < last_flux_timestamp: rep_str = url_period if 'from=-' in url: rep_str = '-%s' % url_period fetch_from_timestamp = last_flux_timestamp - 300 url = re.sub(rep_str, str(fetch_from_timestamp), url) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not determine backfill 
parameters - %s' % e) if last_flux_timestamp and remote_host_type == 'prometheus': try: # Best effort to backfill any missing data if default_prometheus_uri: pop_start_timestamp = int(last_flux_timestamp) - 120 urlencoded_remote_target = quote(remote_target) uri = '/api/v1/query_range?query=%s&start=%s&end=%s&step=60s' % ( str(urlencoded_remote_target), str(pop_start_timestamp), str(end_timestamp)) url = '%s%s' % (str(remote_prometheus_host), uri) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not determine backfill parameters - %s' % e) metric_to_fetch = [remote_host_type, frequency, remote_target, graphite_target, metric, url, namespace_prefix, api_key, token, user, password] metrics_to_fetch.append(metric_to_fetch) if LOCAL_DEBUG: logger.info('fetcher :: added metric_to_fetch - %s' % str(metric_to_fetch)) if LOCAL_DEBUG: if metrics_to_fetch: metrics_to_fetch_count = len(metrics_to_fetch) logger.info('fetcher :: there are %s metrics in metrics_to_fetch' % str(metrics_to_fetch_count)) if metrics_to_fetch: # Spawn fetch process/es pids = [] spawned_pids = [] pid_count = 0 for i in range(1, settings.VISTA_FETCHER_PROCESSES + 1): if i > len(metrics_to_fetch): logger.info('fetcher :: WARNING: Skyline Vista fetcher is set for more cores than needed.') break try: p = Process(target=self.fetch_process, args=(i, metrics_to_fetch)) pids.append(p) pid_count += 1 logger.info('fetcher :: starting %s of %s fetch_process/es' % (str(pid_count), str(settings.VISTA_FETCHER_PROCESSES))) p.start() spawned_pids.append(p.pid) except Exception as e: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: failed to spawn fetch_process - %s' % e) # Self monitor processes and terminate if any fetch_process has run # for longer than VISTA_FETCHER_PROCESS_MAX_RUNTIME seconds p_starts = time() while time() - p_starts <= settings.VISTA_FETCHER_PROCESS_MAX_RUNTIME: if any(p.is_alive() for p in pids): # Just to avoid hogging 
the CPU sleep(.1) else: # All the processes are done, break now. time_to_run = time() - p_starts logger.info('fetcher :: %s fetch_process/es completed in %.2f seconds' % (str(settings.VISTA_FETCHER_PROCESSES), time_to_run)) break else: # We only enter this if we didn't 'break' above. logger.info('fetcher :: timed out, killing all fetch_process processes') for p in pids: logger.info('fetcher :: killing fetch_process process') p.terminate() # p.join() logger.info('fetcher :: killed fetch_process process') for p in pids: if p.is_alive(): logger.info('fetcher :: stopping fetch_process - %s' % (str(p.is_alive()))) p.join() # Sleep if it went too fast process_runtime = int(time()) - begin_fetcher_run metrics_fetched_count = 0 if int(process_runtime) < 60: next_run = int(begin_fetcher_run) + 60 time_now = int(time()) sleep_for = next_run - time_now logger.info('fetcher :: sleeping for %s seconds until next fetch' % str(sleep_for)) sleep(sleep_for) try: del sleep_for except: logger.error('error :: fetcher :: failed to del sleep_for') try: del next_run except: logger.error('error :: fetcher :: failed to del next_run') try: del time_now except: logger.error('error :: fetcher :: failed to del time_now') metrics_fetched = [] # metrics_fetched_count = 0 try: redis_set = 'vista.fetcher.metrics.fetched' # @modified 20191111 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # metrics_fetched = self.redis_conn.smembers(redis_set) metrics_fetched = self.redis_conn_decoded.smembers(redis_set) metrics_fetched_count = len(list(metrics_fetched)) logger.info('fetcher :: %s metrics were fetched' % str(metrics_fetched_count)) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not get Redis set %s' % redis_set) redis_set = 'vista.worker.to.process' try: redis_set = 'vista.fetcher.metrics.fetched' self.redis_conn.delete(redis_set) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not delete the Redis set 
%s' % redis_set) if metrics_fetched: timestamps = [] for str_metric_fetched in metrics_fetched: try: # @modified 20191111 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # Use get_redis_conn_decoded # if python_version == 3: # str_metric_fetched = str_metric_fetched.decode('UTF-8') metric_fetched = literal_eval(str_metric_fetched) timestamp = int(metric_fetched[1]) timestamps.append(timestamp) except: logger.error('error :: fetcher :: failed to determine timestamp from %s' % str(str_metric_fetched)) try: timestamps.sort() last_fetch_timestamp = int(timestamps[-1]) time_to_fetch = last_fetch_timestamp - begin_fetcher_run logger.info('fetcher :: %s metrics fetched this run in %s seconds' % ( str(metrics_fetched_count), str(time_to_fetch))) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: failed to last_fetch_timestamp from timestamps') try: redis_set = 'vista.worker.to.process' # @modified 20191111 - Bug #3266: py3 Redis binary objects not strings # Branch #3262: py3 # metrics_count_for_workers = len(list(self.redis_conn.smembers(redis_set))) # @modified 20230401 - Feature #4886: analyzer - operation_timings # metrics_count_for_workers = len(list(self.redis_conn_decoded.smembers(redis_set))) metrics_count_for_workers = self.redis_conn_decoded.scard(redis_set) logger.info('fetcher :: %s of the metrics fetched from this run still need to be processed by a worker' % str(metrics_count_for_workers)) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not get Redis set %s' % redis_set) # Use both the metrics sent to pre_populate_graphite_metric and # @added 20220111 - Feature #4370: Manage vista.fetcher.metrics.json Redis set # the ones submitted to flux metrics_sent_to_flux_count = metrics_fetched_count + fetcher_sent_to_flux send_metric_name = '%s.sent_to_flux' % skyline_app_graphite_namespace try: logger.info('fetcher :: sending Graphite - %s, %s' % ( # @modified 20220111 - Feature 
#4370: Manage vista.fetcher.metrics.json Redis set # send_metric_name, str(fetcher_sent_to_flux))) send_metric_name, str(metrics_sent_to_flux_count))) # @modified 20220111 - Feature #4370: Manage vista.fetcher.metrics.json Redis set # fetcher_sent_to_flux_str = str(fetcher_sent_to_flux) fetcher_sent_to_flux_str = str(metrics_sent_to_flux_count) send_graphite_metric(self, parent_skyline_app, send_metric_name, fetcher_sent_to_flux_str) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not send %s to Graphite' % send_metric_name) # @added 20191011 - Feature #3260: vista - fetcher add time_to_fetch metric # Added time_to_fetch, metrics_to_fetch, metrics_fetched send_metric_name = '%s.time_to_fetch' % skyline_app_graphite_namespace try: logger.info('fetcher :: sending Graphite - %s, %s' % ( send_metric_name, str(process_runtime))) fetcher_time_to_fetch_str = str(process_runtime) send_graphite_metric(self, parent_skyline_app, send_metric_name, fetcher_time_to_fetch_str) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not send %s to Graphite' % send_metric_name) metrics_to_fetch_count = len(metrics_to_fetch) send_metric_name = '%s.metrics_to_fetch' % skyline_app_graphite_namespace try: logger.info('fetcher :: sending Graphite - %s, %s' % ( send_metric_name, str(metrics_to_fetch_count))) fetcher_metrics_to_fetch_count_str = str(metrics_to_fetch_count) send_graphite_metric(self, parent_skyline_app, send_metric_name, fetcher_metrics_to_fetch_count_str) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not send %s to Graphite' % send_metric_name) send_metric_name = '%s.metrics_fetched' % skyline_app_graphite_namespace try: logger.info('fetcher :: sending Graphite - %s, %s' % ( send_metric_name, str(metrics_fetched_count))) fetcher_metrics_fetched_count_str = str(metrics_fetched_count) send_graphite_metric(self, parent_skyline_app, send_metric_name, 
fetcher_metrics_fetched_count_str) except: logger.error(traceback.format_exc()) logger.error('error :: fetcher :: could not send %s to Graphite' % send_metric_name) try: del process_runtime except: logger.error('error :: fetcher :: failed to del process_runtime')