# Source code for vista.bq_backfill

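"""
bq_backfill.py - run vista BigQuery backfill jobs: get data from BigQuery,
convert it to metrics and submit it to flux and, where required, Redis.
"""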
import copy
import gzip
import json
import logging
import os
import sys
import traceback

from ast import literal_eval
from contextlib import nullcontext
from datetime import datetime, timezone
from time import time, sleep

import requests

# For version pinning consider implementing a skyline-bq-py31013 environment
# and adding that to the path so that imports come from there rather than from
# the skyline-py31013 environment which has different versions of some libraries
# e.g. googleapis-common-protos

# VIA webapp and API rather than via config...

sys.path.insert(0, '/opt/skyline/github/skyline/skyline')
import settings
from skyline_functions import get_redis_conn_decoded, get_graphite_metric, mkdir_p
from functions.database.queries.get_all_inactive_db_metric_names import get_all_inactive_db_metric_names
from functions.database.queries.set_metric_ids_as_active import set_metric_ids_as_active
from functions.settings.get_bq_accounts_settings import get_bq_accounts_settings
from functions.settings.get_external_settings import get_external_settings
from functions.redis.populate_redis_metric import populate_redis_metric

# The vista BigQuery accounts are deep copied so that nothing done with them in
# this module can mutate the shared settings module object.
VISTA_BQ_ACCOUNTS = copy.deepcopy(getattr(settings, 'VISTA_BQ_ACCOUNTS', {}))

# The BigQuery dependencies (pandas, pandas_gbq and the functions.bigquery
# helpers) are imported from a separate path declared in settings.
VISTA_BQ_VIRTUALENV_PATH = getattr(settings, 'VISTA_BQ_VIRTUALENV_PATH', None)
if VISTA_BQ_VIRTUALENV_PATH:
    sys.path.insert(0, VISTA_BQ_VIRTUALENV_PATH)
    from functions.bigquery.bq_df_to_metrics import bq_df_to_metrics
    from functions.bigquery.bq_credentials import get_bq_credentials
    import pandas as pd
    import pandas_gbq
else:
    # Placeholders so that the module still imports without the BigQuery
    # path.  They are not callable and have none of the real attributes, so
    # any attempt to use them raises at the point of use instead of at import.
    bq_df_to_metrics = nullcontext()
    get_bq_credentials = nullcontext()
    pd = nullcontext()
    pandas_gbq = nullcontext()


# Names identifying this vista child process; the logger is shared with the
# parent vista app.
parent_skyline_app = 'vista'
child_skyline_app = 'bq_backfill'
skyline_app = f'{parent_skyline_app}.{child_skyline_app}'
skyline_app_logger = f'{parent_skyline_app}Log'
logger = logging.getLogger(skyline_app_logger)

# Upper bound on the number of new metrics sent to flux per minute so that the
# Graphite MAX_CREATES_PER_MINUTE is not breached.  Capping creations also
# caps how many metrics must be created in the DB per minute, avoiding
# thundering herd operations.
MAX_CREATES_PER_MINUTE = 450

# Minimum number of seconds between flux submissions, so that the backfill
# does not flood Graphite, Horizon, the DB, Redis, etc with data.
SECONDS_BETWEEN_SUBMISSION = 5

# When True the first flux payload of each interval is logged at debug level.
DEBUG_LOGGING = False

# @added 20240516 - Feature #5352: vista - bigquery
def _bq_backfill_intervals(intervals_to_fetch, from_timestamp, until_timestamp,
                           interval, date_format):
    """
    Determine the intervals that a backfill job needs to fetch.

    :param intervals_to_fetch: intervals already listed in the job work_dict,
        if any.  When populated they are used as they are.
    :param from_timestamp: the unix timestamp to start from (inclusive)
    :param until_timestamp: the unix timestamp to end at (inclusive)
    :param interval: the step in seconds between generated intervals
    :param date_format: the strftime/strptime format of an interval string
    :type intervals_to_fetch: list or None
    :type from_timestamp: int
    :type until_timestamp: int
    :type interval: int
    :type date_format: str
    :return: (intervals, intervals_timestamps) - the ordered list of interval
        strings and a dict of interval string to unix timestamp.  Timestamps
        are naive local time conversions (``datetime.fromtimestamp`` /
        ``datetime.strptime``).  Generated timestamps that format to the same
        string are kept as duplicates in the list and the dict holds the last
        timestamp for that string.  Provided intervals that cannot be parsed
        with ``date_format`` are left out of the dict.
    :rtype: tuple
    :raises ValueError: if interval is not a positive number of seconds
    """
    intervals_timestamps = {}
    if intervals_to_fetch:
        intervals = list(intervals_to_fetch)
        for interval_string in intervals:
            try:
                parsed = datetime.strptime(str(interval_string), date_format)
                intervals_timestamps[interval_string] = int(parsed.timestamp())
            except (TypeError, ValueError, OverflowError, OSError):
                # Not fatal, the timestamp is only used when reprocessing
                pass
        return intervals, intervals_timestamps

    if interval <= 0:
        # A non-positive step would never reach until_timestamp
        raise ValueError('interval must be a positive number of seconds')

    intervals = []
    end_timestamp = int(until_timestamp) + 1
    ctime = int(from_timestamp)
    while ctime < end_timestamp:
        try:
            interval_string = datetime.fromtimestamp(ctime).strftime(date_format)
            intervals.append(interval_string)
            intervals_timestamps[interval_string] = ctime
        except (TypeError, ValueError, OverflowError, OSError) as err:
            logger.error('error :: bq_backfill :: failed to convert %s to date_format, err: %s' % (
                str(ctime), err))
        ctime += interval
    return intervals, intervals_timestamps


def bq_backfill(bq_backfill_job_id):
    """
    Run one BigQuery backfill job.

    The job is read from the ``vista.bq_backfill.work`` Redis hash.  For each
    interval the BigQuery query is run (or the data is loaded from a local CSV
    archive of an earlier fetch when ``use_archive_data`` is set), converted to
    metrics with ``bq_df_to_metrics`` and submitted to flux, rate limited so
    that no more than ``max_creates_per_minute`` new metrics are submitted per
    minute.  For Graphite format accounts Redis is then populated for any
    submitted data older than :mod:`settings.MAX_RESOLUTION`, which Horizon
    would drop.

    Progress and the outcome are written to the
    ``vista.bq_backfill.job_status`` Redis hash.  The
    ``vista.bq_backfill.running`` key is set while the job runs and is deleted
    when the job completes, fails or raises.

    :param bq_backfill_job_id: the job id, a field in the
        ``vista.bq_backfill.work`` Redis hash
    :type bq_backfill_job_id: str
    :return: None
    """
    start_backfill_job = time()

    try:
        redis_conn_decoded = get_redis_conn_decoded(parent_skyline_app)
    except Exception as err:
        logger.error('error :: bq_backfill - redis connection failed, err: %s' % err)
        # Without Redis there is no work queue to read and nowhere to report
        # the job status to
        return

    def remove_key(key):
        """Remove the job from the vista.bq_backfill.work Redis hash."""
        logger.info('bq_backfill :: removing %s from vista.bq_backfill.work' % key)
        try:
            redis_conn_decoded.hdel('vista.bq_backfill.work', key)
        except Exception as err:
            logger.error('error :: bq_backfill :: failed to hdel %s from vista.bq_backfill.work, err: %s' % (
                key, err))

    def update_job_status(key, data):
        """Store str(data) as the job status in vista.bq_backfill.job_status."""
        logger.info('bq_backfill :: updating job status for %s in vista.bq_backfill.job_status' % key)
        try:
            redis_conn_decoded.hset('vista.bq_backfill.job_status', key, str(data))
        except Exception as err:
            # @modified 20241106 - Task #5526: Build v5.0.0 and upgrade deps
            # bandit - B608:hardcoded_sql_expressions
            logger.error('error :: bq_backfill :: failed to update job status with hset on %s in vista.bq_backfill.job_status, err: %s' % (
                key, err))  # nosec B608

    def fail_job(key, data, err_msg):
        """Remove the job from the work queue and record it as failed."""
        remove_key(key)
        if not isinstance(data, dict):
            # e.g. the raw work string when it could not be parsed, which
            # cannot hold the status fields itself
            data = {'work': str(data)}
        data['error'] = err_msg
        data['status'] = 'failed'
        data['status_code'] = '500'
        update_job_status(key, data)

    def clear_running_key():
        """Delete the vista.bq_backfill.running Redis key."""
        try:
            redis_conn_decoded.delete('vista.bq_backfill.running')
        except Exception as err:
            logger.error('error :: bq_backfill :: failed to delete on vista.bq_backfill.running from Redis, err: %s' % err)

    vista_bq_backfill_work = {}
    try:
        vista_bq_backfill_work = redis_conn_decoded.hgetall('vista.bq_backfill.work')
    except Exception as err:
        logger.error('error :: bq_backfill :: failed to hgetall on vista.bq_backfill.work, err: %s' % err)
        vista_bq_backfill_work = {}

    # Get the job from the work queue
    job_id = bq_backfill_job_id
    work_dict_str = (vista_bq_backfill_work or {}).get(job_id)
    if work_dict_str is None:
        logger.error('error :: bq_backfill :: job_id %s not found in vista.bq_backfill.work' % str(job_id))
        return

    try:
        work_dict = literal_eval(work_dict_str)
        if not isinstance(work_dict, dict):
            raise ValueError('work_dict is a %s, not a dict' % type(work_dict).__name__)
    except Exception as err:
        err_msg = 'bq_backfill :: failed to literal_eval work_dict'
        logger.error('error :: bq_backfill :: %s: %s, for job_id: %s , err: %s' % (
            err_msg, str(work_dict_str), str(job_id), err))
        fail_job(job_id, work_dict_str, err_msg)
        return

    # Every failure here ends the job, it cannot run with partial work data
    for required_key in ('vista_bq_account_key', 'from_timestamp', 'until_timestamp',
                         'interval', 'replace_query_string', 'date_format', 'added_by'):
        if required_key not in work_dict:
            err_msg = 'failed to determine %s from work_dict' % required_key
            logger.error('error :: bq_backfill :: %s: %s, for job_id: %s' % (
                err_msg, str(work_dict_str), str(job_id)))
            fail_job(job_id, work_dict, err_msg)
            return
    vista_bq_account_key = work_dict['vista_bq_account_key']
    replace_query_string = work_dict['replace_query_string']
    date_format = work_dict['date_format']
    added_by = work_dict['added_by']
    try:
        from_timestamp = int(work_dict['from_timestamp'])
        until_timestamp = int(work_dict['until_timestamp'])
        interval = int(work_dict['interval'])
        if interval <= 0:
            raise ValueError('interval must be > 0, got %s' % str(interval))
    except (TypeError, ValueError) as err:
        err_msg = 'invalid from_timestamp, until_timestamp or interval in work_dict'
        logger.error('error :: bq_backfill :: %s: %s, for job_id: %s , err: %s' % (
            err_msg, str(work_dict_str), str(job_id), err))
        fail_job(job_id, work_dict, err_msg)
        return

    # Optional job settings and their defaults
    # The default is NOT to zero_fill, zero filling can generate many more
    # metrics
    zero_fill = work_dict.get('zero_fill', False)
    # The default is NOT to replace_redis_timeseries
    replace_redis_timeseries = work_dict.get('replace_redis_timeseries', False)
    max_creates_per_minute = work_dict.get('max_creates_per_minute', MAX_CREATES_PER_MINUTE)
    try:
        # Used as a batch size, so it must be a positive int
        max_creates_per_minute = max(1, int(max_creates_per_minute))
    except (TypeError, ValueError):
        max_creates_per_minute = MAX_CREATES_PER_MINUTE
    # The default is to dry_run
    dry_run = work_dict.get('dry_run', True)
    # The default is to not test
    flux_test = work_dict.get('flux_test', False)
    verify_ssl = work_dict.get('verify_ssl', settings.VERIFY_SSL)
    use_archive_data = work_dict.get('use_archive_data', True)

    if added_by in ['webapp', 'api'] and not replace_query_string:
        err_msg = 'failed to replace_query_string'
        logger.error('error :: bq_backfill :: %s' % err_msg)
        fail_job(job_id, work_dict, err_msg)
        return
    if added_by == 'vista_bq_fetcher':
        replace_query_string = None

    started = int(time())
    work_dict['started'] = started
    job_key_dict = {'started': started, 'job_id': job_id, 'work_dict': work_dict}
    try:
        redis_conn_decoded.set('vista.bq_backfill.running', str(job_key_dict))
        logger.info('bq_backfill set vista.bq_backfill.running Redis key with %s' % str(job_key_dict))
    except Exception as err:
        logger.error('error :: bq_backfill :: failed to set on vista.bq_backfill.running from Redis, err: %s' % err)

    # Example work_dict that is submitted via /bq_backfill by the webapp,
    # direct or by vista_bq_fetcher:
    # work_data = {
    #     'job_id': job_id, 'vista_bq_account_key': vista_bq_account_key,
    #     'from_timestamp': from_timestamp, 'until_timestamp': until_timestamp,
    #     'interval': interval, 'date_format': date_format,
    #     'replace_query_string': replace_query_string, 'zero_fill': zero_fill,
    #     'dry_run': dry_run, 'replace_redis_timeseries': replace_redis_timeseries,
    #     'max_creates_per_minute': max_creates_per_minute,
    #     'use_archive_data': use_archive_data, 'flux_test': flux_test,
    #     'verify_ssl': verify_ssl, 'status': 'pending', 'status_url': status_url,
    #     'percent_complete': 0, 'intervals_to_fetch': [], 'intervals_fetched': [],
    #     'external_progress_url': external_progress_url, 'added_by': added_by,
    # }

    # Remove the work and mark the job as processing
    remove_key(job_id)
    work_dict['status'] = 'processing'

    # From here on the running key is always removed, however the job ends
    try:
        intervals_to_fetch, intervals_to_fetch_timestamps = _bq_backfill_intervals(
            work_dict.get('intervals_to_fetch'), from_timestamp, until_timestamp,
            interval, date_format)
        work_dict['intervals_to_fetch'] = intervals_to_fetch
        update_job_status(job_id, work_dict)

        vista_bq_accounts = {}
        try:
            vista_bq_accounts = get_bq_accounts_settings(parent_skyline_app)
        except Exception as err:
            logger.error('error :: bq_fetcher :: get_bq_accounts_settings failed, err: %s' % err)
        bq_account_settings = {}
        try:
            bq_account_settings = vista_bq_accounts[vista_bq_account_key]
        except Exception as err:
            logger.error('error :: bq_fetcher :: failed to determine settings for %s, err: %s' % (
                vista_bq_account_key, err))
        if not bq_account_settings:
            err_msg = 'failed to determine settings for %s' % vista_bq_account_key
            logger.error('error :: bq_fetcher :: %s' % err_msg)
            fail_job(job_id, work_dict, err_msg)
            return

        # schedule_interval is not used here but its absence means the account
        # is misconfigured
        for setting_key in ('query', 'flux_api_token', 'full_duration', 'schedule_interval'):
            if setting_key not in bq_account_settings:
                err_msg = 'failed to determine %s from settings for %s' % (setting_key, vista_bq_account_key)
                logger.error('error :: bq_fetcher :: %s' % err_msg)
                fail_job(job_id, work_dict, err_msg)
                return
        query = bq_account_settings['query']
        flux_api_token = bq_account_settings['flux_api_token']
        full_duration = bq_account_settings['full_duration']

        try:
            metric_format = bq_account_settings['metric_format']
        except Exception as err:
            logger.error('error :: bq_backfill :: failed to ddetermine metric_format from bq_account_settings defaulting to Graphite, err: %s' % err)
            metric_format = 'graphite'

        # Map each flux token to the namespace prefix flux applies to it
        flux_key_namespace_prefixes = {settings.FLUX_SELF_API_KEY: None}
        for flux_token, token_namespace in settings.FLUX_API_KEYS.items():
            flux_key_namespace_prefixes[flux_token] = token_namespace
        skyline_external_settings = {}
        try:
            skyline_external_settings = get_external_settings(parent_skyline_app, None, True)
        except Exception as err:
            logger.error('error :: bq_backfill :: get_external_settings failed, err: %s' % (
                err))
        for external_settings in (skyline_external_settings or {}).values():
            flux_token = external_settings.get('flux_token')
            if flux_token and flux_token not in flux_key_namespace_prefixes:
                flux_key_namespace_prefixes[flux_token] = external_settings.get('namespace')
        namespace_prefix = None
        if flux_api_token:
            try:
                namespace_prefix = flux_key_namespace_prefixes[flux_api_token]
            except KeyError as err:
                logger.error('error :: bq_backfill :: failed to determine flux namespace_prefix from flux_api_token, err: %s' % (
                    err))
                namespace_prefix = None

        # Credentials can be passed in a json file or as a dict (as from
        # external_settings)
        project_id = bq_account_settings.get('project_id')
        oauth_credentials = bq_account_settings.get('oauth_credentials', {})
        oauth_credentials_file = bq_account_settings.get('oauth_credentials_file')
        service_credentials = bq_account_settings.get('service_credentials', {})
        service_credentials_file = bq_account_settings.get('service_credentials_file')

        def fetch_credentials(current_credentials):
            """Return fresh BigQuery credentials or None on failure."""
            try:
                return get_bq_credentials(
                    parent_skyline_app, credentials=current_credentials,
                    project_id=project_id, oauth_credentials=oauth_credentials,
                    oauth_credentials_file=oauth_credentials_file,
                    service_credentials=service_credentials,
                    service_credentials_file=service_credentials_file)
            except Exception as err:
                logger.error('error :: bq_backfill :: get_bq_credentials failed, err: %s' % (
                    err))
            return None

        credentials = fetch_credentials(None)
        if not credentials:
            logger.error('error :: bq_backfill :: no credentials retrieved for %s' % vista_bq_account_key)
            err_msg = 'failed to get credentials for %s' % vista_bq_account_key
            logger.error('error :: bq_fetcher :: %s' % err_msg)
            fail_job(job_id, work_dict, err_msg)
            return
        use_credentials = credentials

        if dry_run:
            work_dict['results'] = {}

        # Resources to determine what metric ids need to be set to active again
        all_db_metric_ids_with_names = {}
        try:
            all_db_metric_ids_with_names = redis_conn_decoded.hgetall('metrics_manager.all_db_metric_ids_with_names')
        except Exception as err:
            logger.error('error :: bq_update :: failed hgetall metrics_manager.all_db_metric_ids_with_names, err: %s' % (
                err))
        all_base_names_with_ids = {
            i_base_name: int(float(id_str))
            for id_str, i_base_name in all_db_metric_ids_with_names.items()
        }
        inactive_metric_names_with_id = {}
        try:
            _inactive_metric_names, inactive_metric_names_with_id = get_all_inactive_db_metric_names(parent_skyline_app, with_ids=True)
        except Exception as err:
            logger.error('error :: bq_update :: get_all_inactive_db_metric_names failed, err: %s' % (
                err))
        inactive_metric_ids = set(inactive_metric_names_with_id.values())

        # Track the unique metrics to control how many are sent to flux so that
        # the Graphite MAX_CREATES_PER_MINUTE is not breached initially
        unique_metrics = set()
        flux_url = '%s/flux/metric_data_post' % settings.SKYLINE_URL
        headers = {'content-encoding': 'gzip', "content-type": "application/json"}
        failed_job = False
        failed_job_err = None
        total_intervals_to_fetch = len(intervals_to_fetch)
        logger.info('bq_backfill :: %s intervals_to_fetch for %s' % (
            str(total_intervals_to_fetch), vista_bq_account_key))
        # All the new metrics created, to rate limit and not breach Graphite
        # MAX_CREATES_PER_MINUTE
        metrics_created = set()
        # All timestamps submitted, to determine whether Redis needs to be
        # populated
        timestamps_submitted = set()
        last_submission_timestamp = 0
        redis_populated_metrics = []
        if metric_format == 'graphite':
            work_dict['redis_metrics_populated'] = len(redis_populated_metrics)
        known_metrics = []
        try:
            known_metrics = redis_conn_decoded.hkeys('aet.metrics_manager.metric_names_with_ids')
        except Exception as err:
            logger.error('error :: bq_backfill :: hkeys failed on aet.metrics_manager.metric_names_with_ids, err: %s' % (
                err))

        # Archives are saved in a folder named for when this job started (not
        # for the interval fetched) so that the latest download sorts first
        archive_data_search_path = '%s/vista/bq_archives/%s' % (
            settings.SKYLINE_DIR, vista_bq_account_key)
        date_folder = datetime.fromtimestamp(int(start_backfill_job), tz=timezone.utc).strftime('%Y%m%d%H%M%S')
        archive_data_path = '%s/%s' % (archive_data_search_path, date_folder)

        start_bq_to_flux = int(time())
        intervals_fetched = []
        metrics_sent = []
        for fetch_interval in intervals_to_fetch:
            if failed_job:
                break

            sql = str(query)
            if added_by != 'vista_bq_fetcher':
                unquoted_replace_string = str(fetch_interval)
                # The interval is spliced into a SQL string literal, so refuse
                # anything that could terminate or escape that literal
                if "'" in unquoted_replace_string or '\\' in unquoted_replace_string:
                    err_msg = 'invalid characters in fetch_interval %s' % unquoted_replace_string
                    logger.error('error :: bq_backfill :: %s' % err_msg)
                    fail_job(job_id, work_dict, err_msg)
                    failed_job = True
                    failed_job_err = str(err_msg)
                    break
                try:
                    replace_string = "'%s'" % unquoted_replace_string
                    sql = sql.replace(replace_query_string, replace_string)
                except Exception as err:
                    err_msg = 'failed to generate sql with replace_query_string and %s' % str(fetch_interval)
                    logger.error('error :: bq_backfill :: %s, err: %s' % (
                        err_msg, err))
                    fail_job(job_id, work_dict, err_msg)
                    failed_job = True
                    failed_job_err = str(err_msg)
                    break
            logger.info('bq_backfill :: fetching data for %s with sql: %s' % (vista_bq_account_key, sql))

            # Ensure the credentials are refreshed if they have expired during
            # the loop, keeping the previous ones if the refresh fails
            if use_credentials:
                refreshed_credentials = fetch_credentials(use_credentials)
                if refreshed_credentials:
                    credentials = refreshed_credentials
                    use_credentials = refreshed_credentials

            df = None
            archive_csv_filename = '%s.%s.csv' % (vista_bq_account_key, str(fetch_interval))

            # Find the latest archive of this interval if one exists and has
            # been requested
            bq_archives = []
            if use_archive_data and os.path.isdir(archive_data_search_path):
                try:
                    for dir_path, _folders, files in os.walk(archive_data_search_path):
                        for file_name in files:
                            if file_name.endswith('.csv'):
                                bq_archives.append('%s/%s' % (dir_path, file_name))
                except Exception as err:
                    logger.error('error :: bq_backfill :: os.walk failed on %s, err: %s' % (
                        str(archive_data_search_path), err))
                # Sort latest to oldest
                bq_archives = sorted(bq_archives, reverse=True)
                logger.info('bq_backfill :: %s bq_archives found' % (str(len(bq_archives))))
            archive_csv = None
            for archive_path in bq_archives:
                if os.path.basename(archive_path) == archive_csv_filename:
                    archive_csv = archive_path
                    break

            archive_data_used = False
            if archive_csv and os.path.isfile(archive_csv):
                logger.info('bq_backfill :: loading data for %s from latest archive data file %s' % (str(fetch_interval), archive_csv))
                try:
                    df = pd.read_csv(archive_csv)
                except Exception as err:
                    err_msg = 'pd.read_csv failed with archive_csv: %s' % archive_csv
                    logger.error('error :: bq_backfill :: %s, err: %s' % (
                        err_msg, err))
                if df is not None:
                    logger.info('bq_backfill :: loaded %s data rows for %s from %s' % (
                        str(len(df)), str(fetch_interval), archive_csv))
                    archive_data_used = True

            if df is None:
                try:
                    df = pandas_gbq.read_gbq(sql, project_id=project_id, credentials=credentials, progress_bar_type=None)
                except Exception as err:
                    err_msg = 'pandas_gbq.read_gbq failed with sql: %s' % sql
                    logger.error(traceback.format_exc())
                    logger.error('error :: bq_backfill :: %s, err: %s' % (
                        err_msg, err))
                    fail_job(job_id, work_dict, err_msg)
                    failed_job = True
                    failed_job_err = str(err_msg)
                    break

            # Archive the data fetched from BigQuery
            if not archive_data_used:
                new_archive_csv = '%s/%s' % (archive_data_path, archive_csv_filename)
                if not os.path.isdir(archive_data_path):
                    try:
                        mkdir_p(archive_data_path)
                    except Exception as err:
                        err_msg = 'failed to create archive_data_path: %s' % archive_data_path
                        logger.error('error :: bq_backfill :: %s, err: %s' % (
                            err_msg, err))
                if not os.path.isfile(new_archive_csv):
                    logger.info('bq_backfill :: archiving data for %s to %s' % (str(fetch_interval), new_archive_csv))
                    try:
                        df.to_csv(new_archive_csv, index=False)
                    except Exception as err:
                        err_msg = 'df.to_csv failed to create archive_csv: %s' % new_archive_csv
                        logger.error('error :: bq_backfill :: %s, err: %s' % (
                            err_msg, err))
                    if os.path.isfile(new_archive_csv):
                        logger.info('bq_backfill :: archived data for %s to %s' % (str(fetch_interval), new_archive_csv))
                    else:
                        logger.error('error :: bq_backfill :: df.to_csv failed to create archive_csv: %s' % new_archive_csv)

            intervals_fetched.append(fetch_interval)
            logger.info('bq_backfill :: %s rows returned for interval: %s' % (
                str(len(df)), str(fetch_interval)))
            metrics = []
            if len(df) > 0:
                try:
                    metrics = bq_df_to_metrics(parent_skyline_app, bq_account_settings, df, backfill=True, zero_fill=zero_fill, return_nan_metrics=False) or []
                except Exception as err:
                    logger.error(traceback.format_exc())
                    err_msg = 'bq_df_to_metrics failed for fetch_interval: %s' % str(fetch_interval)
                    logger.error('error :: bq_backfill :: %s, err: %s' % (
                        err_msg, err))
                    fail_job(job_id, work_dict, err_msg)
                    failed_job = True
                    failed_job_err = str(err_msg)
                    break
            logger.info('bq_backfill :: %s metrics generated for interval: %s' % (
                str(len(metrics)), str(fetch_interval)))

            # Update the unique_metrics
            metrics_list = []
            if metrics:
                metrics_list = [metric_data['metric'] for metric_data in metrics]
                unique_metrics.update(metrics_list)
                timestamps_submitted.update(metric_data['timestamp'] for metric_data in metrics)
            if dry_run:
                work_dict['results'][fetch_interval] = metrics

            # Set any inactive metrics to active again (not in a dry run, which
            # must not change the DB)
            metric_ids_to_set_as_active_again = sorted({
                all_base_names_with_ids[base_name] for base_name in set(metrics_list)
                if all_base_names_with_ids.get(base_name)
                and all_base_names_with_ids[base_name] in inactive_metric_ids
            })
            if metric_ids_to_set_as_active_again and not dry_run:
                logger.info('bq_backfill :: %s metrics to set as active' % str(len(metric_ids_to_set_as_active_again)))
                set_metrics_as_active_count = None
                try:
                    set_metrics_as_active_count = set_metric_ids_as_active(parent_skyline_app, metric_ids_to_set_as_active_again)
                    # Do not try to activate them again for later intervals
                    inactive_metric_ids.difference_update(metric_ids_to_set_as_active_again)
                except Exception as err:
                    logger.error('error :: bq_backfill :: set_metric_ids_as_active failed - %s' % (
                        str(err)))
                logger.info('bq_backfill :: %s metrics set as active' % str(set_metrics_as_active_count))

            logger.info('bq_backfill :: the job now has %s unique metrics, submitting to flux' % (
                str(len(unique_metrics))))

            # Send to flux but only send Graphite MAX_CREATES_PER_MINUTE new
            # metrics per minute.  This also limits the number of metrics that
            # have to be created in the DB and mitigates any thundering herd
            # operations.
            start_batch = 0
            metrics_sent = []
            batch_created_metrics = []
            failed_count = 0
            if metric_format == 'graphite' and metrics:
                metrics_payloads = [
                    metrics[i:i + max_creates_per_minute]
                    for i in range(0, len(metrics), max_creates_per_minute)
                ]
                for index, metrics_payload in enumerate(metrics_payloads):
                    # In a dry run metrics are only submitted when flux_test is
                    # set, in which case the payload is flagged as a test
                    if dry_run and not flux_test:
                        sleep(3)
                        continue
                    if failed_job:
                        break

                    # Only make one submission per SECONDS_BETWEEN_SUBMISSION
                    if last_submission_timestamp:
                        next_submission = last_submission_timestamp + SECONDS_BETWEEN_SUBMISSION
                    else:
                        next_submission = int(time()) + 1
                    while int(time()) < next_submission:
                        sleep(1)

                    if not start_batch:
                        start_batch = int(time())
                    if int(time()) >= (start_batch + 59):
                        # Start a new one minute batch window
                        start_batch = int(time())
                        batch_created_metrics = []

                    payload = {
                        "key": flux_api_token,
                        "metrics": metrics_payload,
                    }
                    if flux_test:
                        payload['test'] = True
                    if index == 0 and DEBUG_LOGGING:
                        logger.debug('debug :: bq_backfill :: flux payload: %s' % str(payload))

                    # Unique metric names in this payload not yet created by
                    # this job and not already known to Skyline
                    metrics_to_create = list(dict.fromkeys(
                        metric_data['metric'] for metric_data in metrics_payload
                        if metric_data['metric'] not in metrics_created))
                    if known_metrics:
                        metrics_to_create = [
                            metric for metric in metrics_to_create
                            if not any(metric in known_metric for known_metric in known_metrics)]
                    logger.info('bq_backfill :: %s new metrics to create in batch' % str(len(metrics_to_create)))

                    # Do not breach Graphite MAX_CREATES_PER_MINUTE, wait for
                    # the next batch window if this payload would exceed it
                    now = int(time())
                    window_end = start_batch + 59
                    if (now < window_end and batch_created_metrics
                            and len(batch_created_metrics) + len(metrics_to_create) > max_creates_per_minute):
                        sleep_for = window_end - now
                        logger.info('bq_backfill :: sleeping for %s seconds to not breach Graphite MAX_CREATES_PER_MINUTE' % str(sleep_for))
                        sleep(sleep_for)
                        start_batch = int(time())
                        batch_created_metrics = []

                    # Send, retrying on request errors up to 3 times
                    request_body = gzip.compress(json.dumps(payload).encode('utf-8'))
                    success = False
                    while not success:
                        response = None
                        try:
                            response = requests.post(flux_url, data=request_body, headers=headers, timeout=50, verify=verify_ssl)
                        except Exception as err:
                            logger.error('error :: bq_backfill :: failed to POST %s data to flux, err: %s' % (
                                str(fetch_interval), err))
                            failed_count += 1
                            if failed_count >= 3:
                                failed_job = True
                                failed_job_err = 'the submission of metrics for %s failed 3 times, backfill job for %s failed' % (
                                    str(fetch_interval), vista_bq_account_key)
                                break
                        # A Response is falsy for 4xx/5xx statuses, so test for
                        # None explicitly to handle those as failures
                        if response is None:
                            continue
                        if response.status_code in (200, 204):
                            success = True
                            logger.info('bq_backfill :: posted %s metrics to flux for %s' % (
                                str(len(metrics_payload)), fetch_interval))
                        elif response.status_code == 207:
                            logger.warning('warning :: bq_backfill :: flux responded with status_code of 207, some metrics for %s not accepted' % str(fetch_interval))
                            success = True
                            try:
                                response_content = response.json()
                            except ValueError:
                                response_content = response.text
                            work_dict.setdefault('207_responses', {})[fetch_interval] = response_content
                        else:
                            logger.warning('warning :: bq_backfill :: flux responded with status_code of %s for %s' % (
                                str(response.status_code), str(fetch_interval)))
                            failed_job = True
                            logger.error('error :: bq_backfill :: failed to POST data to flux, data: %s' % str(payload))
                            failed_job_err = 'the submission of metrics for %s, backfill job for %s failed' % (
                                str(fetch_interval), vista_bq_account_key)
                            break
                    if not success:
                        # failed_job is set, do not record the payload as sent
                        break

                    # After send update the lists and sets
                    last_submission_timestamp = int(time())
                    metrics_sent_in_batch = [metric_data['metric'] for metric_data in metrics_payload]
                    metrics_sent.extend(metrics_sent_in_batch)
                    if metrics_to_create:
                        metrics_created.update(metrics_to_create)
                        batch_created_metrics.extend(metrics_to_create)
                    logger.info('bq_backfill :: %s metrics submitted to flux, total new batch_created_metrics: %s, total job metrics_created: %s' % (
                        str(len(metrics_sent_in_batch)), str(len(batch_created_metrics)),
                        str(len(metrics_created))))
                    failed_count = 0

            work_dict['intervals_fetched'] = intervals_fetched
            work_dict['intervals_fetched_count'] = len(intervals_fetched)
            work_dict['total_intervals_to_fetch_count'] = total_intervals_to_fetch
            work_dict['unique_metrics_count'] = len(unique_metrics)
            percent_complete = float(100 * (len(intervals_fetched) / total_intervals_to_fetch))
            if metric_format == 'graphite':
                # Redis also needs to be populated so this is only half the work
                percent_complete = percent_complete / 2
            work_dict['percent_complete'] = percent_complete
            work_dict['elasped_seconds'] = time() - start_backfill_job
            update_job_status(job_id, work_dict)

        logger.info('bq_backfill :: %s BigQuery data metrics submitted to flux in %s seconds' % (
            str(len(unique_metrics)), str(int(time()) - start_bq_to_flux)))

        # Populate Redis - the populate_redis_metric function handles any
        # sharding.  Only populate Redis if the data is older than
        # MAX_RESOLUTION and would have been dropped by Horizon when Graphite
        # pickled it on from flux.
        submit_timestamps = []
        if not dry_run and metric_format == 'graphite':
            max_resolution_timestamp = int(time()) - settings.MAX_RESOLUTION
            submit_timestamps = sorted(
                int(ts) for ts in timestamps_submitted
                if int(ts) <= max_resolution_timestamp)
        if submit_timestamps:
            logger.info('bq_backfill :: sleeping for 30 seconds to allow Graphite to process the data')
            sleep(30)
            start_populate_redis = int(time())
            populate_until_timestamp = until_timestamp
            use_from_timestamp = submit_timestamps[0]
            if replace_redis_timeseries:
                # Populate Redis with the full_duration from Graphite
                use_from_timestamp = until_timestamp - int(full_duration)
            if populate_until_timestamp == use_from_timestamp:
                use_from_timestamp = use_from_timestamp - 1
                populate_until_timestamp = populate_until_timestamp + 1
            logger.info('bq_backfill :: populating Redis with %s BigQuery data metrics with namespace_prefix: %s, from_timestamp: %s, until_timestamp: %s' % (
                str(len(unique_metrics)), str(namespace_prefix),
                str(use_from_timestamp), str(populate_until_timestamp)))
            if added_by == 'vista_bq_fetcher':
                replace_redis_timeseries = False
            for base_name in unique_metrics:
                populated_timeseries = []
                use_base_name = str(base_name)
                if namespace_prefix:
                    use_base_name = '%s.%s' % (namespace_prefix, base_name)
                # Surface the timeseries from the local Graphite because if
                # this is in a cluster it is possible that the remote
                # graphite/webapp will not have access to the metric until the
                # remote carbon flushes it.  The same window that is populated
                # is fetched, so a replace covers the whole window.
                metric_timeseries = []
                try:
                    metric_timeseries = get_graphite_metric(parent_skyline_app, use_base_name, use_from_timestamp, populate_until_timestamp, 'list', 'object')
                except Exception as err:
                    logger.error('error :: bq_backfill :: get_graphite_metric failed, err: %s' % err)
                try:
                    populated_timeseries = populate_redis_metric(parent_skyline_app, use_base_name, from_timestamp=use_from_timestamp, until_timestamp=populate_until_timestamp, timeseries=metric_timeseries, replace=replace_redis_timeseries, verify_ssl=verify_ssl)
                except Exception as err:
                    logger.error('error :: bq_backfill :: populate_redis_metric failed for %s, job_id: %s, err: %s' % (
                        base_name, job_id, err))
                logger.info('bq_backfill :: populated Redis with %s data points for %s' % (
                    str(len(populated_timeseries or [])), use_base_name))
                redis_populated_metrics.append(use_base_name)
                work_dict['percent_complete'] = (float(100 * (len(redis_populated_metrics) / len(unique_metrics))) / 2) + 50
                work_dict['redis_metrics_populated'] = len(redis_populated_metrics)
                work_dict['elasped_seconds'] = time() - start_backfill_job
                update_job_status(job_id, work_dict)
            logger.info('bq_backfill :: %s BigQuery data metrics populated to Redis in %s seconds' % (
                str(len(unique_metrics)), str(int(time()) - start_populate_redis)))

        # @added 20240627 - Feature #5372: vista - bq_update
        # A successful bq_update of today's single interval is followed by a
        # reprocess job so that the newly submitted metrics are processed
        fetch_interval_timestamp = None
        if not failed_job and len(intervals_to_fetch) == 1 and added_by == 'vista_bq_update':
            today_date_format = datetime.fromtimestamp(int(time())).strftime(date_format)
            interval_to_fetch = intervals_to_fetch[0]
            if interval_to_fetch == today_date_format:
                fetch_interval_timestamp = intervals_to_fetch_timestamps.get(interval_to_fetch)
        if fetch_interval_timestamp is not None:
            reprocess_url = '%s/reprocess_batch_metrics' % settings.SKYLINE_URL
            user = str(settings.WEBAPP_AUTH_USER)
            password = str(settings.WEBAPP_AUTH_USER_PASSWORD)
            update_namespace = bq_account_settings.get('batch_processing_namespace')
            # Reprocess current data
            post_data = {
                'data': {
                    'namespaces': [],
                    'metrics': list(dict.fromkeys(metrics_sent)),
                    'from_timestamp': fetch_interval_timestamp,
                    'until_timestamp': fetch_interval_timestamp,
                    'dry_run': False,
                    'oneshot': False,
                }
            }
            try:
                reprocess_headers = {"content-type": "application/json"}
                response = requests.post(reprocess_url, auth=(user, password), json=post_data, headers=reprocess_headers, timeout=5, verify=verify_ssl)
                logger.info('bq_backfill :: reprocess API request for namespace %s for %s returned status_code: %s' % (
                    update_namespace, str(fetch_interval_timestamp), str(response.status_code)))
                logger.info('bq_backfill :: reprocess API request returned status_code: %s' % (
                    str(response.status_code)))
                if response.status_code != 200:
                    logger.error('error :: bq_backfill :: reprocess request did not retrun 200 for %s , reason: %s' % (
                        update_namespace, str(response.reason)))
            except Exception as err:
                logger.error('error :: bq_backfill :: reprocess request failed for %s on %s, err: %s' % (
                    update_namespace, str(reprocess_url), err))

        work_dict['status'] = 'complete'
        if failed_job:
            work_dict['status'] = 'failed'
            logger.error('error :: bq_backfill :: failed job, err: %s' % str(failed_job_err))
            fail_job(job_id, work_dict, str(failed_job_err))
        work_dict['percent_complete'] = 100.0
        update_job_status(job_id, work_dict)
    except Exception as err:
        # Record the job as failed rather than leaving it 'processing'
        logger.error(traceback.format_exc())
        err_msg = 'bq_backfill job failed with an unhandled error: %s' % err
        logger.error('error :: bq_backfill :: %s' % err_msg)
        fail_job(job_id, work_dict, err_msg)
        raise
    finally:
        clear_running_key()