Source code for horizon.prometheus

"""
prometheus.py
"""
from os import kill, getpid
# try:
#     from Queue import Full
# except ImportError:
#     from queue import Full
from multiprocessing import Process
import sys
from time import time, sleep
from timeit import default_timer as timer
from ast import literal_eval
import traceback
import logging
import os.path
from os import remove as os_remove
import json
import gzip
# @added 20220722 - Task #4624: Change all dict copy to deepcopy
import copy
from contextlib import nullcontext

import zlib
import requests
import memray

import settings
# @modified 20220726 - Task #2732: Prometheus to Skyline
#                      Branch #4300: prometheus
# Moved send_graphite_metric
# from skyline_functions import (get_redis_conn_decoded, send_graphite_metric)
from skyline_functions import get_redis_conn_decoded
from functions.prometheus.metric_name_labels_parser import metric_name_labels_parser
# @added 20220726 - Task #2732: Prometheus to Skyline
#                   Branch #4300: prometheus
from functions.graphite.send_graphite_metric import send_graphite_metric

# @added 20230123 - Task #2732: Prometheus to Skyline
#                   Branch #4300: prometheus
from functions.database.queries.get_all_db_metric_names import get_all_db_metric_names

parent_skyline_app = 'horizon'
child_skyline_app = 'prometheus'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''
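# For illustration only: with an assumed settings.SERVER_METRICS_NAME of
# 'skyline-1' this resolves to '.skyline-1'; when unset it resolves to an
# empty string.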

try:
    HORIZON_PROMETHEUS_METRIC_LIMIT = settings.HORIZON_PROMETHEUS_METRIC_LIMIT
except AttributeError:
    HORIZON_PROMETHEUS_METRIC_LIMIT = settings.CHUNK_SIZE

try:
    # @modified 20220722 - Task #4624: Change all dict copy to deepcopy
    # HORIZON_SHARDS = settings.HORIZON_SHARDS.copy()
    HORIZON_SHARDS = copy.deepcopy(settings.HORIZON_SHARDS)
except:
    HORIZON_SHARDS = {}
try:
    HORIZON_SHARD_DEBUG = settings.HORIZON_SHARD_DEBUG
except:
    HORIZON_SHARD_DEBUG = True

try:
    VICTORIAMETRICS_ENABLED = settings.VICTORIAMETRICS_ENABLED
except:
    VICTORIAMETRICS_ENABLED = False
vm_url = None
if VICTORIAMETRICS_ENABLED:
    # For cluster path, default to VictoriaMetrics standalone path
    try:
        jsonl_insert_path = settings.VICTORIAMETRICS_OPTS['jsonl_insert_path']
    except:
        jsonl_insert_path = '/api/v1/import'
    try:
        if settings.VICTORIAMETRICS_OPTS['scheme'] == 'https':
            vm_url = '%s://%s%s' % (
                settings.VICTORIAMETRICS_OPTS['scheme'],
                settings.VICTORIAMETRICS_OPTS['host'],
                jsonl_insert_path
            )
        else:
            vm_url = '%s://%s:%s%s' % (
                settings.VICTORIAMETRICS_OPTS['scheme'],
                settings.VICTORIAMETRICS_OPTS['host'],
                str(settings.VICTORIAMETRICS_OPTS['port']),
                jsonl_insert_path
            )
    except:
        vm_url = None
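    # A minimal illustration with assumed values: scheme 'http', host
    # '127.0.0.1' and port 8428 would give
    # vm_url = 'http://127.0.0.1:8428/api/v1/import', the VictoriaMetrics
    # JSON line import endpoint that the aggregated data is POSTed to.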

try:
    MEMRAY_ENABLED = settings.MEMRAY_ENABLED
except:
    MEMRAY_ENABLED = False
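# When MEMRAY_ENABLED is set the main submit_PrometheusMetrics loop is run
# under a memray.Tracker which writes allocation data to
# <SKYLINE_TMP_DIR>/horizon.prometheus.bin (see submit_PrometheusMetrics).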

number_of_horizon_shards = 0
this_host = str(os.uname()[1])

if this_host == 'skyline-test-1-fra1':
    DEVELOPMENT = True
    HORIZON_SHARDS = {
        'skyline-test-1-fra1': 0,
        'another-test-node-1': 1,
        'another-test-node-2': 2,
    }
else:
    DEVELOPMENT = False


HORIZON_SHARD = 0
if HORIZON_SHARDS:
    number_of_horizon_shards = len(HORIZON_SHARDS)
    HORIZON_SHARD = HORIZON_SHARDS[this_host]
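# Sharding note: when HORIZON_SHARDS is populated, each metric is assigned a
# shard as zlib.adler32(metric_name.encode()) % number_of_horizon_shards (see
# PrometheusMetrics.metric_in_shard) and this instance only stores metrics
# whose shard number matches its own HORIZON_SHARD; the remainder are
# forwarded to the owning shard's flux /flux/prometheus/write endpoint.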

skyline_app_graphite_namespace = 'skyline.%s%s.%s' % (
    parent_skyline_app, SERVER_METRIC_PATH, child_skyline_app)
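# e.g. skyline.horizon.prometheus, or skyline.horizon.skyline-1.prometheus
# when SERVER_METRICS_NAME (assumed 'skyline-1' here) is set.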

LOCAL_DEBUG = False


def create_global_reload_skip_dict():
    """ skip_dict in shared memory """
    global global_reload_skip_dict
    global_reload_skip_dict = False


def get_global_reload_skip_dict():
    """ Reload shared memory skip_dict """
    return global_reload_skip_dict


def update_global_reload_skip_dict(value):
    """ Update shared memory skip_dict """
    global global_reload_skip_dict
    global_reload_skip_dict = value
    return global_reload_skip_dict


create_global_reload_skip_dict()


def create_global_skip_dict():
    global global_skip_dict
    global_skip_dict = {}


def get_global_skip_dict():
    return global_skip_dict


def update_global_skip_dict(skip_dict):
    global global_skip_dict
    global_skip_dict = skip_dict
    return global_skip_dict


create_global_skip_dict()


def create_global_prom_to_vm_metrics_dict():
    global global_prom_to_vm_metrics_dict
    global_prom_to_vm_metrics_dict = {}


def get_global_prom_to_vm_metrics_dict():
    return global_prom_to_vm_metrics_dict


def update_global_prom_to_vm_metrics_dict(prom_to_vm_metrics_dict):
    global global_prom_to_vm_metrics_dict
    global_prom_to_vm_metrics_dict = prom_to_vm_metrics_dict
    return global_prom_to_vm_metrics_dict


create_global_prom_to_vm_metrics_dict()


def create_global_shard_metrics_dict():
    global global_shard_metrics_dict
    global_shard_metrics_dict = {}


def get_global_shard_metrics_dict():
    return global_shard_metrics_dict


def update_global_shard_metrics_dict(prom_to_vm_metrics_dict):
    global global_shard_metrics_dict
    global_shard_metrics_dict = prom_to_vm_metrics_dict
    return global_shard_metrics_dict


create_global_shard_metrics_dict()


def create_global_metrics_type_dict():
    global global_metrics_type_dict
    global_metrics_type_dict = {}


def get_global_metrics_type_dict():
    return global_metrics_type_dict


def update_global_metrics_type_dict(metrics_type_dict):
    global global_metrics_type_dict
    global_metrics_type_dict = metrics_type_dict
    return global_metrics_type_dict


create_global_metrics_type_dict()


def create_global_metrics_with_id_dict():
    global global_metrics_with_id_dict
    global_metrics_with_id_dict = {}


def get_global_metrics_with_id_dict():
    return global_metrics_with_id_dict


def update_global_metrics_with_id_dict(metrics_with_id_dict):
    global global_metrics_with_id_dict
    global_metrics_with_id_dict = metrics_with_id_dict
    return global_metrics_with_id_dict


create_global_metrics_with_id_dict()
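# Note on the pattern above: these module level dicts act as in-process caches
# that the long running PrometheusMetrics process reads via the get_* helpers
# and refreshes via the update_* helpers, which minimises the number of Redis
# hgetall requests needed per run (see the skip_dict handling in
# submit_PrometheusMetrics).
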
class PrometheusMetrics(Process):
    """
    The PrometheusMetrics process is responsible for getting metrics submitted
    to flux from the flux.prometheus_metrics Redis hash set and adding them to
    the Horizon worker queue to be submitted to Redis.
    """
    def __init__(self, parent_pid):
        # super(PrometheusMetrics, self).__init__()
        super().__init__()
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
    def check_if_parent_is_alive(self):
        """
        Check that both the current process and the parent process are still
        alive and exit if either is not.
        """
        try:
            kill(self.current_pid, 0)
            kill(self.parent_pid, 0)
        except:
            # Log warning
            logger.warning('warning :: parent or current process dead')
            sys.exit(0)
    def metric_in_shard(self, metric_name):
        """
        Determine which Horizon shard a metric belongs to.  Returns the shard
        number calculated from the adler32 checksum of the metric name, or 0
        if no HORIZON_SHARDS are defined.
        """
        shard_number = 0
        if not HORIZON_SHARDS:
            return shard_number
        metric_as_bytes = str(metric_name).encode()
        value = zlib.adler32(metric_as_bytes)
        for shard_node in list(HORIZON_SHARDS.keys()):
            modulo_result = value % number_of_horizon_shards
            shard_number = HORIZON_SHARDS[shard_node]
            if modulo_result == shard_number:
                return shard_number
[docs] def submit_PrometheusMetrics(self): """ Get Prometheus metrics from the Redis flux.prometheus_metrics set every 60 seconds, aggregated them to 60 seconds and submit them to Redis. """ now_ts = int(time()) next_run_ts = (now_ts // 60 * 60) + 60 test = True untyped_metrics_key = 'horizon.prometheus.untyped_metrics' metrics_with_no_id_key = 'horizon.labelled_metrics.data.no_id' # @added 20220730 - Info #4620: memray memray_file = '%s/horizon.prometheus.bin' % settings.SKYLINE_TMP_DIR memray_file_last = '%s.last' % memray_file if MEMRAY_ENABLED: logger.info('horizon.prometheus :: memray Tracker enabled') else: logger.info('horizon.prometheus :: memray Tracker NOT enabled') # @added 20230107 - Task #4778: v4.0.0 - update dependencies # Address bandit B108:hardcoded_tmp_directory, do not hardcoded tmp # Probable insecure usage of temp file/directory (CWE-377) vm_send_data_jsonl_file = '%s/horizon.vm_send_data.jsonl' % settings.SKYLINE_TMP_DIR running = True while running: # @added 20220730 - Info #4620: memray if os.path.isfile(memray_file_last): os.remove(memray_file_last) logger.info('horizon.prometheus :: removed %s' % str(memray_file_last)) if os.path.isfile(memray_file): os.rename(memray_file, memray_file_last) logger.info('horizon.prometheus :: ranamed %s to %s' % (str(memray_file), str(memray_file_last))) if MEMRAY_ENABLED: memray_tracker = memray.Tracker(memray_file) else: memray_tracker = nullcontext() try: with memray_tracker: self.check_if_parent_is_alive() now_ts = int(time()) sleep_for = next_run_ts - now_ts logger.info('horizon.prometheus :: sleeping for %s seconds until next run' % str(sleep_for)) while now_ts < next_run_ts: # self.check_if_parent_is_alive() now_ts = int(time()) # if not now_ts % 10: # logger.info('horizon.prometheus :: sleeping for %s seconds until next run' % str(sleep_for)) sleep_for = next_run_ts - now_ts if sleep_for < 0: sleep_for = 0.01 logger.info('horizon.prometheus :: sleeping for %s seconds until next run' % str(sleep_for)) sleep(sleep_for) current_run_ts = int(next_run_ts) last_run_ts = next_run_ts - 60 next_run_ts = (now_ts // 60 * 60) + 60 metrics_with_no_id_data_list = [] # @added 20230123 - Task #2732: Prometheus to Skyline # Branch #4300: prometheus all_db_base_names = {} all_db_base_names_with_ids = {} # @added 20230205 - Task #2732: Prometheus to Skyline # Branch #4300: prometheus if not VICTORIAMETRICS_ENABLED or not vm_url: logger.error('error :: horizon.prometheus :: no victoriametrics store to write to is known, will drop data') logger.info('horizon.prometheus :: checking for prometheus metrics submitted to flux') new_key = 'flux.prometheus_metrics.%s' % str(current_run_ts) try: self.redis_conn_decoded.rename('flux.prometheus_metrics', new_key) except Exception as err: if str(err) == 'no such key': logger.info('horizon.prometheus :: flux.prometheus_metrics set does not exist') else: logger.error('error :: horizon.prometheus :: failed to rename Redis set flux.prometheus_metrics to %s - %s' % ( new_key, str(err))) # Manage global skip dicts # global skip dict variables are created and updated when necessary # meaning when a metric arrives that is not in the skip_dict or # do_not_skip_dict, the metric will be dropped and the global # reload_skip_sets variable will be set to true, so the subsequent # run will reload the skip dicts and when the metric is present, # which might only be after a number of runs until it is updated in # metrics manager, at which point it will be processed accordingly. 
# This minimises the amount of hgetall requests needed to be made for # skip dicts. check_skip = False if check_skip: skipped_metrics = [] skip_dict = {} do_reload_skip_dict = False try: do_reload_skip_dict = get_global_reload_skip_dict() except Exception as err: logger.error('error :: horizon.prometheus :: get_global_reload_skip_dict failed - %s' % ( str(err))) if do_reload_skip_dict: logger.info('horizon.prometheus :: get_global_reload_skip_dict returned True reloading skip_dict') updated_prometheus_skip_dict = 0 try: updated_prometheus_skip_dict = self.redis_conn_decoded.get('metrics_manager.updated.prometheus_skip_dict') except Exception as err: logger.error('error :: metrics_manager :: failed to set metrics_manager.updated.prometheus_skip_dict - %s' % ( err)) if updated_prometheus_skip_dict: do_reload_skip_dict = True logger.info('horizon.prometheus :: metrics_manager has updated metrics_manager.prometheus_skip_dict reloading skip_dict') try: self.redis_conn_decoded.expire('metrics_manager.updated.prometheus_skip_dict', 50) except Exception as err: logger.error('error :: metrics_manager :: failed to expire metrics_manager.updated.prometheus_skip_dict - %s' % ( err)) if not do_reload_skip_dict: try: skip_dict = get_global_skip_dict() except Exception as err: logger.error('error :: horizon.prometheus :: get_global_skip_dict failed - %s' % ( str(err))) skip_dict = {} if not skip_dict: logger.info('horizon.prometheus :: skip_dict empty reloading skip_dict') do_reload_skip_dict = True if do_reload_skip_dict: # @modified 20220722 - Task #4624: Change all dict copy to deepcopy # previous_skip_dict = skip_dict.copy() previous_skip_dict = copy.deepcopy(skip_dict) try: skip_dict = self.redis_conn_decoded.hgetall('metrics_manager.prometheus_skip_dict') except Exception as err: logger.error('error :: horizon.prometheus :: failed to hgetall Redis hash metrics_manager.prometheus_skip_dict - %s' % ( str(err))) if not skip_dict and previous_skip_dict: skip_dict = copy.deepcopy(previous_skip_dict) del previous_skip_dict if skip_dict: try: new_global_skip_dict = update_global_skip_dict(skip_dict) if new_global_skip_dict: skip_dict = new_global_skip_dict except Exception as err: logger.error('error :: horizon.prometheus :: update_global_skip_dict failed - %s' % ( str(err))) try: do_reload_skip_dict = update_global_reload_skip_dict(False) except Exception as err: logger.error('error :: horizon.prometheus :: update_global_reload_skip_dict failed - %s' % ( str(err))) # Create the flux.prometheus_namespace_cardinality.last with counts last_cardinality_key = 'flux.prometheus_namespace_cardinality.%s' % str(last_run_ts) prometheus_namespace_cardinality_dict = {} try: prometheus_namespace_cardinality_dict = self.redis_conn_decoded.hgetall(last_cardinality_key) except Exception as err: logger.error('error :: horizon.prometheus :: failed to hgetall Redis hash %s - %s' % ( last_cardinality_key, str(err))) namespace_cardinality_dict = {} for namespace in list(prometheus_namespace_cardinality_dict.keys()): try: namespace_cardinality_dict[namespace] = len(literal_eval(prometheus_namespace_cardinality_dict[namespace])) except Exception as err: logger.error('error :: horizon.prometheus :: failed to determine last cardinality count for %s - %s' % ( namespace, str(err))) del prometheus_namespace_cardinality_dict if namespace_cardinality_dict: try: self.redis_conn_decoded.hset('flux.prometheus_namespace_cardinality.last', mapping=namespace_cardinality_dict) logger.info('horizon.prometheus :: hset 
flux.prometheus_namespace_cardinality.last Redis hash with %s namespace counts' % str(len(namespace_cardinality_dict))) except Exception as err: logger.error('error :: horizon.prometheus :: failed to hset flux.prometheus_namespace_cardinality.last Redis hash - %s' % ( str(err))) # Create a dict for metric types metrics_type_dict = {} update_metrics_type_dict = 0 try: metrics_type_dict = get_global_metrics_type_dict() except Exception as err: logger.error('error :: horizon.prometheus :: get_metrics_type_dict failed - %s' % ( str(err))) metrics_type_dict = {} logger.info('horizon.prometheus :: got metric types for %s tenant_ids from get_global_metrics_type_dict' % str(len(metrics_type_dict))) # Create a dict for metric types metrics_with_id_dict = {} update_metrics_with_id_dict = False try: metrics_with_id_dict = get_global_metrics_with_id_dict() except Exception as err: logger.error('error :: horizon.prometheus :: get_metrics_with_id_dict failed - %s' % ( str(err))) metrics_with_id_dict = {} logger.info('horizon.prometheus :: got %s metrics in metrics_with_id_dict from get_global_metrics_with_id_dict' % str(len(metrics_with_id_dict))) if not metrics_with_id_dict: logger.info('horizon.prometheus :: getting metrics_with_id_dict from Redis aet.metrics_manager.active_labelled_metrics_with_id') try: metrics_with_id_dict = self.redis_conn_decoded.hgetall('aet.metrics_manager.active_labelled_metrics_with_id') logger.info('horizon.prometheus :: got %s metrics in metrics_with_id_dict from Redis aet.metrics_manager.active_labelled_metrics_with_id' % str(len(metrics_with_id_dict))) except Exception as err: logger.error('error :: horizon.prometheus :: hgetall aet.metrics_manager.active_labelled_metrics_with_id failed - %s' % ( str(err))) if metrics_with_id_dict: update_metrics_with_id_dict = True if update_metrics_with_id_dict: try: new_metrics_with_id_dict = update_global_metrics_with_id_dict(metrics_with_id_dict) if new_metrics_with_id_dict: metrics_with_id_dict = new_metrics_with_id_dict logger.info('horizon.prometheus :: updated global_metrics_with_id_dict with %s metrics' % str(len(metrics_with_id_dict))) except Exception as err: logger.error('error :: horizon.prometheus :: update_global_metrics_with_id_dict failed - %s' % ( str(err))) update_metrics_with_id_dict = False aggregated_metrics_data_list = [] metrics_added = 0 metrics_submitted = 0 metrics_dropped = 0 metric_data_dropped = [] metric_data_accepted = [] metrics_data_list = [] try: metrics_data_list = list(self.redis_conn_decoded.smembers(new_key)) except Exception as err: logger.error('error :: horizon.prometheus :: failed to query Redis set %s - %s' % ( new_key, str(err))) flux_prometheus_received = len(metrics_data_list) logger.info('horizon.prometheus :: flux received %s prometheus entries' % str(flux_prometheus_received)) if metrics_data_list: try: self.redis_conn_decoded.expire(new_key, 120) except Exception as err: logger.error('error :: horizon.prometheus :: failed to expire Redis set %s - %s' % ( new_key, str(err))) # @added 20230203 # We have to fetch the flux.horizon_prometheus_metrics.<ALIGNED_TIMESTAMPS> # and add them to the metrics_data_list if number_of_horizon_shards: current_aligned_ts = int(int(time()) // 60 * 60) check_flux_timestamps = [current_aligned_ts, (current_aligned_ts - 60)] for check_timestamp in check_flux_timestamps: flux_metrics_data_list = [] check_key = 'flux.horizon_prometheus_metrics.%s' % str(check_timestamp) try: flux_metrics_data_list = list(self.redis_conn_decoded.smembers(check_key)) except 
Exception as err: logger.error('error :: horizon.prometheus :: failed to query Redis set %s - %s' % ( check_key, str(err))) if flux_metrics_data_list: logger.info('horizon.prometheus :: adding an additional %s metrics received via flux from another horizon shard' % ( str(len(flux_metrics_data_list)))) metrics_data_list = metrics_data_list + flux_metrics_data_list try: self.redis_conn_decoded.delete(check_key) except Exception as err: logger.error('error :: horizon.prometheus :: failed to delete Redis set %s - %s' % ( check_key, str(err))) # Create a dict for victoriametrics json to submit victoriametrics_data = {} try: prom_to_vm_metrics_dict = get_global_prom_to_vm_metrics_dict() except Exception as err: logger.error('error :: horizon.prometheus :: get_global_prom_to_vm_metrics_dict failed - %s' % ( str(err))) prom_to_vm_metrics_dict = {} logger.info('horizon.prometheus :: got %s vm_metrics from get_global_prom_to_vm_metrics_dict' % str(len(prom_to_vm_metrics_dict))) # TODO - route to correct shard/vmagent - as quick as possible shard_metrics_dict = {} other_shard_metrics = {} update_shard_metrics_dict = False if number_of_horizon_shards: for shard_node in list(HORIZON_SHARDS.keys()): if shard_node == this_host: continue other_shard_metrics[HORIZON_SHARDS[shard_node]] = [] logger.info('horizon.prometheus :: created other_shard_metrics: %s' % str(other_shard_metrics)) try: shard_metrics_dict = get_global_shard_metrics_dict() except Exception as err: logger.error('error :: horizon.prometheus :: get_global_shard_metrics_dict failed - %s' % ( str(err))) shard_metrics_dict = {} logger.info('horizon.prometheus :: got %s shard metrics from get_global_shard_metrics_dict' % str(len(shard_metrics_dict))) metrics_data_dict = {} metrics_metadata_dict = {} error_logged = False for metric_data_str in metrics_data_list: data_list = [] try: data_list = metric_data_str.split(' ') except Exception as err: if not error_logged: logger.error('error :: horizon.prometheus :: interpolate data from - %s - %s' % ( str(metric_data_str), str(err))) error_logged = True if not data_list: continue try: # Handle spaces in metric name # metric_name_list = data_list[2:] # if len(metric_name_list) == 1: # metric = data_list[2] # else: # metric = ''.join(data_list[2:]) # # metric = metric.replace('\\', ' ') # Convert millisecond timestamp to unix timestamp tenant_id = str(data_list[0]) server_id = str(data_list[1]) timestamp_ms = int(data_list[2]) timestamp = int(int(data_list[2]) / 1000) value = float(data_list[3]) timestamp_str = data_list[2] value_str = data_list[3] if value_str == 'inf': continue remove_data = '%s %s %s %s ' % (tenant_id, server_id, timestamp_str, value_str) # Handle spaces in metric name metric = metric_data_str.replace(remove_data, '') # skip_metric = 1 skip_metric = 0 if check_skip: try: skip_metric = int(str(skip_dict[metric])) except: # metric unknown as not in skip_dict so skip until known skip_metric = 1 # Set so set global reload_skip_dict to True try: do_reload_skip_dict = update_global_reload_skip_dict(True) except Exception as err: logger.error('error :: horizon.prometheus :: update_reload_skip_dict failed - %s' % ( str(err))) if skip_metric: skipped_metrics.append(metric) continue # TODO - route to correct shard/vmagent if number_of_horizon_shards: metric_shard = None try: metric_shard = shard_metrics_dict[metric] except: try: metric_shard = self.metric_in_shard(metric) shard_metrics_dict[metric] = metric_shard update_shard_metrics_dict = True except Exception as err: if not error_logged: 
logger.error(traceback.format_exc()) logger.error('error :: horizon.prometheus :: self.metric_in_shard failed for %s - %s' % ( str(metric), str(err))) # error_logged = True continue # @modified 20230303 - Task #2732: Prometheus to Skyline # Branch #4300: prometheus # Handle the 0 shard append_to_other_shard_metrics = False if metric_shard and metric_shard != HORIZON_SHARD: append_to_other_shard_metrics = True if metric_shard == 0 and metric_shard != HORIZON_SHARD: append_to_other_shard_metrics = True if append_to_other_shard_metrics: # try: # other_shard_metric_data = other_shard_metrics[metric_shard][metric] # except: # other_shard_metric_data = [] other_shard_metric_data = { 'metric': metric, 'server_id': server_id, 'tenant_id': tenant_id, 'timestamp': timestamp_ms, 'value': value} try: # other_shard_metric_data.append([timestamp_ms, value]) other_shard_metrics[metric_shard].append(other_shard_metric_data) except Exception as err: if not error_logged: logger.error(traceback.format_exc()) logger.error('error :: horizon.prometheus :: failed appending to other_shard_metrics[%s] - %s %s - %s' % ( str(metric_shard), str(metric), str(other_shard_metric_data), str(err))) logger.debug('debug :: horizon.prometheus :: failed with metric_data_str: %s' % str(metric_data_str)) error_logged = True if not DEVELOPMENT: continue try: metric_data = metrics_data_dict[metric] except KeyError: metric_data = [] metrics_metadata_dict[metric] = {'tenant_id': tenant_id, 'server_id': server_id} except Exception as err: if not error_logged: logger.error(traceback.format_exc()) logger.error('error :: horizon.prometheus :: interpolate metric_data from - %s - %s' % ( str(metric_data_str), str(err))) error_logged = True continue # Deduplicate if [timestamp, value] in metric_data: continue metric_data.append([timestamp, value]) metrics_data_dict[metric] = metric_data vm_metric = None vm_metric_name = None try: vm_metric = prom_to_vm_metrics_dict[metric]['metric'] # dict vm_metric_name = prom_to_vm_metrics_dict[metric]['metric_name'] # str except: vm_metric = None vm_metric_name = None update_prom_to_vm_metrics_dict = False if not vm_metric: try: metric_dict = metric_name_labels_parser('horizon', metric) metric_name = metric_dict['metric'] vm_metric = {} vm_metric['__name__'] = metric_name vm_metric_labels = metric_dict['labels'] labels = '' for key in list(metric_dict['labels'].keys()): vm_metric[key] = metric_dict['labels'][key] if labels == '': labels = '"%s":"%s"' % (key, metric_dict['labels'][key]) else: labels = '%s,"%s":"%s"' % (labels, key, metric_dict['labels'][key]) vm_metric_name = '{"__name__":"%s",%s}' % (metric_name, labels) update_prom_to_vm_metrics_dict = True except Exception as err: if not error_logged: logger.error(traceback.format_exc()) logger.error('error :: horizon.prometheus :: could not determine vm_metric name for metric %s - %s' % ( str(metric), str(err))) error_logged = True continue if update_prom_to_vm_metrics_dict: prom_to_vm_metrics_dict[metric] = {} prom_to_vm_metrics_dict[metric]['metric'] = vm_metric prom_to_vm_metrics_dict[metric]['metric_name'] = vm_metric_name prom_to_vm_metrics_dict[metric]['labels'] = vm_metric_labels prom_to_vm_metrics_dict[metric]['name'] = metric_name try: new_prom_to_vm_metrics_dict = update_global_prom_to_vm_metrics_dict(prom_to_vm_metrics_dict) if new_prom_to_vm_metrics_dict: prom_to_vm_metrics_dict = new_prom_to_vm_metrics_dict except Exception as err: logger.error('error :: horizon.prometheus :: update_global_prom_to_vm_metrics_dict failed - %s' % ( 
str(err))) if update_shard_metrics_dict: try: new_shard_metrics_dict = update_global_shard_metrics_dict(shard_metrics_dict) if new_shard_metrics_dict: shard_metrics_dict = new_shard_metrics_dict except Exception as err: logger.error('error :: horizon.prometheus :: update_global_shard_metrics_dict failed - %s' % ( str(err))) # victoriametrics json parser does not handle Infinity, null or # false # VictoriaMetrics/lib/protoparser/vmimport/parser.go error with # cannot unmarshal json line \"values\": [null, null]}": cannot # unmarshal value at position 0: value doesn't contain number; # it contains null; skipping it # Same for Infinity and false if value == float('inf'): continue try: victoriametrics_data[vm_metric_name]['timestamps'].append(timestamp_ms) victoriametrics_data[vm_metric_name]['values'].append(value) except: try: victoriametrics_data[vm_metric_name] = {} victoriametrics_data[vm_metric_name]['metric'] = vm_metric victoriametrics_data[vm_metric_name]['timestamps'] = [timestamp_ms] victoriametrics_data[vm_metric_name]['values'] = [value] except Exception as err: logger.error('error :: horizon.prometheus :: failed to add %s to victoriametrics_data - %s' % ( str(vm_metric), str(err))) del metrics_data_list # All prometheus metrics need to be aggregated to 60 seconds as # there is no gaurantee of what interval they are submitted at error_logged = False untyped_metrics = {} # Infer type from metric name as per # https://docs.newrelic.com/docs/infrastructure/prometheus-integrations/install-configure-remote-write/set-your-prometheus-remote-write-integration/#override-mapping counter_metricname_patterns = ['_bucket{', '_count{', '_total{', '_sum{'] for metric in list(metrics_data_dict.keys()): metrics_added += 1 metric_timeseries = metrics_data_dict[metric] if len(metric_timeseries) == 1: aggregated_metrics_data_list.append([metric, metric_timeseries]) continue metric_type = None namespace = metric.split('{', maxsplit=1)[0] query_redis_hash = False try: metric_type = metrics_type_dict[tenant_id][server_id][namespace] except: query_redis_hash = True if query_redis_hash: tenant_id = metrics_metadata_dict[metric]['tenant_id'] server_id = metrics_metadata_dict[metric]['server_id'] metrics_type_key = 'metrics_manager.prometheus.metrics_type.%s.%s' % ( str(tenant_id), str(server_id)) try: raw_metric_type = self.redis_conn_decoded.hget(metrics_type_key, namespace) if raw_metric_type: metric_type = str(raw_metric_type) except Exception as err: if not error_logged: logger.error('error :: horizon.prometheus :: failed to determine metric_type for %s from Redis hash %s - %s' % ( str(metric), metrics_type_key, str(err))) error_logged = True if not metric_type: for counter_metricname_pattern in counter_metricname_patterns: if counter_metricname_pattern in metric: metric_type = 'COUNTER' try: self.redis_conn_decoded.hset(metrics_type_key, namespace, metric_type) except Exception as err: if not error_logged: logger.error('error :: horizon.prometheus :: failed to add metric_type for %s to Redis hash %s - %s' % ( str(namespace), metrics_type_key, str(err))) error_logged = True untyped_metric_key = '%s.%s.%s' % ( str(tenant_id), str(server_id), metric) try: self.redis_conn_decoded.hdel(untyped_metrics_key, untyped_metric_key) except Exception as err: if not error_logged: logger.error('error :: horizon.prometheus :: failed to hdel %s from Redis set %s - %s' % ( str(untyped_metric_key), untyped_metrics_key, str(err))) error_logged = True break if metric_type: if tenant_id not in metrics_type_dict: 
metrics_type_dict[tenant_id] = {} if server_id not in metrics_type_dict[tenant_id]: metrics_type_dict[tenant_id][server_id] = {} if metric not in metrics_type_dict[tenant_id][server_id]: metrics_type_dict[tenant_id][server_id][namespace] = metric_type update_metrics_type_dict += 1 if not metric_type: metric_type = 'UNTYPED' untyped_metric_key = '%s.%s.%s' % ( str(tenant_id), str(server_id), metric) untyped_metric_dict = {'metric': metric, 'tenant_id': tenant_id, 'server_id': server_id} untyped_metrics[untyped_metric_key] = str(untyped_metric_dict) # @modified 20220810 - Do not aggregate the timeseries # data because at times this results in counter data # going down instead of up. aggregate_timeseries = False if aggregate_timeseries: # Group aggregates by 60 second period so that if lag/delay or # multiple submissions are added in a single period, they are # submitted for the relevant 60 seconds period, rather than # aggregating multiple periods into the current period. timestamps = [ts for ts, value in metric_timeseries] all_aligned_timestamps = [int(ts // 60 * 60) for ts in timestamps] del timestamps aligned_timestamps = list(set(all_aligned_timestamps)) for timestamp in aligned_timestamps: metric_values = [] for ts, value in metric_timeseries: if int(ts // 60 * 60) == timestamp: last_ts = ts metric_values.append(value) if metric_values: try: if metric_type == 'GAUGE': aggregate_value = sum(metric_values) / len(metric_values) else: aggregate_value = sum(metric_values) aggregated_metrics_data_list.append([metric, [last_ts, aggregate_value]]) except Exception as err: logger.error('error :: horizon.prometheus :: failed to calculate add aggregate_value for %s - %s' % ( str(metric), str(err))) del aligned_timestamps else: for ts, value in metric_timeseries: aggregated_metrics_data_list.append([metric, [ts, value]]) del metrics_data_dict if update_metrics_type_dict: try: new_metrics_type_dict = update_global_metrics_type_dict(metrics_type_dict) if new_metrics_type_dict: metrics_type_dict = new_metrics_type_dict logger.info('horizon.prometheus :: updated global metrics_type_dict with %s metric namespaces' % str(update_metrics_type_dict)) except Exception as err: logger.error('error :: horizon.prometheus :: update_global_metrics_type_dict failed - %s' % ( str(err))) if untyped_metrics: try: self.redis_conn_decoded.hset(untyped_metrics_key, mapping=untyped_metrics) except Exception as err: logger.error('error :: horizon.prometheus :: could hset to Redis key %s - %s' % ( untyped_metrics_key, str(err))) logger.info('horizon.prometheus :: recorded %s metrics due to no known metric type in %s' % ( str(len(untyped_metrics)), untyped_metrics_key)) logger.info('horizon.prometheus :: flux prometheus entries aggregrated to %s metrics' % str(len(aggregated_metrics_data_list))) if metric_data_dropped: dropped_key = 'horizon.dropped.prometheus_metrics.%s' % str(current_run_ts) logger.warning('horizon.prometheus :: adding %s dropped metrics to the %s Redis set' % ( str(len(metric_data_dropped)), dropped_key)) try: self.redis_conn_decoded.setex(dropped_key, 86400, str(metric_data_dropped)) except Exception as err: logger.error('error :: horizon.prometheus :: could add metric_data_dropped to Redis key %s - %s' % ( dropped_key, str(err))) del metric_data_dropped if test and metric_data_accepted: accepted_key = 'horizon.accepted.prometheus_metrics.%s' % str(current_run_ts) try: self.redis_conn_decoded.setex(accepted_key, 600, str(metric_data_accepted)) except Exception as err: logger.error('error :: 
horizon.prometheus :: could add metric_data_accepted to Redis key %s - %s' % ( accepted_key, str(err))) # logger.info('horizon.prometheus :: skipped %s metrics' % str(len(skipped_metrics))) logger.info('horizon.prometheus :: submitted %s metrics to redis in the last 60 seconds' % str(metrics_submitted)) submitted_metrics_metric_name = '%s.submitted' % skyline_app_graphite_namespace victoriametrics_data_list = [] error_logged = False if VICTORIAMETRICS_ENABLED: for metric in list(victoriametrics_data.keys()): try: data_dict = { 'metric': victoriametrics_data[metric]['metric'], 'timestamps': victoriametrics_data[metric]['timestamps'], 'values': victoriametrics_data[metric]['values'], } victoriametrics_data_list.append(data_dict) except Exception as err: if not error_logged: logger.error('error :: horizon.prometheus :: could add data to victoriametrics_data_list - %s - %s' % ( victoriametrics_data[metric], str(err))) error_logged = True vm_metrics_key = 'horizon.prometheus_metrics.to.victoriametrics.%s' % str(current_run_ts) if victoriametrics_data_list: try: self.redis_conn_decoded.sadd(vm_metrics_key, str(victoriametrics_data_list)) self.redis_conn_decoded.expire(vm_metrics_key, 120) except Exception as err: logger.error('error :: horizon.prometheus :: could add victoriametrics_data_list to Redis key %s - %s' % ( vm_metrics_key, str(err))) # To send victoriametrics headers = {'Accept': 'application/json', 'Content-Type': 'application/json'} # Limit to 1000 metrics per request victoriametrics_submissions = [] for i in range(0, len(victoriametrics_data_list), 1000): victoriametrics_submissions.append(victoriametrics_data_list[i:(i + 1000)]) for victoriametrics_data in victoriametrics_submissions: try: # Handle cluster import url # url = '%s/api/v1/import' % vm_url url = str(vm_url) # @modified 20230107 - Task #4778: v4.0.0 - update dependencies # Address bandit B108:hardcoded_tmp_directory # with open('/tmp/skyline/horizon.vm_send_data.jsonl', 'w') as outfile: with open(vm_send_data_jsonl_file, 'w') as outfile: for entry in victoriametrics_data: json.dump(entry, outfile) outfile.write('\n') except Exception as err: logger.error('error :: horizon.prometheus :: failed to write data to %s - %s' % ( vm_send_data_jsonl_file, str(err))) try: # @modified 20230107 - Task #4778: v4.0.0 - update dependencies # Address bandit B108:hardcoded_tmp_directory # r = requests.post(url, data=open('/tmp/skyline/horizon.vm_send_data.jsonl', 'rb'), headers=headers) r = requests.post(url, data=open(vm_send_data_jsonl_file, 'rb'), headers=headers) logger.info('horizon.prometheus :: sent %s metrics to victoriametrics with response code: %s' % (str(len(victoriametrics_data)), str(r.status_code))) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: horizon.prometheus :: failed to send data to victoriametrics - %s' % ( str(err))) del victoriametrics_submissions logger.info('horizon.prometheus :: %s prometheus/victoriametrics in prom_to_vm_metrics_dict' % str(len(prom_to_vm_metrics_dict))) del victoriametrics_data del victoriametrics_data_list logger.info('horizon.prometheus :: %s metrics in shard_metrics_dict' % str(len(shard_metrics_dict))) other_shard_submissions = [] # if other_shard_metrics and DEVELOPMENT: if other_shard_metrics: # What would you do here, how to send to other shards # send to other flux on flux/prometheus/write as json with # horizon-shard header logger.info('horizon.prometheus :: %s metrics in other_shard_metrics' % str(len(other_shard_metrics))) 
other_shard_metrics_key = 'horizon.prometheus_metrics.other_shard_metrics.%s' % str(current_run_ts) for shard_number in list(other_shard_metrics.keys()): # @modified 20230303 - Task #2732: Prometheus to Skyline # Branch #4300: prometheus # Handle the 0 shard # if other_shard_metrics[shard_number]: if other_shard_metrics[shard_number] or other_shard_metrics[shard_number] == 0: try: self.redis_conn_decoded.hset(other_shard_metrics_key, shard_number, str(other_shard_metrics[shard_number])) self.redis_conn_decoded.expire(other_shard_metrics_key, 120) logger.info('horizon.prometheus :: added to Redis hash %s' % other_shard_metrics_key) except Exception as err: logger.error('error :: horizon.prometheus :: could not add other_shard_metrics_key to Redis key %s - %s' % ( other_shard_metrics_key, str(err))) # What would you do here, how to send to other shards # send to other flux on flux/prometheus/write as json with # horizon-shard header and determine flux URL from # REMOTE_SKYLINE_INSTANCES = [ # ['https://skyline-test-2.example.org', 'admin', 'xxxx', 'skyline-test-2'], # ['https://skyline-test-3.example.org', 'admin', 'xxxx', 'skyline-test-3'], # ] for shard_host in list(HORIZON_SHARDS.keys()): if shard_number == HORIZON_SHARDS[shard_host]: other_shard_host = shard_host for item in settings.REMOTE_SKYLINE_INSTANCES: if item[3] == other_shard_host: url = '%s/flux/prometheus/write' % item[0] # TODO - handle large payload and split into smaller chunks so that 413 is not # returned other_shard_submissions = [] for i in range(0, len(other_shard_metrics[shard_number]), 2500): other_shard_submissions.append(other_shard_metrics[shard_number][i:(i + 2500)]) for other_shard_submission in other_shard_submissions: try: # flux_horizon_dict = { # "key": settings.FLUX_SELF_API_KEY, # "metrics": other_shard_metrics[shard_number] # } flux_horizon_dict = { "key": settings.FLUX_SELF_API_KEY, "metrics": other_shard_submission } logger.info('horizon.prometheus :: flux_horizon_dict size: %s bytes, string length: %s' % ( str(sys.getsizeof(flux_horizon_dict)), str(len(str(flux_horizon_dict))))) payload = gzip.compress(json.dumps(flux_horizon_dict).encode('utf-8')) logger.info('horizon.prometheus :: flux_horizon_dict gzipped size: %s' % str(sys.getsizeof(payload))) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', 'Content-Encoding': 'gzip', 'horizon-shard': this_host } # Compress payload # r = requests.post(url, data=json.dumps(flux_horizon_dict), headers=headers) r = requests.post(url, data=payload, headers=headers) logger.info('horizon.prometheus :: sent %s metrics to %s for shard %s with response code: %s' % ( str(len(other_shard_submission)), url, str(shard_number), str(r.status_code))) except Exception as err: logger.error('error :: horizon.prometheus :: failed to send data to %s - %s' % ( url, str(err))) logger.info('horizon.prometheus :: sent %s metrics shard %s' % ( str(len(other_shard_metrics[shard_number])), str(shard_number))) del other_shard_metrics del other_shard_submissions previous_metrics_with_no_id_data_strs = [] try: previous_metrics_with_no_id_data_strs = list(self.redis_conn_decoded.smembers(metrics_with_no_id_key)) except Exception as err: logger.error('error :: horizon.prometheus :: could not smembers to Redis set %s - %s' % ( metrics_with_no_id_key, str(err))) previous_metrics_with_no_id_data = [] error_logged = False removed_previous_metrics_with_no_id_data = 0 if previous_metrics_with_no_id_data_strs: logger.info('horizon.prometheus :: prefixing 
aggregated_metrics_data_list with %s entries for metrics without known ids' % ( str(len(previous_metrics_with_no_id_data_strs)))) for item in previous_metrics_with_no_id_data_strs: try: previous_data = literal_eval(item) if isinstance(previous_data[1][0], list): previous_data_timestamp = int(previous_data[1][0][0]) else: previous_data_timestamp = int(previous_data[1][0]) if previous_data_timestamp >= int(int(now_ts) - 300): previous_metrics_with_no_id_data.append(previous_data) else: removed_previous_metrics_with_no_id_data += 1 except Exception as err: if not error_logged: logger.error('error :: horizon.prometheus :: failed to add to previous_metrics_with_no_id_data %s - %s' % ( str(item), str(err))) error_logged = True del previous_metrics_with_no_id_data_strs logger.info('horizon.prometheus :: removing %s metrics with no id entries from %s' % ( str(removed_previous_metrics_with_no_id_data), metrics_with_no_id_key)) if previous_metrics_with_no_id_data: try: self.redis_conn_decoded.delete(metrics_with_no_id_key) except Exception as err: logger.error('error :: horizon.prometheus :: could delete Redis set %s - %s' % ( metrics_with_no_id_key, str(err))) aggregated_metrics_data_list = previous_metrics_with_no_id_data + aggregated_metrics_data_list global_metrics_with_id_dict_updated = False labelled_metrics = [] retention_msecs = int(settings.FULL_DURATION) * 1000 redis_timeseries_errors = [] redis_timeseries_failed_to_insert = [] logger.info('horizon.prometheus :: inserting %s entries into redistimeseries' % ( str(len(aggregated_metrics_data_list)))) start_insert = timer() inserted_count = 0 time_now = int(time()) no_id_keys = 0 unique_labelled_metrics_key = 'labelled_metrics.unique_labelled_metrics' metrics_with_unknown_id = [] metrics_with_id_found_in_db = {} updated_global_metrics_with_id_dict_count = 0 for item in aggregated_metrics_data_list: try: metric = item[0] metric_id = 0 if metrics_with_id_dict: try: metric_id = int(float(metrics_with_id_dict[metric])) except: metric_id = 0 if not metric_id and not global_metrics_with_id_dict_updated: logger.info('horizon.prometheus :: getting metrics_with_id_dict from Redis aet.metrics_manager.active_labelled_metrics_with_id to update global_metrics_with_id_dict') try: metrics_with_id_dict = self.redis_conn_decoded.hgetall('aet.metrics_manager.active_labelled_metrics_with_id') logger.info('horizon.prometheus :: got %s metrics in metrics_with_id_dict from Redis aet.metrics_manager.active_labelled_metrics_with_id' % str(len(metrics_with_id_dict))) global_metrics_with_id_dict_updated = True except Exception as err: logger.error('error :: horizon.prometheus :: hgetall aet.metrics_manager.active_labelled_metrics_with_id failed - %s' % ( str(err))) if metrics_with_id_dict: try: new_metrics_with_id_dict = update_global_metrics_with_id_dict(metrics_with_id_dict) if new_metrics_with_id_dict: metrics_with_id_dict = new_metrics_with_id_dict # Logs every metric when service restarts # logger.info('horizon.prometheus :: updated global_metrics_with_id_dict with %s metrics' % str(len(metrics_with_id_dict))) except Exception as err: logger.error('error :: horizon.prometheus :: update_global_metrics_with_id_dict failed - %s' % ( str(err))) try: metric_id = int(float(metrics_with_id_dict[metric])) except: metric_id = 0 if not metric_id: try: metric_id = int(float(self.redis_conn_decoded.hget('aet.metrics_manager.metric_names_with_ids', metric))) except: pass # @added 20230123 - Task #2732: Prometheus to Skyline # Branch #4300: prometheus # Check metric if 
metric/s are in DB and not in # Redis for metrrics_manager to reset as active if not metric_id: if not all_db_base_names_with_ids: try: with_ids = True all_db_base_names, all_db_base_names_with_ids = get_all_db_metric_names(skyline_app, with_ids) logger.info('horizon.prometheus :: got %s metrics in all_db_base_names_with_ids with get_all_db_metric_names' % str(len(all_db_base_names_with_ids))) except Exception as err: logger.error('error :: horizon.prometheus :: get_all_db_metric_names failed - %s' % ( str(err))) metrics_with_unknown_id.append(metric) try: metric_id = int(all_db_base_names_with_ids[metric]) metrics_with_id_found_in_db[metric_id] = metric except: pass if metric_id: metrics_with_id_dict[metric] = metric_id try: new_metrics_with_id_dict = update_global_metrics_with_id_dict(metrics_with_id_dict) updated_global_metrics_with_id_dict_count += 1 if new_metrics_with_id_dict: metrics_with_id_dict = new_metrics_with_id_dict # Logs every metric when service restarts # logger.info('horizon.prometheus :: updated global_metrics_with_id_dict with %s metrics' % str(len(metrics_with_id_dict))) except Exception as err: logger.error('error :: horizon.prometheus :: update_global_metrics_with_id_dict failed - %s' % ( str(err))) if not metric_id: metrics_with_no_id_data_list.append(item) no_id_keys += 1 continue labels = {} try: labels_ = prom_to_vm_metrics_dict[metric]['labels'] labels = {} labels['__name__'] = prom_to_vm_metrics_dict[metric]['name'] # @added 20230626 - Bug #4968: horizon.prometheus - handle TSDB not being able to parse LABELS # Task #4962: Build and test skyline v4.0.0 # https://github.com/RedisTimeSeries/RedisTimeSeries/issues/1478 warn_on_labels = False for key in list(labels_.keys()): # @added 20230626 - Bug #4968: horizon.prometheus - handle TSDB not being able to parse LABELS # Task #4962: Build and test skyline v4.0.0 # https://github.com/RedisTimeSeries/RedisTimeSeries/issues/1478 if key == 'tags': if ',' in labels_[key]: value_str = labels_[key] new_value_str = value_str.replace(',', ' ') logger.warning('warning :: horizon.prometheus :: changing tags key value from %s to %s on %s' % ( str(value_str), new_value_str, str(labels))) labels_[key] = new_value_str warn_on_labels = True labels[key] = labels_[key] # @added 20230626 - Bug #4968: horizon.prometheus - handle TSDB not being able to parse LABELS # Task #4962: Build and test skyline v4.0.0 # https://github.com/RedisTimeSeries/RedisTimeSeries/issues/1478 if warn_on_labels: logger.warning('warning :: horizon.prometheus :: changing tags on %s' % ( str(labels_))) except Exception as err: err_str = 'labels err - %s' % str(err) redis_timeseries_errors.append([item, err_str]) continue labelled_metric_by_id = 'labelled_metrics.%s' % str(metric_id) redis_ts_key = labelled_metric_by_id item_list = list(item[1]) if isinstance(item[1][0], int): item_list = [item[1]] for ts_item in item_list: value = ts_item[1] timestamp = ts_item[0] timestamp_ms = int(timestamp) * 1000 inserted = 0 try: inserted = self.redis_conn_decoded.ts().add(redis_ts_key, timestamp_ms, value, retention_msecs=retention_msecs, labels=labels, chunk_size=128, duplicate_policy='first') labelled_metrics.append(redis_ts_key) except Exception as err: if "TSDB: Couldn't parse LABELS" in str(err): redis_timeseries_errors.append([item, {'labels': labels}, err]) else: redis_timeseries_errors.append(['insert', item, err]) continue if not inserted: redis_timeseries_failed_to_insert.append(item) else: inserted_count += 1 except Exception as err: 
redis_timeseries_errors.append(['process error', item, err]) end_insert = timer() logger.info('horizon.prometheus :: inserted %s entries into redistimeseries with %s errors, %s warnings in %.6f seconds' % ( str(inserted_count), str(len(redis_timeseries_errors)), str(len(redis_timeseries_failed_to_insert)), (end_insert - start_insert))) if redis_timeseries_errors: logger.error('error :: horizon.prometheus :: last 3 redistimeseries errors: %s' % ( str(redis_timeseries_errors[-3:]))) logger.info('horizon.prometheus :: dropped %s timeseries due to no metric id known' % ( str(no_id_keys))) # @added 20230123 - Task #2732: Prometheus to Skyline # Branch #4300: prometheus # Check metric if metric/s are in DB and not in # Redis for metrrics_manager to reset as active logger.info('horizon.prometheus :: updated_global_metrics_with_id_dict_count: %s' % ( str(updated_global_metrics_with_id_dict_count))) if metrics_with_id_found_in_db: logger.info('horizon.prometheus :: looked up %s metric ids from all_db_base_names_with_ids and found ids for %s' % ( str(len(metrics_with_unknown_id)), str(len(metrics_with_id_found_in_db)))) try: self.redis_conn_decoded.sadd('horizon.active_metrics_not_in_metric_names_with_ids', *set(list(metrics_with_id_found_in_db.keys()))) logger.info('horizon.prometheus :: set horizon.active_metrics_not_in_metric_names_with_ids with %s metric ids for metrics_manager to set as active' % str(len(metrics_with_id_found_in_db))) except Exception as err: logger.error('error :: horizon.prometheus :: sadd on horizon.active_metrics_not_in_metric_names_with_ids failed - %s' % ( str(err))) active_key = 'horizon.active_metrics_not_in_metric_names_with_ids.%s' % str(current_run_ts) try: self.redis_conn_decoded.hset(active_key, mapping=metrics_with_id_found_in_db) self.redis_conn_decoded.expire(active_key, 300) logger.info('horizon.prometheus :: set %s with 300 second expiry' % active_key) except Exception as err: logger.error('error :: horizon.prometheus :: hest on %s failed - %s' % ( active_key, str(err))) insert_results = { 'total': len(aggregated_metrics_data_list), 'inserted': inserted_count, 'errors': redis_timeseries_errors, 'errors_count': len(redis_timeseries_errors), 'failed_to_insert': redis_timeseries_failed_to_insert, 'failed_to_insert_count': len(redis_timeseries_failed_to_insert), 'execution_time': (end_insert - start_insert), 'dropped_no_id': no_id_keys, } del aggregated_metrics_data_list insert_results_key = 'horizon.prometheus_metrics.insert_results.redistimeseries.%s' % str(current_run_ts) expiry_time = 120 if len(redis_timeseries_errors) > 0: expiry_time = 300 if len(redis_timeseries_failed_to_insert) > 0: expiry_time = 300 for key in list(insert_results.keys()): try: self.redis_conn_decoded.hset(insert_results_key, key, str(insert_results[key])) self.redis_conn_decoded.expire(insert_results_key, expiry_time) # logger.info('horizon.prometheus :: added %s to Redis hash %s' % (key, insert_results_key)) except Exception as err: logger.error('error :: horizon.prometheus :: could not add %s to Redis hash %s - %s' % ( key, insert_results_key, str(err))) if labelled_metrics: try: self.redis_conn_decoded.sadd(unique_labelled_metrics_key, *set(labelled_metrics)) except Exception as err: logger.error('error :: horizon.prometheus :: could not sadd to Redis set %s - %s' % ( unique_labelled_metrics_key, str(err))) if metrics_with_no_id_data_list: metrics_with_no_id = [] logger.info('horizon.prometheus :: adding %s entries to Redis set %s for metrics without ids' % ( 
str(len(metrics_with_no_id_data_list)), metrics_with_no_id_key)) metrics_with_no_id_data_strings_list = [str(item) for item in metrics_with_no_id_data_list] try: self.redis_conn_decoded.sadd(metrics_with_no_id_key, *metrics_with_no_id_data_strings_list) except Exception as err: logger.error('error :: horizon.prometheus :: could not sadd %s metric data for metrics without ids to Redis set %s - %s' % ( str(len(metrics_with_no_id_data_list)), metrics_with_no_id_key, str(err))) metrics_with_no_id = [item[0] for item in metrics_with_no_id_data_list] try: self.redis_conn_decoded.sadd('panorama.horizon.metrics_with_no_id', *metrics_with_no_id) except Exception as err: logger.error('error :: horizon.prometheus :: could not sadd %s metrics without ids to Redis set panorama.horizon.metrics_with_no_id - %s' % ( str(len(metrics_with_no_id_data_list)), str(err))) del metrics_with_no_id del metrics_with_no_id_data_list send_graphite_metric(self, skyline_app, submitted_metrics_metric_name, metrics_submitted) dropped_metrics_metric_name = '%s.prometheus_metrics.dropped' % skyline_app_graphite_namespace send_graphite_metric(self, skyline_app, dropped_metrics_metric_name, metrics_dropped) flux_prometheus_received_metric_name = '%s.flux_received' % skyline_app_graphite_namespace send_graphite_metric(self, skyline_app, flux_prometheus_received_metric_name, flux_prometheus_received) logger.info('horizon.prometheus :: flux received %s prometheus entries' % str(flux_prometheus_received)) logger.info('horizon.prometheus :: took %.6f seconds to process flux.prometheus_metrics' % ( (time() - now_ts))) continue except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: horizon.prometheus :: error in memray tracker block - %s' % ( str(err)))
    def run(self):
        """
        Called when the process initializes.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os_remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('starting %s Prometheus' % skyline_app)

        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os_remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        if not VICTORIAMETRICS_ENABLED or not vm_url:
            logger.error('error :: %s :: starting submit_PrometheusMetrics but no victoriametrics store to write to is known' % skyline_app)

        logger.info('%s :: starting submit_PrometheusMetrics' % skyline_app)
        self.submit_PrometheusMetrics()