from __future__ import division
import logging
from time import time, sleep
from multiprocessing import Process
import os
from os import kill
from os import remove as os_remove
import traceback
from sys import version_info
import os.path
from ast import literal_eval
import datetime
# @added 20200107 - Task #3376: Enable vista and flux to deal with lower frequency data
from collections import Counter
# from redis import StrictRedis
import requests
import pandas as pd
# @added 20200225 - Bug #3476: vista - handle very large floats
# Handle very large floats
from numpy import format_float_positional
import settings
from skyline_functions import (
# @modified 20220726 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
# Moved send_graphite_metric
# send_graphite_metric,
# @added 20191111 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
get_redis_conn, get_redis_conn_decoded)
# @added 20220429 - Feature #4536: Handle Redis failure
from functions.flux.get_last_metric_data import get_last_metric_data
# @added 20220726 - Task #2732: Prometheus to Skyline
# Branch #4300: prometheus
from functions.graphite.send_graphite_metric import send_graphite_metric
# Application identity.  The worker is a child of the vista app and logs to
# the vista log, it does not have a log of its own.
parent_skyline_app = 'vista'
child_skyline_app = 'worker'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Optional settings.  Each one falls back to a default when it is not
# declared in settings.py, getattr is used rather than a bare except so that
# only a missing attribute triggers the fallback.
_server_metrics_name = str(getattr(settings, 'SERVER_METRICS_NAME', ''))
# An empty server name results in no additional namespace element
SERVER_METRIC_PATH = ('.%s' % _server_metrics_name) if _server_metrics_name else ''

# @added 20201020 - Feature #3796: FLUX_CHECK_LAST_TIMESTAMP
FLUX_CHECK_LAST_TIMESTAMP = getattr(settings, 'FLUX_CHECK_LAST_TIMESTAMP', True)

# @added 20210512 - Feature #4064: VERBOSE_LOGGING
VERBOSE_LOGGING = getattr(settings, 'VISTA_VERBOSE_LOGGING', False)

# Graphite namespace that the worker's own metrics are sent to
skyline_app_graphite_namespace = 'skyline.%s%s.worker' % (parent_skyline_app, SERVER_METRIC_PATH)

python_version = int(version_info[0])

this_host = str(os.uname()[1])

