Source code for vista.worker

from __future__ import division
import logging
from time import time, sleep
from multiprocessing import Process
import os
from os import kill
from os import remove as os_remove
import traceback
from sys import version_info
import os.path
from ast import literal_eval
import datetime

from redis import StrictRedis
import requests
import pandas as pd

import settings
from skyline_functions import send_graphite_metric

parent_skyline_app = 'vista'
child_skyline_app = 'worker'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s.worker' % (parent_skyline_app, SERVER_METRIC_PATH)

python_version = int(version_info[0])

this_host = str(os.uname()[1])

LOCAL_DEBUG = True


class Worker(Process):
    """
    The worker process retrieves time series published to the Vista Fetcher
    Redis set, vista.fetcher.metrics.json (and in future the
    vista.fetcher.metrics.csv Redis set), validates the latest data points
    for each metric and submits them to Skyline Flux or directly to Graphite.
    """
    def __init__(self, parent_pid):
        super(Worker, self).__init__()
        self.parent_pid = parent_pid
        self.daemon = True
        if settings.REDIS_PASSWORD:
            self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
        else:
            self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
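
    # Illustrative only: the worker expects each member of the
    # vista.fetcher.metrics.json Redis set to literal_eval to a list holding
    # a single dict, along these lines (the field values are assumptions for
    # illustration, not taken from a live fetcher). Note that 'datapoints'
    # is itself a string literal, which is why run() below passes it through
    # literal_eval twice:
    #
    #   [{'remote_host_type': 'graphite',
    #     'remote_target': 'stats.web01.cpu.user',
    #     'metric': 'vista.stats.web01.cpu.user',
    #     'namespace_prefix': 'vista',
    #     'datapoints': "'[[0.35, 1554130020], [0.42, 1554130080]]'"}]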
    def check_if_parent_is_alive(self):
        """
        Self explanatory.
        """
        try:
            kill(self.parent_pid, 0)
        except:
            exit(0)
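
    # Note on the liveness probe above: os.kill(pid, 0) sends no signal at
    # all, it only raises OSError if the process does not exist (or cannot
    # be signalled), so a failed "kill" here means the parent is gone and
    # the worker exits.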
    def run(self):
        """
        Called when the process initializes.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        # In Vista the log management is handled by the fetcher, the worker
        # just waits for the fetcher to do the log management
        now = int(time())
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = int(time())
            else:
                now = log_wait_for + 1

        logger.info('worker :: starting log management')
        if os.path.isfile(skyline_app_loglock):
            logger.error('error :: worker :: bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os_remove(skyline_app_loglock)
                logger.info('worker :: log lock file removed')
            except OSError:
                logger.error('error :: worker :: failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('worker :: bin/%s.d log management done' % skyline_app)

        logger.info('worker :: starting worker')

        try:
            VISTA_ENABLED = settings.VISTA_ENABLED
            logger.info('worker :: VISTA_ENABLED is set to %s' % str(VISTA_ENABLED))
        except:
            VISTA_ENABLED = False
            logger.info('worker :: warning :: VISTA_ENABLED is not declared in settings.py, defaults to False')

        last_sent_to_graphite = int(time())
        metrics_sent_to_flux = 0

        # python-2.x and python-3.x handle while 1 and while True differently
        # while 1:
        running = True
        while running:

            # Make sure Redis is up
            redis_up = False
            while not redis_up:
                try:
                    redis_up = self.redis_conn.ping()
                    if LOCAL_DEBUG:
                        logger.info('worker :: redis is up')
                except:
                    logger.error('worker :: cannot connect to redis at socket path %s' % (settings.REDIS_SOCKET_PATH))
                    sleep(2)
                    if settings.REDIS_PASSWORD:
                        self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
                    else:
                        self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)

            metrics_data = []
            redis_set = 'vista.fetcher.metrics.json'
            try:
                # Get a metric to validate from the Redis set
                metrics_data = self.redis_conn.smembers(redis_set)
                if LOCAL_DEBUG:
                    logger.info('worker :: got redis set data - %s' % redis_set)
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: worker :: retrieving Redis set %s data' % str(redis_set))

            if not metrics_data:
                if LOCAL_DEBUG:
                    logger.info('worker :: no data from Redis set %s' % str(redis_set))
                sleep(5)

            for str_metric_data in metrics_data:
                delete_set_record = False
                remote_host_type = None
                try:
                    if python_version == 3:
                        str_metric_data = str_metric_data.decode('UTF-8')
                    metric_data = literal_eval(str_metric_data)
                    remote_host_type = str(metric_data[0]['remote_host_type'])
                    if LOCAL_DEBUG:
                        logger.info('worker :: got data from Redis set for remote_host_type %s' % str(remote_host_type))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: worker :: failed to determine remote_host_type from %s' % str(str_metric_data))
                    delete_set_record = True

                if not delete_set_record:
                    try:
                        remote_target = str(metric_data[0]['remote_target'])
                        if LOCAL_DEBUG:
                            logger.info('worker :: got data from Redis set for target %s' % str(remote_target))
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: failed to determine target from %s' % str(str_metric_data))
                        delete_set_record = True

                metric = None
                if not delete_set_record:
                    try:
                        metric = str(metric_data[0]['metric'])
                        if LOCAL_DEBUG:
                            logger.info('worker :: got data from Redis set for metric %s' % str(metric))
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: failed to determine metric from %s' % str(str_metric_data))
                        delete_set_record = True

                namespace_prefix = ''
                if not delete_set_record:
                    try:
                        namespace_prefix = str(metric_data[0]['namespace_prefix'])
                        if namespace_prefix:
                            namespace_prefix = '%s.' % namespace_prefix
                        else:
                            namespace_prefix = ''
                        if LOCAL_DEBUG:
                            logger.info('worker :: got data from Redis set for namespace_prefix %s' % str(namespace_prefix))
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: failed to determine namespace_prefix from %s' % str(str_metric_data))
                        delete_set_record = True
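
                # Illustrative only: the flux.last.<metric> Redis key read
                # below is assumed to literal_eval to a two-element list of
                # [timestamp, value], e.g. [1554130020, 0.35] (example values,
                # not from a live system); only element [0], the timestamp,
                # is used by the worker.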
                have_data = False
                if not delete_set_record:
                    last_flux_metric_data = None
                    cache_key = 'flux.last.%s' % (metric)
                    try:
                        if python_version == 3:
                            redis_last_flux_metric_data = self.redis_conn.get(cache_key).decode('UTF-8')
                        else:
                            redis_last_flux_metric_data = self.redis_conn.get(cache_key)
                        last_flux_metric_data = literal_eval(redis_last_flux_metric_data)
                        if LOCAL_DEBUG:
                            logger.info('worker :: got last_flux_metric_data from Redis')
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: retrieving Redis key %s data' % str(cache_key))
                        last_flux_metric_data = False

                    last_flux_timestamp = None
                    if last_flux_metric_data:
                        try:
                            last_flux_timestamp = int(last_flux_metric_data[0])
                            if LOCAL_DEBUG:
                                logger.info('worker :: got last_flux_timestamp - %s' % str(last_flux_timestamp))
                        except:
                            logger.error(traceback.format_exc())
                            logger.error('error :: worker :: failed determining last_flux_timestamp')
                            last_flux_timestamp = False

                    # Determine the timestamp of the current minute to apply
                    # VISTA_DO_NOT_SUBMIT_CURRENT_MINUTE
                    time_now = int(time())
                    # current_minute = datetime.datetime.utcfromtimestamp(time_now).strftime('%Y-%m-%d %H:%M')
                    current_minute_hour = int(datetime.datetime.utcfromtimestamp(time_now).strftime('%H'))
                    current_minute_minute = int(datetime.datetime.utcfromtimestamp(time_now).strftime('%M'))
                    current_datetime = datetime.datetime.utcfromtimestamp(time_now).replace(hour=current_minute_hour, minute=current_minute_minute, second=0, microsecond=0)
                    current_minute_timestamp_start = int(current_datetime.strftime('%s'))

                    datapoint = None
                    last_timestamp_with_data = None
                    timeseries = []
                    try:
                        if python_version == 3:
                            datapoints_str = literal_eval(metric_data[0]['datapoints'])
                            metric_datapoints = literal_eval(datapoints_str)
                        else:
                            # metric_datapoints = metric_data[0]['datapoints']
                            datapoints_str = literal_eval(metric_data[0]['datapoints'])
                            metric_datapoints = literal_eval(datapoints_str)
                        # for value, timestamp in metric_data[0]['datapoints']:
                        if LOCAL_DEBUG:
                            len_metric_datapoints = len(metric_datapoints)
                            logger.info('worker :: got %s metric_datapoints - %s' % (
                                str(len_metric_datapoints), str(metric_datapoints)))
                        for metric_datapoint in metric_datapoints:
                            # Graphite data points are [value, timestamp],
                            # Prometheus data points are [timestamp, value]
                            if remote_host_type == 'graphite':
                                value = float(metric_datapoint[0])
                                timestamp = int(metric_datapoint[1])
                            if remote_host_type == 'prometheus':
                                value = float(metric_datapoint[1])
                                timestamp = int(metric_datapoint[0])
                            append_to_timeseries = False
                            if last_flux_timestamp:
                                if int(timestamp) > last_flux_timestamp:
                                    # timeseries.append([timestamp, value])
                                    append_to_timeseries = True
                            else:
                                # timeseries.append([timestamp, value])
                                append_to_timeseries = True

                            # Here if the timestamp of the data point falls
                            # within the current minute, it is discarded and
                            # not sent to flux, to ensure that high frequency
                            # metrics can have their minutely bins fully
                            # populated before they are submitted to Graphite
                            if settings.VISTA_DO_NOT_SUBMIT_CURRENT_MINUTE:
                                if int(timestamp) >= current_minute_timestamp_start:
                                    append_to_timeseries = False
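
                            # Note (an observation, not a behaviour change):
                            # current_minute_timestamp_start computed above
                            # relies on strftime('%s'), a non-portable libc
                            # extension that applies the host timezone to the
                            # naive UTC datetime; on a UTC host it equals the
                            # simpler time_now - (time_now % 60).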
                            if append_to_timeseries:
                                timeseries.append([timestamp, value])

                        # Walk the time series backwards to find the last
                        # data point that has a value (0.0 counts as a value)
                        last_timestamp_with_data = 0
                        for timestamp, value in timeseries[::-1]:
                            has_value = False
                            if value == 0.0:
                                has_value = True
                            if value:
                                has_value = True
                            if has_value:
                                last_timestamp_with_data = int(timestamp)
                                datapoint = value
                                break
                        if last_timestamp_with_data:
                            have_data = True
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: failed to determine datapoints from %s' % (
                            str(metric_data)))
                        delete_set_record = True
                    if not timeseries:
                        logger.info('worker :: after processing, there were no valid data points in %s' % (
                            str(metric_data)))
                        delete_set_record = True
                    if not have_data and timeseries:
                        logger.error('error :: worker :: failed to determine last_timestamp_with_data from %s' % (
                            str(metric_data)))
                        delete_set_record = True

                if delete_set_record:
                    try:
                        redis_set = 'vista.fetcher.metrics.json'
                        self.redis_conn.srem(redis_set, str_metric_data)
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: failed to delete data from Redis set %s, data - %s' % (
                            str(redis_set), str(str_metric_data)))
                    continue

                if not metric:
                    continue

                valid_data = True
                if last_flux_timestamp and last_timestamp_with_data:
                    if int(last_timestamp_with_data) <= last_flux_timestamp:
                        valid_data = False

                if not valid_data:
                    redis_set = 'vista.fetcher.metrics.json'
                    logger.info('worker :: no valid data in fetched data, removing from Redis set %s - data - %s' % (
                        redis_set, str(str_metric_data)))
                    try:
                        self.redis_conn.srem(redis_set, str_metric_data)
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: worker :: failed to delete data from Redis set %s, data - %s' % (
                            redis_set, str(str_metric_data)))
                    continue

                if valid_data:
                    flux_host = 'http://%s:%s' % (settings.FLUX_IP, settings.FLUX_PORT)

                    # Resample
                    resample_at = None
                    if resample_at == 'none' or resample_at == '0Min':
                        resample_at = False
                    if resample_at == 'None' or resample_at == '0min':
                        resample_at = False
                    if resample_at is None or resample_at == '0' or resample_at == 0:
                        resample_at = False
                    if resample_at:
                        try:
                            df = pd.DataFrame(timeseries)
                            df.columns = ['timestamp', 'value']
                            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s', origin='unix')
                            df = df.set_index('timestamp')
                            resampled_df = df.resample(resample_at).sum()
                            resampled_timeseries = []
                            for index, row in resampled_df.iterrows():
                                timestamp = int(index.strftime('%s'))
                                resampled_timeseries.append([timestamp, row[0]])
                            timeseries = resampled_timeseries
                            timeseries_length = len(timeseries)
                            logger.info('worker :: time series resampled at %s resulting in %s data points to send to Graphite' % (
                                str(resample_at), str(timeseries_length)))
                        except:
                            logger.error(traceback.format_exc())
                            logger.error('error :: worker :: failed to resample time series at %s for %s with time series %s' % (
                                str(resample_at), str(metric), str(timeseries)))

                    # Submit each data point to flux via its HTTP API
                    for timestamp, value in timeseries:
                        flux_url = '%s/metric_data?metric=%s&value=%s&timestamp=%s&key=%s' % (
                            flux_host, metric, str(value),
                            str(timestamp), settings.FLUX_SELF_API_KEY)
                        success = False
                        response = None
                        try:
                            response = requests.get(flux_url)
                            if response.status_code == 200:
                                success = True
                            elif response.status_code == 204:
                                success = True
                        except:
                            logger.info(traceback.format_exc())
                            logger.error('error :: worker :: failed to request %s' % str(flux_url))
                        if not success and response is not None:
                            logger.error('error :: worker :: http status code - %s, reason - %s' % (
                                str(response.status_code), str(response.reason)))
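
                        # Illustrative only: the GET issued above takes the
                        # form (placeholder values, not from a live system):
                        # http://<FLUX_IP>:<FLUX_PORT>/metric_data?metric=vista.stats.web01.cpu.user&value=0.35&timestamp=1554130020&key=<FLUX_SELF_API_KEY>
                        # both a 200 and a 204 response count as success.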
                        if success:
                            metrics_sent_to_flux += 1
                            redis_set = 'vista.fetcher.metrics.json'
                            logger.info('worker :: data submitted to flux OK, removing data from Redis set %s' % (
                                redis_set))
                            try:
                                self.redis_conn.srem(redis_set, str_metric_data)
                            except:
                                logger.error(traceback.format_exc())
                                logger.error('error :: worker :: failed to delete data from Redis set %s, data - %s' % (
                                    redis_set, str(str_metric_data)))
                            redis_set = 'vista.fetcher.unique_metrics'
                            try:
                                self.redis_conn.sadd(redis_set, remote_target)
                            except:
                                logger.error(traceback.format_exc())
                                logger.error('error :: worker :: failed to add %s to Redis set %s' % (
                                    remote_target, redis_set))

            time_now = int(time())
            if (time_now - last_sent_to_graphite) >= 60:
                logger.info('worker :: metrics sent_to_flux in last 60 seconds - %s' % str(metrics_sent_to_flux))
                send_metric_name = '%s.metrics_sent_to_flux' % skyline_app_graphite_namespace
                try:
                    send_graphite_metric(parent_skyline_app, send_metric_name, str(metrics_sent_to_flux))
                    last_sent_to_graphite = int(time())
                    metrics_sent_to_flux = 0
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: worker :: failed to send_graphite_metric %s with %s' % (
                        send_metric_name, str(metrics_sent_to_flux)))
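
# Minimal usage sketch, illustrative only. In Skyline the Vista fetcher
# process spawns and monitors the worker; the parent_pid wiring below is an
# assumption for demonstration, not how bin/vista.d starts it:
#
#   if __name__ == '__main__':
#       worker = Worker(parent_pid=os.getpid())
#       worker.start()
#       worker.join()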