Source code for vista.fetcher

from __future__ import division
import logging
from time import time, sleep
from datetime import datetime, timedelta
from threading import Thread
from multiprocessing import Process
import os
from os import kill, getpid
import traceback
import re
from sys import version_info
import os.path
from ast import literal_eval
try:
    from urllib.parse import quote
except ImportError:
    from urllib import quote
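
# quote() is used below to URL encode Prometheus queries so that label
# matchers and query chars survive in the request URI, e.g. (illustrative):
#     quote('up{job="node"}') -> 'up%7Bjob%3D%22node%22%7D'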

from redis import StrictRedis
import requests

import settings
from skyline_functions import (
    send_graphite_metric, filesafe_metricname)

parent_skyline_app = 'vista'
child_skyline_app = 'fetcher'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except:
    SERVER_METRIC_PATH = ''

skyline_app_graphite_namespace = 'skyline.%s%s.fetcher' % (
    parent_skyline_app, SERVER_METRIC_PATH)
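
# For example, with settings.SERVER_METRICS_NAME unset this resolves to
# 'skyline.vista.fetcher'; with SERVER_METRICS_NAME = 'host-1' (a
# hypothetical value) it would resolve to 'skyline.vista.host-1.fetcher'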

python_version = int(version_info[0])

this_host = str(os.uname()[1])

try:
    VISTA_ENABLED = settings.VISTA_ENABLED
except:
    VISTA_ENABLED = False

USE_FLUX = False
LOCAL_DEBUG = True


class Fetcher(Thread):
    """
    The fetcher thread retrieves the latest data points for metrics from
    multiple remote endpoints and submits the data to the Redis set,
    vista.fetcher.metrics.json, for the worker to process.
    """
    def __init__(self, parent_pid):
        super(Fetcher, self).__init__()
        self.parent_pid = parent_pid
        self.daemon = True
        if settings.REDIS_PASSWORD:
            self.redis_conn = StrictRedis(
                password=settings.REDIS_PASSWORD,
                unix_socket_path=settings.REDIS_SOCKET_PATH)
        else:
            self.redis_conn = StrictRedis(
                unix_socket_path=settings.REDIS_SOCKET_PATH)
    def check_if_parent_is_alive(self):
        """
        Self explanatory.
        """
        try:
            # Signal 0 sends no signal, it only checks that the parent
            # process still exists and raises OSError if it does not
            kill(self.parent_pid, 0)
        except:
            exit(0)
    def fetch_process(self, i, metrics_to_fetch):
        fetch_process_start = time()
        logger.info('fetcher :: fetch_process started')
        metrics_to_fetch_count = len(metrics_to_fetch)
        logger.info('fetcher :: fetch_process to fetch %s metrics' % str(metrics_to_fetch_count))
        if LOCAL_DEBUG:
            logger.info('fetcher :: metrics_to_fetch - %s' % str(metrics_to_fetch))
        for remote_host_type, frequency, remote_target, graphite_target, metric, url, namespace_prefix, api_key, token, user, password in metrics_to_fetch:
            success = False
            response = None
            try:
                logger.info('fetcher :: getting data from %s' % str(url))
                response = requests.get(url)
                if response.status_code == 200:
                    success = True
            except:
                logger.info(traceback.format_exc())
                # response is None if requests.get itself raised
                if response is not None:
                    logger.error('error :: fetcher :: http status code - %s, reason - %s' % (
                        str(response.status_code), str(response.reason)))
                logger.error('error :: fetcher :: failed to get data from %s' % str(url))
            if not success:
                continue
            datapoints = None
            try:
                js = response.json()
                if remote_host_type == 'graphite':
                    datapoints = js[0]['datapoints']
                if remote_host_type == 'prometheus':
                    datapoints = js['data']['result'][0]['values']
                datapoints_fetched = len(datapoints)
                logger.info('fetcher :: retrieved %s data points from %s' % (
                    str(datapoints_fetched), str(url)))
            except:
                logger.info(traceback.format_exc())
                logger.error('error :: fetcher :: failed to get data from %s' % str(url))

            # Example
            # datapoints[0]
            # [7.3, 1556817000]

            # Add each data point and timestamp to the timeseries list so
            # they can be sent to Graphite
            if not datapoints:
                logger.info('fetcher :: failed to get any data from %s' % str(url))
                continue

            # Order the time series by timestamp as the tuple can shift
            # order resulting in more recent data being added before older
            # data
            datapoints.sort()

            # However check if a metric is known to Flux and if so do not
            # use all resolutions, just fetch from the last flux known
            # timestamp for the metric
            last_flux_timestamp = None
            redis_last_flux_metric_data = None
            try:
                cache_key = 'flux.last.%s' % metric
                if python_version == 3:
                    redis_last_flux_metric_data = self.redis_conn.get(cache_key).decode('utf-8')
                else:
                    redis_last_flux_metric_data = self.redis_conn.get(cache_key)
                if LOCAL_DEBUG:
                    if redis_last_flux_metric_data:
                        logger.info('fetcher :: Redis key %s is present' % str(cache_key))
                    else:
                        logger.info('fetcher :: Redis key %s is not present' % str(cache_key))
            except AttributeError:
                logger.info('fetcher :: Redis key %s is not present' % str(cache_key))
                last_flux_timestamp = False
                redis_last_flux_metric_data = False
            except Exception as e:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: retrieving Redis key %s data - %s' % (
                    str(cache_key), str(e)))
                redis_last_flux_metric_data = False
            if redis_last_flux_metric_data:
                try:
                    last_flux_metric_data = literal_eval(redis_last_flux_metric_data)
                    last_flux_timestamp = int(last_flux_metric_data[0])
                    if LOCAL_DEBUG:
                        if last_flux_timestamp:
                            logger.info('fetcher :: Redis key %s last_flux_timestamp %s' % (
                                str(cache_key), str(last_flux_timestamp)))
                        else:
                            logger.info('fetcher :: Redis key %s last_flux_timestamp unknown' % (
                                str(cache_key)))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: failed determining last_flux_timestamp')
                    last_flux_timestamp = False

            value = None
            timestamp = None
            datapoints_added_to_timeseries = 0
            datapoints_already_populated = 0
            datapoints_with_no_value = 0
            timeseries = []
            for datapoint in datapoints:
                try:
                    if remote_host_type == 'graphite':
                        raw_value = datapoint[0]
                        if raw_value is None:
                            continue
                        value = float(datapoint[0])
                        timestamp = int(datapoint[1])
                    if remote_host_type == 'prometheus':
                        timestamp = int(datapoint[0])
                        try:
                            value = float(datapoint[1])
                        except:
                            continue
                    submit_data = True
                    if last_flux_timestamp:
                        if timestamp <= last_flux_timestamp:
                            submit_data = False
                            datapoints_already_populated += 1
                    if submit_data:
                        new_datapoint = [timestamp, value]
                        timeseries.append(new_datapoint)
                        datapoints_added_to_timeseries += 1
                # nosec to exclude from bandit tests
                except:  # nosec
                    datapoints_with_no_value += 1
                    continue

            if not timeseries:
                logger.info('fetcher :: no data in the timeseries list for %s' % metric)
                continue

            # Order the time series by timestamp as the tuple can shift
            # order resulting in more recent data being added before older
            # data
            timeseries.sort()
            timeseries_length = len(timeseries)
            logger.info('fetcher :: %s data points to add to vista.fetcher.metrics.json for %s' % (
                str(timeseries_length), metric))

            payload = None
            timeseries_str = '"%s"' % timeseries
            try:
                payload = [{
                    'remote_host_type': remote_host_type,
                    'remote_target': remote_target,
                    'graphite_target': graphite_target,
                    'metric': metric,
                    'namespace_prefix': namespace_prefix,
                    'key': settings.FLUX_SELF_API_KEY,
                    'token': token,
                    'user': user,
                    'password': password,
                    'datapoints': timeseries_str
                }]
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: could not build the payload json')

            redis_set = 'vista.fetcher.metrics.json'
            data = str(payload)
            try:
                self.redis_conn.sadd(redis_set, data)
                if LOCAL_DEBUG:
                    logger.info('fetcher :: added data from %s to Redis set %s' % (
                        str(url), redis_set))
            except:
                logger.info(traceback.format_exc())
                logger.error('error :: failed to add %s to Redis set %s' % (
                    str(data), str(redis_set)))
            redis_set = 'vista.fetcher.metrics.fetched'
            time_now = int(time())
            data = [str(remote_target), time_now]
            try:
                self.redis_conn.sadd(redis_set, str(data))
            except:
                logger.info(traceback.format_exc())
                logger.error('error :: failed to add %s to Redis set %s' % (
                    str(data), str(redis_set)))

        fetch_process_end = time()
        fetch_time = fetch_process_end - fetch_process_start
        logger.info('fetcher :: metrics fetched in %s seconds' % str(fetch_time))
        return
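    # Illustrative only: the shape of one metrics_to_fetch entry that
    # fetch_process unpacks above (the values here are hypothetical, see
    # settings.VISTA_FETCH_METRICS for the real definitions):
    #
    #     ['graphite', 60, 'stats.web01.cpu.user', 'stats.web01.cpu.user',
    #      'vista.stats.web01.cpu.user',
    #      'https://graphite.example.org/render/?from=-10minutes&format=json&target=stats.web01.cpu.user',
    #      'vista', 'None', 'None', 'None', 'None']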
    def run(self):
        """
        - Called when the process initializes.
        - Determine if Redis is up and discover the number of `unique metrics`.
        - Wait for the processes to finish.
        - Send skyline.vista metrics to `GRAPHITE_HOST`
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os.remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
                pass
        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1
        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os.remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
                pass
        else:
            logger.info('bin/%s.d log management done' % skyline_app)

        try:
            SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
            if SERVER_METRIC_PATH == '.':
                SERVER_METRIC_PATH = ''
            logger.info('SERVER_METRIC_PATH is set from settings.py to %s' % str(SERVER_METRIC_PATH))
        except:
            SERVER_METRIC_PATH = ''
            logger.info('warning :: SERVER_METRIC_PATH is not declared in settings.py, defaults to \'\'')
        logger.info('skyline_app_graphite_namespace is set to %s' % str(skyline_app_graphite_namespace))
        try:
            VISTA_ENABLED = settings.VISTA_ENABLED
            logger.info('VISTA_ENABLED is set to %s' % str(VISTA_ENABLED))
        except:
            VISTA_ENABLED = True
            logger.info('warning :: VISTA_ENABLED is not declared in settings.py, defaults to True')
        try:
            ASYNCIO_LIMIT = settings.VISTA_ASYNCIO_FETCHER_LIMIT
            logger.info('fetcher :: settings.VISTA_ASYNCIO_FETCHER_LIMIT is set to %s' % str(ASYNCIO_LIMIT))
        except:
            ASYNCIO_LIMIT = 2
            logger.info('fetcher :: warning :: VISTA_ASYNCIO_FETCHER_LIMIT is not declared in settings.py, defaults to 2')

        running = True
        while running:
            begin_fetcher_run = int(time())

            # Make sure Redis is up
            redis_up = False
            while not redis_up:
                try:
                    redis_up = self.redis_conn.ping()
                except:
                    logger.info(traceback.format_exc())
                    logger.error('error :: fetcher :: cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                    sleep(2)
                    try:
                        if settings.REDIS_PASSWORD:
                            self.redis_conn = StrictRedis(
                                password=settings.REDIS_PASSWORD,
                                unix_socket_path=settings.REDIS_SOCKET_PATH)
                        else:
                            self.redis_conn = StrictRedis(
                                unix_socket_path=settings.REDIS_SOCKET_PATH)
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: fetcher :: cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                        continue

            # Report app up
            try:
                self.redis_conn.setex(skyline_app, 120, begin_fetcher_run)
            except:
                logger.error('error :: fetcher :: could not update the Redis %s key' % skyline_app)
                logger.info(traceback.format_exc())

            # Fetcher metrics that are known to have already been fetched,
            # metrics in this set are named as follows:
            # namespace_prefix.metric
            vista_fetcher_unique_metrics = []
            redis_set = 'vista.fetcher.unique_metrics'
            try:
                vista_fetcher_unique_metrics = list(self.redis_conn.smembers(redis_set))
            except:
                logger.error('error :: fetcher :: could not determine vista_fetcher_unique_metrics from the Redis set %s' % redis_set)
                vista_fetcher_unique_metrics = []
            vista_unique_metrics = []
            if vista_fetcher_unique_metrics:
                for metric in vista_fetcher_unique_metrics:
                    metric_str = metric.decode('utf-8')
                    vista_unique_metrics.append(metric_str)

            # Determine metrics to fetch
            metrics_to_fetch = []
            fetcher_sent_to_flux = 0
            if LOCAL_DEBUG:
                try:
                    number_of_metrics = len(settings.VISTA_FETCH_METRICS)
                    logger.info('fetcher :: %s metrics to retrieve' % str(number_of_metrics))
                except:
                    pass
            end_timestamp = int(time())
            start_timestamp = end_timestamp - 300
            # Refer to the settings.VISTA_FETCH_METRICS tuple to determine
            # the format of the fetch_tuple
            # for target, graphite_target, fetch_tuple in metrics:
            for remote_host, remote_host_type, frequency, remote_target, graphite_target, uri, namespace_prefix, api_key, token, user, password, populate_at_resolutions in settings.VISTA_FETCH_METRICS:
                try:
                    # remote_host_type = fetch_tuple[1]
                    valid_remote_host_type = False
                    if remote_host_type == 'graphite' or remote_host_type == 'prometheus':
                        valid_remote_host_type = True
                    if not valid_remote_host_type:
                        logger.error('error :: invalid remote_host_type for %s in %s' % (
                            remote_target, str(remote_host_type)))
                        continue
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: processing %s remote_target %s' % (
                            str(remote_host_type), str(remote_target)))
                    if remote_host_type == 'graphite':
                        remote_graphite_host = remote_host
                        url = '%s%s%s' % (remote_graphite_host, uri, str(remote_target))
                        if LOCAL_DEBUG:
                            logger.info('fetcher :: with url %s' % str(url))
                    default_prometheus_uri = False
                    passed_uri = None
                    if remote_host_type == 'prometheus':
                        remote_prometheus_host = remote_host
                        # Hardcode the Prometheus api uri
                        # uri = str(fetch_tuple[3])
                        # uri = '/api/v1/query?query=%s[5m]' % str(remote_target)
                        # URL encode the Prometheus metric query to handle
                        # labels and query chars in the URI
                        urlencoded_remote_target = quote(remote_target)
                        if uri == 'default':
                            default_prometheus_uri = True
                            uri = '/api/v1/query_range?query=%s&start=%s&end=%s&step=60s' % (
                                str(urlencoded_remote_target), str(start_timestamp),
                                str(end_timestamp))
                        else:
                            passed_uri = uri
                        url = '%s%s' % (remote_prometheus_host, uri)
                        if LOCAL_DEBUG:
                            logger.info('fetcher :: with url %s' % str(url))
                    frequency = int(frequency)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with frequency %s' % str(frequency))
                        logger.info('fetcher :: with namespace_prefix %s' % str(namespace_prefix))
                    if namespace_prefix != '':
                        metric = '%s.%s' % (namespace_prefix, graphite_target)
                        metric = filesafe_metricname(metric)
                    else:
                        metric = graphite_target
                        metric = filesafe_metricname(metric)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with metric %s' % str(metric))
                    api_key = str(api_key)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with api_key %s' % str(api_key))
                    token = str(token)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with token %s' % str(token))
                    user = str(user)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with user %s' % str(user))
                    password = str(password)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with password %s' % str(password))
                    populate_at_resolutions_str = str(populate_at_resolutions)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: with populate_at_resolutions %s' % populate_at_resolutions_str)
                    # Handle if the user passes an empty tuple instead of None
                    if populate_at_resolutions == ():
                        populate_at_resolutions = None
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: could not determine the required values in the VISTA_FETCH_METRICS tuple for - %s' % (
                        str(remote_target)))
                    continue

                # If the metric is not known to Vista and the metric has a
                # populate_at_resolutions set, send to Flux to pre-populate
                # Graphite
                pre_populate_graphite_metric = False
                if remote_target not in vista_unique_metrics:
                    if remote_host_type == 'graphite' and populate_at_resolutions:
                        pre_populate_graphite_metric = True
                        logger.info('fetcher :: attempting to pre-populate Graphite metric - %s' % (
                            metric))
                    if remote_host_type == 'prometheus' and populate_at_resolutions:
                        pre_populate_graphite_metric = True
                        logger.info('fetcher :: attempting to pre-populate Prometheus metric - %s' % (
                            metric))
                else:
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: remote_target %s is present in vista_unique_metrics' % str(remote_target))

                # However check if a metric is known to Flux and if so do
                # not use all resolutions, just fetch from the last flux
                # known timestamp for the metric
                last_flux_timestamp = None
                redis_last_flux_metric_data = None
                try:
                    cache_key = 'flux.last.%s' % metric
                    if python_version == 3:
                        redis_last_flux_metric_data = self.redis_conn.get(cache_key).decode('utf-8')
                    else:
                        redis_last_flux_metric_data = self.redis_conn.get(cache_key)
                    if LOCAL_DEBUG:
                        if redis_last_flux_metric_data:
                            logger.info('fetcher :: Redis key %s is present' % str(cache_key))
                        else:
                            logger.info('fetcher :: Redis key %s is not present' % str(cache_key))
                except AttributeError:
                    logger.info('fetcher :: Redis key %s is not present' % str(cache_key))
                    last_flux_timestamp = False
                    redis_last_flux_metric_data = False
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: retrieving Redis key %s data - %s' % (
                        str(cache_key), str(e)))
                    redis_last_flux_metric_data = False
                if redis_last_flux_metric_data:
                    try:
                        last_flux_metric_data = literal_eval(redis_last_flux_metric_data)
                        last_flux_timestamp = int(last_flux_metric_data[0])
                        if LOCAL_DEBUG:
                            if last_flux_timestamp:
                                logger.info('fetcher :: Redis key %s last_flux_timestamp %s' % (
                                    str(cache_key), str(last_flux_timestamp)))
                            else:
                                logger.info('fetcher :: Redis key %s last_flux_timestamp unknown' % (
                                    str(cache_key)))
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: fetcher :: failed determining last_flux_timestamp')
                        last_flux_timestamp = False

                if last_flux_timestamp:
                    time_now = int(time())
                    last_fetch = time_now - last_flux_timestamp
                    if last_fetch < frequency:
                        if LOCAL_DEBUG:
                            logger.info('fetcher :: last fetch was %s seconds ago, less than frequency %s seconds, not fetching' % (
                                str(last_fetch), str(frequency)))
                        continue

                if remote_target in vista_unique_metrics and last_flux_timestamp:
                    last_expected_fetch_time = time_now - (frequency + 300)
                    if last_flux_timestamp < last_expected_fetch_time:
                        if populate_at_resolutions:
                            if remote_host_type == 'graphite' or remote_host_type == 'prometheus':
                                pre_populate_graphite_metric = True
                                behind_by_seconds = time_now - last_flux_timestamp
                                logger.info('fetcher :: last_flux_timestamp is behind by %s seconds, attempting to pre-populate %s' % (
                                    str(behind_by_seconds), metric))
                if remote_target in vista_unique_metrics:
                    if not last_flux_timestamp:
                        if populate_at_resolutions:
                            if remote_host_type == 'graphite' or remote_host_type == 'prometheus':
                                pre_populate_graphite_metric = True
                # Problem with asyncio so using Flux directly
                if remote_target in vista_unique_metrics and last_flux_timestamp and USE_FLUX:
                    if populate_at_resolutions:
                        if remote_host_type == 'graphite' or remote_host_type == 'prometheus':
                            pre_populate_graphite_metric = True

                if pre_populate_graphite_metric:
                    logger.info('fetcher :: attempting to build the pre-populate Graphite metric urls - %s' % (
                        metric))
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: pre_populate_graphite_metric - %s - %s' % (
                            str(pre_populate_graphite_metric), metric))
                fetch_resolution_urls = []

                # Build remote Graphite URLs
                if remote_host_type == 'graphite' and pre_populate_graphite_metric:
                    logger.info('fetcher :: building the pre-populate Graphite metric urls - %s' % (
                        metric))
                    try:
                        # Build URLs to submit to flux/HttpPopulateMetric
                        resolutions = []
                        for resolution in populate_at_resolutions:
                            resolutions.append(resolution)
                        number_of_resolutions = len(resolutions)
                        current_resolution_count = 0
                        for resolution in resolutions:
                            append_url = True
                            if current_resolution_count < (number_of_resolutions - 1):
                                resolution_url = None
                                if current_resolution_count == 0:
                                    next_resolution_count = 1
                                else:
                                    next_resolution_count = current_resolution_count + 1
                                next_resolution = resolutions[next_resolution_count]
                                # If there is a known last_flux_timestamp
                                # only get data from that time period until
                                # now
                                if last_flux_timestamp:
                                    if 'days' in resolution:
                                        resolution_days = resolution.strip('days')
                                        resolution_hours = int(resolution_days) * 24
                                        d = datetime.today() - timedelta(hours=resolution_hours)
                                    if 'hours' in resolution:
                                        resolution_hours = int(resolution.strip('hours'))
                                        d = datetime.today() - timedelta(hours=resolution_hours)
                                    resolution_timestamp = int(d.strftime('%s'))
                                    if resolution_timestamp < last_flux_timestamp:
                                        append_url = False
                                    else:
                                        append_url = True
                                    # If the last_flux_timestamp falls within
                                    # the range of the resolution period,
                                    # append, otherwise the fill will leave
                                    # an airgap in the data
                                    if 'days' in next_resolution:
                                        next_resolution_days = next_resolution.strip('days')
                                        next_resolution_hours = int(next_resolution_days) * 24
                                        d = datetime.today() - timedelta(hours=next_resolution_hours)
                                    if 'hours' in next_resolution:
                                        next_resolution_hours = int(next_resolution.strip('hours'))
                                        d = datetime.today() - timedelta(hours=next_resolution_hours)
                                    next_resolution_timestamp = int(d.strftime('%s'))
                                    if last_flux_timestamp in range(resolution_timestamp, next_resolution_timestamp):
                                        append_url = True
                                        resolution_url = '%s/render/?from=-%s&until=-%s&format=json&target=%s' % (
                                            str(remote_graphite_host), str(resolution),
                                            str(next_resolution), str(remote_target))
                                        if LOCAL_DEBUG:
                                            logger.info('fetcher :: resolution_url - %s - %s' % (
                                                str(resolution_url), metric))
                                else:
                                    resolution_url = '%s/render/?from=-%s&until=-%s&format=json&target=%s' % (
                                        str(remote_graphite_host), str(resolution),
                                        str(next_resolution), str(remote_target))
                                    if LOCAL_DEBUG:
                                        logger.info('fetcher :: resolution_url - %s - %s' % (
                                            str(resolution_url), metric))
                                current_resolution_count += 1
                            else:
                                if last_flux_timestamp:
                                    if 'days' in resolution:
                                        resolution_days = resolution.strip('days')
                                        resolution_hours = int(resolution_days) * 24
                                        d = datetime.today() - timedelta(hours=resolution_hours)
                                    if 'hours' in resolution:
                                        resolution_hours = int(resolution.strip('hours'))
                                        d = datetime.today() - timedelta(hours=resolution_hours)
                                    resolution_timestamp = int(d.strftime('%s'))
                                    if last_flux_timestamp > resolution_timestamp:
                                        append_url = True
                                        fetch_from_timestamp = last_flux_timestamp - 600
                                    else:
                                        append_url = True
                                        fetch_from_timestamp = resolution_timestamp
                                    resolution_url = '%s/render/?from=%s&format=json&target=%s' % (
                                        str(remote_graphite_host), str(fetch_from_timestamp),
                                        str(remote_target))
                                    if LOCAL_DEBUG:
                                        logger.info('fetcher :: resolution_url - %s - %s' % (
                                            str(resolution_url), metric))
                                else:
                                    resolution_url = '%s/render/?from=-%s&format=json&target=%s' % (
                                        str(remote_graphite_host), str(resolution),
                                        str(remote_target))
                                    if LOCAL_DEBUG:
                                        logger.info('fetcher :: resolution_url - %s - %s' % (
                                            str(resolution_url), metric))
                            if append_url:
                                fetch_resolution_urls.append(resolution_url)
                                if LOCAL_DEBUG:
                                    logger.info('fetcher :: appended resolution_url - %s - %s' % (
                                        str(resolution_url), metric))
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: fetcher :: could not determine the required resolutions for values in the VISTA_FETCH_METRICS tuple for - %s' % (
                            str(remote_target)))

                # Build remote Prometheus URLs
                if remote_host_type == 'prometheus' and pre_populate_graphite_metric:
                    # Assuming Prometheus only has a single retention
                    # (resolution)
                    try:
                        start_seconds_ago = 1296000  # default to 15 days
                        for resolution in populate_at_resolutions:
                            # Build the URL to submit to
                            # flux/HttpPopulateMetric and URL encode the
                            # Prometheus metric query to handle labels and
                            # query chars in the URI
                            if 'm' in resolution:
                                resolution_int = resolution.strip('m')
                                start_seconds_ago = int(resolution_int) * 60
                            if 'h' in resolution:
                                resolution_int = resolution.strip('h')
                                start_seconds_ago = (int(resolution_int) * 60) * 60
                            if 'd' in resolution:
                                resolution_int = resolution.strip('d')
                                start_seconds_ago = ((int(resolution_int) * 24) * 60) * 60
                            if 'w' in resolution:
                                resolution_int = resolution.strip('w')
                                start_seconds_ago = (((int(resolution_int) * 7) * 24) * 60) * 60
                            pop_start_timestamp = end_timestamp - int(start_seconds_ago)
                            urlencoded_remote_target = quote(remote_target)
                            # The query_range query does not return more than
                            # 11000 data points as it is limited as per
                            # https://github.com/prometheus/prometheus/issues/2253#issuecomment-346288842
                            # so resample needed to be reintroduced after
                            # being deleted as the Prometheus query_range was
                            # switched to
                            # uri = '/api/v1/query_range?query=%s&start=%s&end=%s&step=60s' % (
                            #     str(urlencoded_remote_target),
                            #     str(pop_start_timestamp), str(end_timestamp))
                            # Use the query endpoint for more than 11000 data
                            # points
                            uri = '/api/v1/query?query=%s[%s]' % (
                                str(urlencoded_remote_target), str(resolution))
                            resolution_url = '%s%s' % (
                                str(remote_prometheus_host), uri)
                            fetch_resolution_urls.append(resolution_url)
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: fetcher :: could not determine the required pre-populate URI for values in the VISTA_FETCH_METRICS tuple for - %s' % (
                            str(populate_at_resolutions)))

                if fetch_resolution_urls:
                    set_fetch_resolution_urls = set(fetch_resolution_urls)
                    fetch_resolution_urls = list(set_fetch_resolution_urls)

                flux_url = None
                try:
                    # Submit to flux/populate_metric
                    protocol = 'http://'
                    flux_url = '%s%s:%s/populate_metric' % (
                        protocol, str(settings.FLUX_IP), str(settings.FLUX_PORT))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: could not build the flux URL')

                payload = None
                fetch_resolution_urls_str = '"%s"' % fetch_resolution_urls
                if fetch_resolution_urls and pre_populate_graphite_metric:
                    try:
                        payload = {
                            'remote_host_type': remote_host_type,
                            'remote_target': remote_target,
                            'metric': metric,
                            'namespace_prefix': namespace_prefix,
                            'key': settings.FLUX_SELF_API_KEY,
                            'token': token,
                            'user': user,
                            'password': password,
                            'fetch_resolution_urls': fetch_resolution_urls_str
                        }
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: fetcher :: could not build the payload json')
                if flux_url and payload:
                    try:
                        logger.info('fetcher :: calling %s with payload - %s' % (
                            flux_url, str(payload)))
                        response = requests.post(flux_url, json=payload)
                        logger.info('fetcher :: flux /populate_metric response code - %s' % (
                            str(response.status_code)))
                        if response.status_code == 200:
                            fetcher_sent_to_flux += 1
                        if response.status_code == 204:
                            fetcher_sent_to_flux += 1
                    except:
                        logger.error(traceback.format_exc())
                        logger.error('error :: fetcher :: could not post data to flux URL - %s, data - %s' % (
                            str(flux_url), str(payload)))

                if not pre_populate_graphite_metric:
                    if last_flux_timestamp and remote_host_type == 'graphite':
                        try:
                            # Best effort to backfill any missing data
                            url_from = re.sub(r'^.*from=[-]', '', url)
                            url_period = re.sub(r'&.*', '', url_from)
                            if 'days' in url_period:
                                resolution_days = int(url_period.strip('days'))
                                d = datetime.today() - timedelta(days=resolution_days)
                            if 'hours' in url_period:
                                resolution_hours = int(url_period.strip('hours'))
                                d = datetime.today() - timedelta(hours=resolution_hours)
                            if 'minutes' in url_period:
                                resolution_minutes = int(url_period.strip('minutes'))
                                d = datetime.today() - timedelta(minutes=resolution_minutes)
                            from_resolution_timestamp = int(d.strftime('%s'))
                            if from_resolution_timestamp < last_flux_timestamp:
                                rep_str = url_period
                                if 'from=-' in url:
                                    rep_str = '-%s' % url_period
                                fetch_from_timestamp = last_flux_timestamp - 300
                                url = re.sub(rep_str, str(fetch_from_timestamp), url)
                        except:
                            logger.error(traceback.format_exc())
                            logger.error('error :: fetcher :: could not determine backfill parameters')
                    if last_flux_timestamp and remote_host_type == 'prometheus':
                        try:
                            # Best effort to backfill any missing data
                            if default_prometheus_uri:
                                pop_start_timestamp = int(last_flux_timestamp) - 120
                                urlencoded_remote_target = quote(remote_target)
                                uri = '/api/v1/query_range?query=%s&start=%s&end=%s&step=60s' % (
                                    str(urlencoded_remote_target),
                                    str(pop_start_timestamp), str(end_timestamp))
                                url = '%s%s' % (str(remote_prometheus_host), uri)
                        except:
                            logger.error(traceback.format_exc())
                            logger.error('error :: fetcher :: could not determine backfill parameters')
                    metric_to_fetch = [remote_host_type, frequency, remote_target, graphite_target, metric, url, namespace_prefix, api_key, token, user, password]
                    metrics_to_fetch.append(metric_to_fetch)
                    if LOCAL_DEBUG:
                        logger.info('fetcher :: added metric_to_fetch - %s' % str(metric_to_fetch))

            if LOCAL_DEBUG:
                if metrics_to_fetch:
                    metrics_to_fetch_count = len(metrics_to_fetch)
                    logger.info('fetcher :: there are %s metrics in metrics_to_fetch' % str(metrics_to_fetch_count))

            if metrics_to_fetch:
                # Spawn fetch process/es
                pids = []
                spawned_pids = []
                pid_count = 0
                for i in range(1, settings.VISTA_FETCHER_PROCESSES + 1):
                    if i > len(metrics_to_fetch):
                        logger.info('fetcher :: WARNING: Skyline Vista fetcher is set for more cores than needed.')
                        break
                    try:
                        p = Process(target=self.fetch_process, args=(i, metrics_to_fetch))
                        pids.append(p)
                        pid_count += 1
                        logger.info('fetcher :: starting %s of %s fetch_process/es' % (
                            str(pid_count), str(settings.VISTA_FETCHER_PROCESSES)))
                        p.start()
                        spawned_pids.append(p.pid)
                    except:
                        logger.info(traceback.format_exc())
                        logger.error('error :: fetcher :: failed to spawn fetch_process')

                # Self monitor processes and terminate if any fetch_process
                # has run for longer than
                # VISTA_FETCHER_PROCESS_MAX_RUNTIME seconds
                p_starts = time()
                while time() - p_starts <= settings.VISTA_FETCHER_PROCESS_MAX_RUNTIME:
                    if any(p.is_alive() for p in pids):
                        # Just to avoid hogging the CPU
                        sleep(.1)
                    else:
                        # All the processes are done, break now.
                        time_to_run = time() - p_starts
                        logger.info('fetcher :: %s fetch_process/es completed in %.2f seconds' % (
                            str(settings.VISTA_FETCHER_PROCESSES), time_to_run))
                        break
                else:
                    # We only enter this if we didn't 'break' above.
                    logger.info('fetcher :: timed out, killing all fetch_process processes')
                    for p in pids:
                        logger.info('fetcher :: killing fetch_process process')
                        p.terminate()
                        # p.join()
                        logger.info('fetcher :: killed fetch_process process')
                for p in pids:
                    if p.is_alive():
                        logger.info('fetcher :: stopping fetch_process - %s' % (str(p.is_alive())))
                        p.join()

            # Sleep if it went too fast
            process_runtime = int(time()) - begin_fetcher_run
            if int(process_runtime) < 60:
                next_run = int(begin_fetcher_run) + 60
                time_now = int(time())
                sleep_for = next_run - time_now
                logger.info('fetcher :: sleeping for %s seconds until next fetch' % str(sleep_for))
                sleep(sleep_for)
                try:
                    del sleep_for
                except:
                    logger.error('error :: fetcher :: failed to del sleep_for')
                try:
                    del next_run
                except:
                    logger.error('error :: fetcher :: failed to del next_run')
                try:
                    del time_now
                except:
                    logger.error('error :: fetcher :: failed to del time_now')

            metrics_fetched = []
            metrics_fetched_count = 0
            try:
                redis_set = 'vista.fetcher.metrics.fetched'
                metrics_fetched = self.redis_conn.smembers(redis_set)
                metrics_fetched_count = len(list(metrics_fetched))
                logger.info('fetcher :: %s metrics were fetched' % str(metrics_fetched_count))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: could not get Redis set %s' % redis_set)
            redis_set = 'vista.worker.to.process'
            try:
                redis_set = 'vista.fetcher.metrics.fetched'
                self.redis_conn.delete(redis_set)
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: could not delete the Redis set %s' % redis_set)
            if metrics_fetched:
                timestamps = []
                for str_metric_fetched in metrics_fetched:
                    try:
                        if python_version == 3:
                            str_metric_fetched = str_metric_fetched.decode('UTF-8')
                        metric_fetched = literal_eval(str_metric_fetched)
                        timestamp = int(metric_fetched[1])
                        timestamps.append(timestamp)
                    except:
                        logger.error('error :: fetcher :: failed to determine timestamp from %s' % str(str_metric_fetched))
                try:
                    timestamps.sort()
                    last_fetch_timestamp = int(timestamps[-1])
                    time_to_fetch = last_fetch_timestamp - begin_fetcher_run
                    logger.info('fetcher :: %s metrics fetched this run in %s seconds' % (
                        str(metrics_fetched_count), str(time_to_fetch)))
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: fetcher :: failed to determine last_fetch_timestamp from timestamps')
            try:
                redis_set = 'vista.worker.to.process'
                metrics_count_for_workers = len(list(self.redis_conn.smembers(redis_set)))
                logger.info('fetcher :: %s of the metrics fetched from this run still need to be processed by a worker' % str(metrics_count_for_workers))
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: could not get Redis set %s' % redis_set)
            try:
                send_metric_name = '%s.sent_to_flux' % skyline_app_graphite_namespace
                logger.info('fetcher :: sending Graphite - %s, %s' % (
                    send_metric_name, str(fetcher_sent_to_flux)))
                fetcher_sent_to_flux_str = str(fetcher_sent_to_flux)
                send_graphite_metric(parent_skyline_app, send_metric_name, fetcher_sent_to_flux_str)
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: fetcher :: could not send %s to Graphite' % send_metric_name)
            try:
                del process_runtime
            except:
                logger.error('error :: fetcher :: failed to del process_runtime')
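
# A minimal sketch of how this thread is started (illustrative only; the
# actual startup is handled by the vista agent, which is not shown on this
# page):
#
#     from os import getpid
#     from vista.fetcher import Fetcher
#     fetcher = Fetcher(getpid())
#     fetcher.start()
#     fetcher.join()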