# Set to True to log every step of the record processing
LOCAL_DEBUG = False
class Worker(Process):
    """
    The worker process retrieves time series published to the Vista Fetcher
    Redis set, vista.fetcher.metrics.json (and in future the
    vista.fetcher.metrics.csv Redis set) and validates the latest data points
    for each metric and submits it to Skyline Flux or directly to Graphite.
    """

    # Seconds to wait for flux to answer a single submission.  Without a
    # timeout a hung flux would block the worker indefinitely.
    # NOTE(review): 10 seconds is an assumed value - confirm it suits the
    # deployment.
    FLUX_REQUEST_TIMEOUT = 10

    def __init__(self, parent_pid):
        """
        Initialize the worker.

        :param parent_pid: the pid of the parent process, which is probed by
            :func:`check_if_parent_is_alive`
        """
        super(Worker, self).__init__()
        self.parent_pid = parent_pid
        self.daemon = True
        # @added 20191111 - Bug #3266: py3 Redis binary objects not strings
        #                   Branch #3262: py3
        # One connection returning raw responses and one returning decoded
        # strings
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)

    def check_if_parent_is_alive(self):
        """
        Exit the process if the parent process can no longer be signalled.
        """
        try:
            # Signal 0 sends nothing, it only checks the pid can be signalled
            kill(self.parent_pid, 0)
        except Exception:
            # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
            # Log warning
            logger.warning('warning :: parent process is dead')
            # SystemExit is raised directly because the exit() builtin is
            # only available when the site module has been loaded
            raise SystemExit(0)

    @staticmethod
    def _current_minute_start(now):
        """
        Return the epoch timestamp of the start of the minute containing now.

        Calculated with epoch arithmetic so that the result does not depend
        on the timezone of the host or on platform specific strftime
        directives.
        """
        now = int(now)
        return now - (now % 60)

    @staticmethod
    def _most_common_resolution(timestamps):
        """
        Return the most common interval between consecutive timestamps, or
        None if there are fewer than two timestamps or the most common
        interval is not positive.
        """
        intervals = [
            later - earlier
            for earlier, later in zip(timestamps, timestamps[1:])]
        if not intervals:
            return None
        resolution = int(Counter(intervals).most_common(1)[0][0])
        if resolution > 0:
            return resolution
        return None

    @staticmethod
    def _last_timestamp_with_data(timeseries):
        """
        Return the timestamp of the newest data point in the timeseries that
        has a value (0.0 counts as a value), or 0 if there is none.
        """
        for timestamp, value in reversed(timeseries):
            if value == 0.0 or value:
                return int(timestamp)
        return 0

    @staticmethod
    def _flux_url(flux_host, metric, datapoint, timestamp, api_key):
        """
        Return the flux metric_data URL used to submit a single data point.
        """
        return '%s/metric_data?metric=%s&value=%s&timestamp=%s&key=%s' % (
            flux_host, metric, str(datapoint), str(timestamp), api_key)

    def _wait_for_log_management(self):
        """
        Wait up to 5 seconds for the log lock file to be removed and remove
        it if it is still present after that.

        In Vista the log management is handled by the fetcher, the worker
        just waits for the fetcher to do the log management.
        """
        log_wait_for = int(time()) + 5
        while int(time()) < log_wait_for and os.path.isfile(skyline_app_loglock):
            sleep(.1)

        logger.info('worker :: starting log management')
        if os.path.isfile(skyline_app_loglock):
            logger.error('error :: worker :: bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os_remove(skyline_app_loglock)
                logger.info('worker :: log lock file removed')
            except OSError:
                logger.error('error :: worker :: failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('worker :: bin/%s.d log management done' % skyline_app)

    def _ensure_redis_connection(self):
        """
        Block until Redis answers a ping, recreating both Redis connections
        every 2 seconds while it does not.
        """
        redis_up = False
        while not redis_up:
            try:
                redis_up = self.redis_conn.ping()
                if LOCAL_DEBUG:
                    logger.info('worker :: redis is up')
            except Exception:
                logger.error('worker :: cannot connect to redis at socket path %s' % (settings.REDIS_SOCKET_PATH))
                sleep(2)
                self.redis_conn = get_redis_conn(skyline_app)
                self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)

    def _remove_set_record(self, str_metric_data):
        """
        Remove a record from the vista.fetcher.metrics.json Redis set,
        logging rather than raising on failure.
        """
        redis_set = 'vista.fetcher.metrics.json'
        try:
            self.redis_conn.srem(redis_set, str_metric_data)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to delete data from Redis set %s, data - %s' % (
                str(redis_set), str(str_metric_data)))

    def _parse_set_record(self, str_metric_data):
        """
        Evaluate a Redis set record and extract the required fields.

        :return: (metric_data, remote_target, metric) or None if the record
            cannot be evaluated or any of the remote_host_type,
            remote_target, metric or namespace_prefix keys is missing.
        """
        try:
            metric_data = literal_eval(str_metric_data)
            remote_host_type = str(metric_data[0]['remote_host_type'])
            if LOCAL_DEBUG:
                logger.info('worker :: got data from Redis set for remote_host_type %s' % str(remote_host_type))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to determine remote_host_type from %s' % str(str_metric_data))
            return None

        try:
            remote_target = str(metric_data[0]['remote_target'])
            if LOCAL_DEBUG:
                logger.info('worker :: got data from Redis set for target %s' % str(remote_target))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to determine target from %s' % str(str_metric_data))
            return None

        try:
            metric = str(metric_data[0]['metric'])
            if LOCAL_DEBUG:
                logger.info('worker :: got data from Redis set for metric %s' % str(metric))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to determine metric from %s' % str(str_metric_data))
            return None

        # The namespace_prefix is not used by the worker, but a record that
        # does not declare it is treated as invalid.
        try:
            namespace_prefix = str(metric_data[0]['namespace_prefix'])
            # Only append the separator to a non-empty prefix, checking
            # after appending could never find an empty string.
            if namespace_prefix:
                namespace_prefix = '%s.' % namespace_prefix
            if LOCAL_DEBUG:
                logger.info('worker :: got data from Redis set for namespace_prefix %s' % str(namespace_prefix))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to determine namespace_prefix from %s' % str(str_metric_data))
            return None

        return metric_data, remote_target, metric

    def _get_last_flux_timestamp(self, metric):
        """
        Return the timestamp of the last data point flux recorded for the
        metric, or a falsy value (None or False) if it is not known.

        @added 20220429 - Feature #4536: Handle Redis failure
        The timestamp is taken from get_last_metric_data and only if that
        yields no timestamp is the old flux.last.<metric> Redis key used.
        """
        last_flux_timestamp = None
        use_old_timestamp_keys = True
        redis_last_metric_data_dict = {}
        try:
            redis_last_metric_data_dict = get_last_metric_data(skyline_app, metric)
        except Exception as err:
            logger.error('error :: worker :: get_last_metric_data failed - %s' % (
                err))
        if redis_last_metric_data_dict:
            try:
                last_flux_timestamp = redis_last_metric_data_dict['timestamp']
                use_old_timestamp_keys = False
            except KeyError:
                last_flux_timestamp = None
            except Exception as err:
                logger.error('error :: worker :: failed to get timestamp from - %s - %s' % (
                    str(redis_last_metric_data_dict), err))
                last_flux_timestamp = None
        if not use_old_timestamp_keys:
            return last_flux_timestamp

        cache_key = 'flux.last.%s' % (metric)
        last_flux_metric_data = None
        try:
            redis_last_flux_metric_data = self.redis_conn.get(cache_key)
            # A metric that flux has not seen before has no key, which is
            # not an error.
            if redis_last_flux_metric_data is not None:
                if isinstance(redis_last_flux_metric_data, bytes):
                    redis_last_flux_metric_data = redis_last_flux_metric_data.decode('UTF-8')
                last_flux_metric_data = literal_eval(str(redis_last_flux_metric_data))
                if LOCAL_DEBUG:
                    logger.info('worker :: got last_flux_metric_data from Redis')
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: retrieving Redis key %s data' % str(cache_key))
            last_flux_metric_data = False

        last_flux_timestamp = None
        if last_flux_metric_data:
            try:
                last_flux_timestamp = int(last_flux_metric_data[0])
                if LOCAL_DEBUG:
                    logger.info('worker :: got last_flux_timestamp - %s' % str(last_flux_timestamp))
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: worker :: failed determining last_flux_timestamp')
                last_flux_timestamp = False
        return last_flux_timestamp

    def _build_timeseries(self, metric, metric_data, last_flux_timestamp):
        """
        Return the [timestamp, value] data points of the record that are
        newer than last_flux_timestamp.

        As a side effect the resolution determined from the data points is
        recorded in the vista.last.resolution.<metric> Redis key.

        Any exception raised while evaluating the data points is propagated
        to the caller, which discards the record.
        """
        # The datapoints are stored as a string representation of a string,
        # hence they are evaluated twice.
        datapoints_str = literal_eval(metric_data[0]['datapoints'])
        metric_datapoints = literal_eval(datapoints_str)
        if LOCAL_DEBUG:
            logger.info('worker :: got %s metric_datapoints - %s' % (
                str(len(metric_datapoints)),
                str(metric_datapoints)))

        # @added 20200107 - Task #3376: Enable vista and flux to deal with lower frequency data
        # Determine resolution
        resolution_timestamps = [
            int(metric_datapoint[0]) for metric_datapoint in metric_datapoints]
        metric_resolution = None
        try:
            metric_resolution = self._most_common_resolution(resolution_timestamps)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to determine metric_resolution from %s' % (
                str(metric_data)))
        if metric_resolution:
            cache_key = 'vista.last.resolution.%s' % metric
            try:
                # Update Redis key
                self.redis_conn.setex(cache_key, 3600, metric_resolution)
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: worker :: failed to set Redis key - %s' % (
                    cache_key))

        # Determine the timestamp of the current minute to apply
        # VISTA_DO_NOT_SUBMIT_CURRENT_MINUTE
        current_minute_timestamp_start = self._current_minute_start(time())
        do_not_submit_current_minute = settings.VISTA_DO_NOT_SUBMIT_CURRENT_MINUTE

        timeseries = []
        for metric_datapoint in metric_datapoints:
            # @20191010 - Branch #3140: vista
            # fetcher passes through preformatted data points that are in
            # the same format/order for both graphite and prometheus
            timestamp = int(metric_datapoint[0])
            value = float(metric_datapoint[1])
            if last_flux_timestamp and timestamp <= last_flux_timestamp:
                continue
            # Here if the timestamp of the data point falls within the
            # current minute, it is discarded and not sent to flux, to
            # ensure that high frequency metrics can have their minutely
            # bins fully populated before they are submitted to Graphite
            if do_not_submit_current_minute and timestamp >= current_minute_timestamp_start:
                continue
            timeseries.append([timestamp, value])
        return timeseries

    def _submit_timeseries_to_flux(self, metric, timeseries):
        """
        Submit every data point of the timeseries to flux.

        :return: whether the submission of the last data point succeeded,
            which is what decides whether the record is considered sent.
        """
        flux_host = 'http://%s:%s' % (settings.FLUX_IP, settings.FLUX_PORT)
        success = False
        for timestamp, value in timeseries:
            # @added 20200225 - Bug #3476: vista - handle very large floats
            # Handle very large floats
            # So that flux is never passed a value=1.00243039089e+11
            if 'e' in str(value):
                datapoint = format_float_positional(value)
            else:
                datapoint = float(value)
            flux_url = self._flux_url(
                flux_host, metric, datapoint, timestamp,
                settings.FLUX_SELF_API_KEY)
            success = False
            try:
                response = requests.get(flux_url, timeout=self.FLUX_REQUEST_TIMEOUT)
                success = response.status_code in (200, 204)
            except Exception as err:
                logger.error(traceback.format_exc())
                logger.error('error :: worker :: failed to request %s - %s' % (str(flux_url), err))
            if not success:
                logger.warning('warning :: worker :: failed to submit data to flux')
                logger.debug('debug :: timeseries - %s' % str(timeseries))
        return success

    def _process_set_record(self, str_metric_data):
        """
        Validate a single vista.fetcher.metrics.json record and submit its
        new data points to flux.

        Records that are invalid, have no new data points or were submitted
        are removed from the Redis set.  A record that failed to be
        submitted is left in the set so that it is retried.

        :return: True if the record was submitted to flux, otherwise False.
        """
        parsed_record = self._parse_set_record(str_metric_data)
        if parsed_record is None:
            self._remove_set_record(str_metric_data)
            return False
        metric_data, remote_target, metric = parsed_record

        last_flux_timestamp = self._get_last_flux_timestamp(metric)

        try:
            timeseries = self._build_timeseries(
                metric, metric_data, last_flux_timestamp)
            last_timestamp_with_data = self._last_timestamp_with_data(timeseries)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to determine datapoints from %s' % (
                str(metric_data)))
            self._remove_set_record(str_metric_data)
            return False

        if not timeseries:
            if VERBOSE_LOGGING:
                logger.info('worker :: after processing, there were no valid data points in %s' % (
                    str(metric_data)))
            self._remove_set_record(str_metric_data)
            return False
        if not last_timestamp_with_data:
            logger.error('error :: worker :: failed to determine last_timestamp_with_data from %s' % (
                str(metric_data)))
            self._remove_set_record(str_metric_data)
            return False

        if not metric:
            return False

        if last_flux_timestamp and int(last_timestamp_with_data) <= last_flux_timestamp:
            redis_set = 'vista.fetcher.metrics.json'
            logger.info('worker :: no valid data in fetched data removing from Redis set %s - data - %s' % (
                redis_set, str(str_metric_data)))
            self._remove_set_record(str_metric_data)
            return False

        # Resampling is not applied, the resample interval was hard coded to
        # None so the time series is submitted as it was fetched.
        if not self._submit_timeseries_to_flux(metric, timeseries):
            return False

        # @modified 20191011 - Task #3258: Reduce vista logging
        if VERBOSE_LOGGING:
            logger.info('worker :: %s data points submitted to flux OK for %s' % (
                str(len(timeseries)), metric))
        self._remove_set_record(str_metric_data)

        redis_set = 'vista.fetcher.unique_metrics'
        try:
            self.redis_conn.sadd(redis_set, remote_target)
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to add %s to Redis set %s' % (
                remote_target, redis_set))

        # @added 20201020 - Feature #3796: FLUX_CHECK_LAST_TIMESTAMP
        # Add internal transformed metric name to a Redis for flux to
        # consume and determine if the flux.last Redis keys are to be set
        # for a vista metrics even if flux FLUX_CHECK_LAST_TIMESTAMP is
        # False, flux still uses flux.last Redis keys for vista.metrics
        redis_set = 'vista.metrics'
        try:
            self.redis_conn.sadd(redis_set, str(metric))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error('error :: worker :: failed to add %s to Redis set %s' % (
                metric, redis_set))
        return True

    def run(self):
        """
        Called when the process initializes.

        Loops forever, processing the records in the
        vista.fetcher.metrics.json Redis set and once 60 seconds have passed
        since the last report, sends the worker metrics to Graphite and
        refreshes the vista.worker Redis key.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        self._wait_for_log_management()

        logger.info('worker :: starting worker')

        try:
            vista_enabled = settings.VISTA_ENABLED
            logger.info('worker :: VISTA_ENABLED is set to %s' % str(vista_enabled))
        except Exception:
            logger.info('worker :: warning :: VISTA_ENABLED is not declared in settings.py, defaults to False')

        redis_set = 'vista.fetcher.metrics.json'
        last_sent_to_graphite = int(time())
        metrics_sent_to_flux = 0

        # python-2.x and python3.x handle while 1 and while True differently
        running = True
        while running:
            # Make sure Redis is up
            self._ensure_redis_connection()

            metrics_data = []
            try:
                # Get the records to validate from the Redis set
                metrics_data = self.redis_conn_decoded.smembers(redis_set)
                if LOCAL_DEBUG:
                    logger.info('worker :: got redis set data - %s' % redis_set)
            except Exception:
                logger.error(traceback.format_exc())
                logger.error('error :: worker :: retrieving Redis set %s data' % str(redis_set))

            if not metrics_data:
                if LOCAL_DEBUG:
                    logger.info('worker :: no data from Redis set %s' % str(redis_set))
                sleep(5)

            for str_metric_data in metrics_data:
                # @added 20200903 - Feature #3728: metric - vista.fetcher.metrics.json set count
                # Stop processing records when the report is due
                if (int(time()) - last_sent_to_graphite) >= 60:
                    break
                if self._process_set_record(str_metric_data):
                    metrics_sent_to_flux += 1

            time_now = int(time())
            if (time_now - last_sent_to_graphite) >= 60:
                logger.info('worker :: metrics sent_to_flux in last 60 seconds - %s' % str(metrics_sent_to_flux))
                send_metric_name = '%s.metrics_sent_to_flux' % skyline_app_graphite_namespace
                try:
                    send_graphite_metric(self, parent_skyline_app, send_metric_name, str(metrics_sent_to_flux))
                    last_sent_to_graphite = int(time())
                    metrics_sent_to_flux = 0
                except Exception:
                    logger.error(traceback.format_exc())
                    logger.error('error :: worker :: failed to send_graphite_metric %s with %s' % (
                        send_metric_name, str(metrics_sent_to_flux)))

                # @added 20200903 - Feature #3728: metric - vista.fetcher.metrics.json set count
                # This metric should mostly always be 0, if it starts to
                # increment, vista needs to be stopped, the Redis set deleted
                # and vista needs to be restarted.  Unfortunately this edge
                # case cannot be automatically fixed as when it was present
                # the delete_set_record was already implemented and logging
                # but the records were not deleted.  This edge case has only
                # been seen once on one instance.
                metrics_data_list = []
                try:
                    metrics_data_list = list(self.redis_conn_decoded.smembers(redis_set))
                except Exception:
                    logger.error(traceback.format_exc())
                    logger.error('error :: worker :: retrieving Redis set %s data' % str(redis_set))
                set_count = len(metrics_data_list)
                logger.info('worker :: vista.fetcher.metrics.json Redis set count - %s' % str(set_count))
                send_metric_name = '%s.vista.fetcher.metrics.json' % skyline_app_graphite_namespace
                try:
                    send_graphite_metric(self, parent_skyline_app, send_metric_name, str(set_count))
                    last_sent_to_graphite = int(time())
                except Exception:
                    logger.error(traceback.format_exc())
                    logger.error('error :: worker :: failed to send_graphite_metric %s with %s' % (
                        send_metric_name, str(set_count)))

                # @added 20220329 - Feature #4018: thunder - skyline.errors
                # Report app up
                try:
                    self.redis_conn.setex('vista.worker', 120, time_now)
                except Exception as err:
                    logger.error(traceback.format_exc())
                    logger.error('error :: worker :: could not update the Redis vista.worker key - %s' % err)