Source code for vista.bq_update

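"""
bq_update.py

vista bq_update: fetch data from BigQuery for a period and compare it with the
last archived data for that period. When values have changed, the metrics are
updated in Graphite and Redis and reprocessed (see the bq_update function
docstring for details).
"""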
import copy
import logging
import os
import sys
import traceback

from ast import literal_eval
from contextlib import nullcontext
from datetime import datetime, timezone
from time import time, sleep

import pandas as pd
import numpy as np
import requests

# For version pinning consider implementing a skyline-bq-py31013 environment
# and adding that to the path so that imports come from there rather than from
# the skyline-py31013 environment which has different versions of some libraries
# e.g. googleapis-common-protos

sys.path.insert(0, '/opt/skyline/github/skyline/skyline')
import settings
from skyline_functions import get_redis_conn_decoded, get_graphite_metric, mkdir_p
from functions.cluster.is_shard_metric import is_shard_metric
from functions.database.queries.get_all_inactive_db_metric_names import get_all_inactive_db_metric_names
from functions.database.queries.query_anomalies import get_anomalies_for_period
from functions.database.queries.set_metric_ids_as_active import set_metric_ids_as_active
from functions.database.queries.update_anomaly import update_anomaly
from functions.graphite.update_graphite_metric import update_graphite_metric
from functions.metrics.get_metric_id_from_base_name import get_metric_id_from_base_name
from functions.numpy.percent_different import get_percent_different
from functions.redis.populate_redis_metric import populate_redis_metric
from functions.settings.get_bq_accounts_settings import get_bq_accounts_settings
from functions.settings.get_external_settings import get_external_settings

from functions.vista.create_bq_archive import create_bq_archive

# Optional settings are read defensively so this module still imports when a
# setting is not defined. Any failure falls back to an empty/None default.
# ``except Exception`` (rather than a bare ``except:``) keeps that best-effort
# behaviour without swallowing KeyboardInterrupt/SystemExit.
try:
    VISTA_BQ_ACCOUNTS = copy.deepcopy(settings.VISTA_BQ_ACCOUNTS)
except Exception:
    VISTA_BQ_ACCOUNTS = {}
try:
    VISTA_BQ_VIRTUALENV_PATH = settings.VISTA_BQ_VIRTUALENV_PATH
except Exception:
    VISTA_BQ_VIRTUALENV_PATH = None

# The BigQuery-specific modules are only imported when a virtualenv path is
# configured. That path is prepended to sys.path so these imports resolve
# from it first.
if VISTA_BQ_VIRTUALENV_PATH:
    sys.path.insert(0, VISTA_BQ_VIRTUALENV_PATH)
    from functions.bigquery.bq_df_to_metrics import bq_df_to_metrics
    from functions.bigquery.bq_credentials import get_bq_credentials
    import pandas_gbq
else:
    # Placeholders so the module-level names always exist.
    # NOTE: a nullcontext instance is not callable and has no ``read_gbq``
    # attribute, so using these names raises TypeError/AttributeError at
    # call time. The visible call sites in bq_update wrap those calls in
    # try/except and log the error.
    bq_df_to_metrics = nullcontext()
    get_bq_credentials = nullcontext()
    pandas_gbq = nullcontext()

try:
    HORIZON_SHARDS = copy.deepcopy(settings.HORIZON_SHARDS)
except Exception:
    HORIZON_SHARDS = {}

# Identity of this component: the parent app is 'vista' and this module is
# its 'bq_update' child. Logging goes through the parent app's logger.
parent_skyline_app = 'vista'
child_skyline_app = 'bq_update'
skyline_app = f'{parent_skyline_app}'
skyline_app_logger = f'{parent_skyline_app}Log'
logger = logging.getLogger(skyline_app_logger)

# Hostname of the node running this module (the nodename field of uname).
this_host = str(os.uname().nodename)


# @added 20240617 - Feature #5372: vista - bq_update
#                   Feature #5352: vista - bigquery
[docs] def bq_update(bq_update_job_id): """ This function is used to get data from BigQuery and compare it to the last data fetched from BigQuery for the period (if an archive is available, if not an archive is created using data from Redis and/or Graphite). The archive/current values are compared to the values for the timestamp from the latest BigQuery data for the period and if the BigQuery value has changed, the values are updated Graphite and Redis. The metrics that have changed are then reprocessed up to the timestamp of the changed data and if no anomaly is detected for the updated timestamp and previously an anomaly WAS detected for the timestamp the existing anomaly is updated with the new value and the previous value and new value for the anomaly is recorded in the database anomalies_updated table. """ start_bq_update_job = time() try: redis_conn_decoded = get_redis_conn_decoded(skyline_app) except Exception as err: logger.error('error :: bq_update - redis connection failed, err: %s' % err) def remove_key(key): logger.info('bq_update :: removing %s from vista.bq_update.work' % key) try: redis_conn_decoded.hdel('vista.bq_update.work', key) except Exception as err: logger.error('error :: bq_update :: failed to hdel %s from vista.bq_update.work, err: %s' % ( key, err)) def fail_job(key, data, err_msg): remove_key(key) vista_bq_update_work = {} try: vista_bq_update_work = redis_conn_decoded.hgetall('vista.bq_update.work') except Exception as err: logger.error('error :: bq_update :: failed to hgetall on vista.bq_update.work, err: %s' % err) vista_bq_update_work = {} vista_bq_account_key = None from_timestamp = None until_timestamp = None interval = None replace_query_string = None date_format = None job_id = None work_dict = {} # Get the job from the work queue for job_id, work_dict_str in vista_bq_update_work.items(): if job_id != bq_update_job_id: continue try: work_dict = literal_eval(work_dict_str) except Exception as err: err_msg = 'bq_update :: failed to 
literal_eval work_dict' logger.error('error :: bq_update :: %s: %s, for job_id: %s , err: %s' % ( err_msg, str(work_dict_str), str(job_id), err)) fail_job(job_id, work_dict_str, err_msg) work_dict = {} break try: vista_bq_account_key = work_dict['vista_bq_account_key'] except Exception as err: err_msg = 'failed to determine vista_bq_account_key from work_dict' logger.error('error :: bq_update :: %s: %s, for job_id: %s , err: %s' % ( err_msg, str(work_dict_str), str(job_id), err)) fail_job(job_id, work_dict, err_msg) work_dict = {} break from_timestamp = None try: from_timestamp = work_dict['from_timestamp'] except: from_timestamp = None until_timestamp = None try: until_timestamp = work_dict['until_timestamp'] except: until_timestamp = None try: interval = work_dict['schedule_interval'] except Exception as err: err_msg = 'failed to determine schedule_interval from work_dict' logger.error('error :: bq_update :: %s:%s, err: %s' % ( err_msg, str(work_dict_str), err)) fail_job(job_id, work_dict, err_msg) work_dict = {} break try: replace_query_string = work_dict['update_replace_query'] except Exception as err: err_msg = 'failed to determine update_replace_query from work_dict' logger.error('error :: bq_update :: %s: %s, for job_id: %s , err: %s' % ( err_msg, str(work_dict_str), str(job_id), err)) fail_job(job_id, work_dict, err_msg) work_dict = {} break try: date_format = work_dict['date_field_format'] except Exception as err: err_msg = 'failed to determine date_field_format from work_dict' logger.error('error :: bq_update :: %s: %s, for job_id: %s , err: %s' % ( err_msg, str(work_dict_str), str(job_id), err)) fail_job(job_id, work_dict, err_msg) work_dict = {} break verify_ssl = True try: verify_ssl = work_dict['verify_ssl'] except: verify_ssl = True break if len(work_dict) == 0: err_msg = 'failed to construct work_dict' logger.error('error :: bq_update :: %s: %s, for job_id: %s , err: %s' % ( err_msg, str(work_dict_str), str(job_id), err_msg)) fail_job(job_id, 
work_dict, err_msg) return started = int(time()) today_date_fromat = datetime.fromtimestamp(int(started)).strftime(date_format) work_dict['started'] = started job_key_dict = {'started': started, 'job_id': job_id, 'work_dict': work_dict} try: redis_conn_decoded.setex('vista.bq_update.running', 7200, str(job_key_dict)) # @modified 20241106 - Task #5526: Build v5.0.0 and upgrade deps # bandit - B608:hardcoded_sql_expressions logger.info('bq_update set vista.bq_update.running Redis key with %s' % str(job_key_dict)) # nosec B608 except Exception as err: logger.error('error :: bq_update :: failed to set on vista.bq_update.running from Redis, err: %s' % err) # nosec B608 """ # Example work_dict that is submitted via /bq_update by the webapp, direct # or by vista_bq_update work_data = { 'job_id': '1718967838.customer.account_activity', 'flux_api_token': 'xxxx', 'oauth_credentials': { 'token': 'xxxx', 'refresh_token': 'xxxx', 'token_uri': 'https://oauth2.googleapis.com/token', 'client_id': 'xxx.apps.googleusercontent.com', 'client_secret': 'xxx' }, 'oauth_credentials_file': '', 'service_credentials': {}, 'service_credentials_file': '', 'project_id': 'account-data-prod', 'query': 'SELECT * FROM `account-data-prod.skyline.account_activity` WHERE date = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)', 'metric_format': 'graphite', 'metric_prefix': 'customer.account_activity', 'date_field': 'date', 'date_field_format': '%Y-%m-%d', 'primary_dimension': 'acc_id', 'metric_dimensions': ['requests', 'clickthru', 'signups'], 'schedule': {'minute': 0, 'hour': 1, 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}, 'schedule_interval': 86400, 'full_duration': 31536000, 'stale_period': 604800, 'resolution': 86400, 'batch_processing_namespace': 'customer.account_activity', 'non_derivative_namespaces': ['customer.account_activity'], 'zero_fill_analysis': True, 'update_data_period': 2592000, 'update_data_schedule': {'minute': 15, 'hour': [0,6,12,18], 'day_of_month': '*', 'month': '*', 
'day_of_week': '*'}, 'update_replace_query': 'DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)', 'from_timestamp': 1717027200, 'until_timestamp': 1717027200, 'verify_ssl': False, 'added_by': 'webapp', 'vista_bq_account_key': 'customer.account_activity'} } """ # Remove the work remove_key(job_id) # Update the job status work_dict['status'] = 'processing' intervals_to_fetch_timestamps = {} resolution = work_dict['resolution'] if not until_timestamp: until_timestamp = int(time()) // resolution * resolution until_timestamp = until_timestamp // resolution * resolution if not from_timestamp: from_timestamp = until_timestamp - work_dict['update_data_period'] from_timestamp = from_timestamp // resolution * resolution intervals_to_fetch = [] end_timestamp = int(until_timestamp) + 1 ctime = int(from_timestamp) while ctime < end_timestamp: try: interval_date_fromat = datetime.fromtimestamp(int(ctime)).strftime(date_format) intervals_to_fetch.append(interval_date_fromat) intervals_to_fetch_timestamps[interval_date_fromat] = int(ctime) except Exception as err: logger.error('error :: bq_update :: failed to convert %s to date_format, err: %s' % ( str(ctime), err)) ctime = ctime + interval if ctime > end_timestamp: break work_dict['intervals_to_fetch'] = intervals_to_fetch current_period_timestamp = int(time()) // resolution * resolution vista_bq_accounts = {} try: vista_bq_accounts = get_bq_accounts_settings(skyline_app) except Exception as err: logger.error('error :: bq_update :: get_bq_accounts_settings failed, err: %s' % err) bq_account_settings = {} try: bq_account_settings = vista_bq_accounts[vista_bq_account_key] except Exception as err: logger.error('error :: bq_update :: failed to determine settings for %s, err: %s' % ( vista_bq_account_key, err)) if not bq_account_settings: err_msg = 'failed to determine settings for %s' % vista_bq_account_key logger.error('error :: bq_update :: %s' % err_msg) fail_job(job_id, work_dict, err_msg) try: 
redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err) return query = None try: query = bq_account_settings['query'] except Exception as err: err_msg = 'failed to determine query from settings for %s' % vista_bq_account_key logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) fail_job(job_id, work_dict, err_msg) try: redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err) return metric_format = 'graphite' try: metric_format = bq_account_settings['metric_format'] except Exception as err: logger.error('error :: bq_update :: failed to ddetermine metric_format from bq_account_settings defaulting to Graphite, err: %s' % err) metric_format = 'graphite' flux_api_token = None try: flux_api_token = bq_account_settings['flux_api_token'] except Exception as err: err_msg = 'failed to determine flux_api_token from settings for %s' % vista_bq_account_key logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) fail_job(job_id, work_dict, err_msg) try: redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err) return try: full_duration = bq_account_settings['full_duration'] except Exception as err: err_msg = 'failed to determine full_duration from settings for %s' % vista_bq_account_key logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) fail_job(job_id, work_dict, err_msg) try: redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err) return try: metric_prefix = bq_account_settings['metric_prefix'] except Exception as err: err_msg 
= 'failed to determine metric_prefix from settings for %s' % vista_bq_account_key logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) fail_job(job_id, work_dict, err_msg) try: redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err) return # @added 20240725 - Feature #5372: vista - bq_update # Handle BQ api fail and try use another node. BigQuery can deny by IP # it appears use_remote = False try: use_remote = bq_account_settings['use_remote'] except Exception as err: use_remote = False flux_key_namespace_prefixes = { settings.FLUX_SELF_API_KEY: None } for flux_token in settings.FLUX_API_KEYS: flux_key_namespace_prefixes[flux_token] = settings.FLUX_API_KEYS[flux_token] skyline_external_settings = {} try: skyline_external_settings = get_external_settings(parent_skyline_app, None, True) except Exception as err: logger.error('error :: bq_update :: get_external_settings failed, err: %s' % ( err)) if skyline_external_settings: for settings_key in list(skyline_external_settings.keys()): external_setting_keys = list(skyline_external_settings[settings_key].keys()) if 'flux_token' in external_setting_keys: flux_token = skyline_external_settings[settings_key]['flux_token'] if flux_token and flux_token not in flux_key_namespace_prefixes.keys(): flux_key_namespace_prefixes[flux_token] = skyline_external_settings[settings_key]['namespace'] namespace_prefix = None if flux_api_token: try: namespace_prefix = flux_key_namespace_prefixes[flux_api_token] except Exception as err: logger.error('error :: bq_update :: failed to determine flux namespace_prefix from flux_api_token, err: %s' % ( err)) namespace_prefix = None # Resources to determine what metric ids need to be set to active again all_db_metric_ids_with_names = {} try: all_db_metric_ids_with_names = redis_conn_decoded.hgetall('metrics_manager.all_db_metric_ids_with_names') except 
Exception as err: logger.error('error :: bq_update :: failed to determine flux namespace_prefix from flux_api_token, err: %s' % ( err)) all_base_names_with_ids = {} all_ids_with_base_names = {} for id_str, i_base_name in all_db_metric_ids_with_names.items(): metric_id = int(float(id_str)) all_base_names_with_ids[i_base_name] = metric_id all_ids_with_base_names[metric_id] = i_base_name # get inactive metric_ids inactive_metric_names_with_id = {} try: inactive_metric_names, inactive_metric_names_with_id = get_all_inactive_db_metric_names(skyline_app, with_ids=True) except Exception as err: logger.error('error :: bq_update :: get_all_inactive_db_metric_names failed, err: %s' % ( err)) inactive_metric_ids = set(list(inactive_metric_names_with_id.values())) metric_ids_to_set_as_active_again = [] # Credentials can be passed in a json file or as a dict (as from # external_settings) credentials = None use_credentials = None oauth_credentials = {} service_credentials = {} oauth_credentials_file = None service_credentials_file = None project_id = None if 'project_id' in bq_account_settings.keys(): project_id = bq_account_settings['project_id'] if 'oauth_credentials' in bq_account_settings.keys(): oauth_credentials = bq_account_settings['oauth_credentials'] if 'oauth_credentials_file' in bq_account_settings.keys(): oauth_credentials_file = bq_account_settings['oauth_credentials_file'] if 'service_credentials' in bq_account_settings.keys(): service_credentials = bq_account_settings['service_credentials'] if 'service_credentials_file' in bq_account_settings.keys(): service_credentials_file = bq_account_settings['service_credentials_file'] try: credentials = get_bq_credentials( skyline_app, credentials=use_credentials, project_id=project_id, oauth_credentials=oauth_credentials, oauth_credentials_file=oauth_credentials_file, service_credentials=service_credentials, service_credentials_file=service_credentials_file) except Exception as err: logger.error('error :: bq_update :: 
get_bq_credentials failed, err: %s' % ( err)) if not credentials: logger.error('error :: bq_update :: no credentials retrieved for %s' % vista_bq_account_key) err_msg = 'failed to get credentials for %s' % vista_bq_account_key logger.error('error :: bq_update :: %s' % err_msg) fail_job(job_id, work_dict, err_msg) try: redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err) return else: use_credentials = credentials failed_job = False failed_job_err = None total_intervals_to_fetch = len(intervals_to_fetch) logger.info('bq_update :: %s intervals_to_fetch for %s' % ( str(total_intervals_to_fetch), vista_bq_account_key)) until_graphite_timestamp = int(started // interval * interval) from_graphite_timestamp = until_graphite_timestamp - full_duration reprocess_url = '%s/reprocess_batch_metrics' % settings.SKYLINE_URL user = str(settings.WEBAPP_AUTH_USER) password = str(settings.WEBAPP_AUTH_USER_PASSWORD) # if no archive and fetch_interval is today a backfill job will be added at # the end of the run add_backfill_job = None intervals_fetched = [] for fetch_interval in intervals_to_fetch: if failed_job: break start_fetch_interval = int(time()) metric_ids_to_set_as_active_again = [] fetch_interval_timestamp = intervals_to_fetch_timestamps[fetch_interval] # Search the path for the latest archive for the archive_data_search_path = '%s/vista/bq_archives/%s' % ( settings.SKYLINE_DIR, vista_bq_account_key) bq_archives = [] if os.path.isdir(archive_data_search_path): for dir_path, folders, files in os.walk(archive_data_search_path): try: if files: for i in files: path_and_file = '%s/%s' % (dir_path, i) if i.endswith('.csv'): bq_archives.append(path_and_file) except Exception as err: logger.error('error :: bq_update :: os.walk failed on %s, err: %s' % ( str(archive_data_search_path), err)) # Sort latest to oldest bq_archives = sorted(bq_archives, 
reverse=True) logger.info('bq_update :: %s bq_archives found' % (str(len(bq_archives)))) archive_csv_filename = '%s.%s.csv' % (vista_bq_account_key, str(fetch_interval)) archive_csv = None # Find latest archive if one exists for i in bq_archives: if archive_csv_filename in i: archive_csv = str(i) break # Only create the archive if the fetch_interval is not today's date create_archive = True if fetch_interval == today_date_fromat: if not archive_csv: create_archive = False add_backfill_job = str(fetch_interval) logger.info('bq_update :: no archive csv found for %s, which is the current period so not creating archive with Redis and Graphite data' % fetch_interval) logger.info('bq_update :: no archive csv found for current period %s so a backfill job will be added' % fetch_interval) if not archive_csv and create_archive: # GET THE DATA FROM REDIS (LOCAL) AND GRAPHITE (REMOTE) IF AN ARCHIVE DOES NOT EXIST ... AND MAKE THE ARCHIVE? # Use create_bq_archive function logger.info('bq_update :: no archive csv found, creating with Redis and Graphite data') archive_file = None try: metrics_data, archive_file = create_bq_archive(skyline_app, vista_bq_account_key, fetch_interval_timestamp) try: del metrics_data except: pass if os.path.isfile(archive_file): logger.info('bq_update :: created archive csv - %s' % archive_file) archive_csv = str(archive_file) except Exception as err: logger.error('error :: bq_update :: create_bq_archive failed for %s, err: %s' % ( str(fetch_interval_timestamp), err)) if not archive_csv and add_backfill_job: logger.info('bq_update :: no archive_csv was found or created for %s but backfill job will be added' % ( str(fetch_interval))) continue if not archive_csv: logger.error('error :: bq_update :: no archive_csv was found or created for %s, cannot proceed' % ( str(fetch_interval))) continue previous_df = None if archive_csv: if os.path.isfile(archive_csv): logger.info('bq_update :: loading data for %s from latest archive data file %s' % 
(str(fetch_interval), archive_csv)) try: previous_df = pd.read_csv(archive_csv) except Exception as err: err_msg = 'pd.read_csv failed with archive_csv: %s' % archive_csv logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) if previous_df is not None: logger.info('bq_update :: loaded %s data rows for %s from %s' % ( str(len(previous_df)), str(fetch_interval), archive_csv)) if previous_df is None: logger.error('error :: bq_update :: failed to load any data rows for %s from archive' % ( str(fetch_interval))) continue sql = str(query) try: unquoted_replace_string = str(fetch_interval) replace_string = "'%s'" % unquoted_replace_string sql = query.replace(replace_query_string, replace_string) except Exception as err: err_msg = 'failed to generate sql with replace_query_string and %s' % str(fetch_interval) logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) fail_job(job_id, work_dict, err_msg) failed_job = True failed_job_err = str(err_msg) break logger.info('bq_update :: fetching current data for %s with sql: %s' % (vista_bq_account_key, sql)) primary_dimension = bq_account_settings['primary_dimension'] metric_dimensions = bq_account_settings['metric_dimensions'] # Just to ensure the credentials are refreshed if they have expired # during the loop if use_credentials: try: credentials = get_bq_credentials( skyline_app, credentials=use_credentials, project_id=project_id, oauth_credentials=oauth_credentials, oauth_credentials_file=oauth_credentials_file, service_credentials=service_credentials, service_credentials_file=service_credentials_file) except Exception as err: logger.error('error :: bq_update :: get_bq_credentials failed, err: %s' % ( err)) if credentials: use_credentials = credentials # @added 20240725 - Feature #5372: vista - bq_update # Handle BQ api fail and try use another node. 
BigQuery can deny by IP # it appears pandas_gbq_error = False configuration = {'jobTimeoutMs': 120000, 'timeoutMs': 120000} df = None if df is None: try: # @modified 20240725 - Feature #5372: vista - bq_update # Added configuration # @modified 20240725 - Feature #5372: vista - bq_update # Only use pandas_gbq if use_remote has not be set if not use_remote: df = pandas_gbq.read_gbq(sql, project_id=bq_account_settings['project_id'], credentials=credentials, progress_bar_type=None, configuration=configuration) else: df = None except Exception as err: err_msg = 'pandas_gbq.read_gbq failed with sql: %s' % sql logger.error(traceback.format_exc()) logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) # @modified 20240725 - Feature #5372: vista - bq_update # Handle BQ api fail and try use another node. BigQuery can deny by IP # it appears #fail_job(job_id, work_dict, err_msg) #failed_job = True #break pandas_gbq_error = True if HORIZON_SHARDS: use_remote = True tmp_archive_csv = None try_remote = False if pandas_gbq_error and HORIZON_SHARDS: try_remote = True if use_remote: try_remote = True if try_remote: archive_csv_url = None logger.info('bq_update :: requesting bq_query from other cluster nodes') for remote_url, remote_user, remote_password, shostname in settings.REMOTE_SKYLINE_INSTANCES: if shostname == this_host: continue if archive_csv_url: break try: # Although a POST request query parameters are added for tracking # and debugging in the log shard_url = '%s/bq_query' % remote_url except Exception as err: logger.error('error :: bq_update :: failed to construct the shard_url for %s, err: %s' % ( shostname, err)) data_dict = { 'data': { 'vista_bq_account_key': vista_bq_account_key, 'sql': sql, 'host': this_host, 'cluster_data': False, 'cluster_call': False, } } logger.info('bq_update :: requesting %s with data_dict: %s' % (shard_url, str(data_dict))) r = None try: headers = {"content-type": "application/json"} r = requests.post(shard_url, 
auth=(remote_user, remote_password), json=data_dict, headers=headers, timeout=125, verify=settings.VERIFY_SSL) logger.info('bq_update :: %s returned status_code: %s' % ( shard_url, str(r.status_code))) except Exception as err: logger.error('error :: bq_update :: request failed on %s, err: %s' % ( str(shard_url), err)) if r: try: response_json = r.json() archive_csv_url = response_json['data']['archive_csv_url'] tmp_archive_csv = response_json['data']['archive_csv'] logger.info('bq_update :: %s response: %s' % ( shostname, str(response_json))) except Exception as err: logger.error('error :: bq_update :: failed to determine archive_csv_url from %s response, err: %s' % ( shostname, err)) if archive_csv_url: break if archive_csv_url: r = None try: r = requests.get(archive_csv_url, auth=(remote_user, remote_password), timeout=125, verify=settings.VERIFY_SSL) logger.info('bq_update :: %s returned status_code: %s' % ( shostname, str(r.status_code))) except Exception as err: logger.error('error :: bq_update :: request failed on %s, err: %s' % ( str(archive_csv_url), err)) if r: # Check if the request was successful (status code 200) if r.status_code == 200: # Save the content of the response to a local CSV file with open(tmp_archive_csv, 'wb') as fh: fh.write(r.content) logger.info('bq_update :: saved %s' % ( tmp_archive_csv)) else: logger.error('error :: bq_update :: failed to download and save %s' % ( str(tmp_archive_csv))) if tmp_archive_csv: try: df = pd.read_csv(tmp_archive_csv) logger.info('bq_update :: read %s rows from %s' % ( str(len(df)), tmp_archive_csv)) except Exception as err: logger.error('error :: bq_update :: failed to load %s, err: %s' % ( str(tmp_archive_csv), err)) if df is None: err_msg = 'no data fetched' fail_job(job_id, work_dict, err_msg) failed_job = True break intervals_fetched.append(fetch_interval) logger.info('bq_update :: %s rows returned for interval: %s' % ( str(len(df)), str(fetch_interval))) previous_data = {} for index, data in 
previous_df.to_dict('index').items(): try: acc_id = data[primary_dimension] except KeyError: continue previous_data[acc_id] = {} for metric_dimension in metric_dimensions: try: previous_data[acc_id][metric_dimension] = data[metric_dimension] except KeyError: continue current_data = {} for index, data in df.to_dict('index').items(): try: acc_id = data[primary_dimension] except KeyError: continue current_data[acc_id] = {} for metric_dimension in metric_dimensions: try: current_data[acc_id][metric_dimension] = data[metric_dimension] except KeyError: continue updates = {} big_changes = {} d = fetch_interval different = {} new_data = {} if d not in big_changes.keys(): big_changes[fetch_interval] = {} for acc_id, cdata in current_data.items(): try: odata = {} try: odata = previous_data[acc_id] except: odata = {} if len(odata) == 0: new_data[acc_id] = {fetch_interval_timestamp: {}} for key in cdata.keys(): if str(cdata[key]) == 'nan': continue metric = '%s.%s.%s' % (metric_prefix, acc_id, str(key)) base_name = '%s.%s' % (namespace_prefix, metric) new_data[acc_id][fetch_interval_timestamp][metric] = cdata[key] metric_id = None try: metric_id = all_base_names_with_ids[base_name] except: pass if metric_id: if metric_id in inactive_metric_ids: metric_ids_to_set_as_active_again.append(metric_id) continue if str(cdata) != str(odata): different[acc_id] = {fetch_interval_timestamp: {}} for key, value in cdata.items(): if str(value) != str(odata[key]): if isinstance(value, float): if isinstance(odata[key], float): close = np.allclose([value], [odata[key]]) if close: continue if str(value) == 'nan': continue metric = '%s.%s.%s' % (metric_prefix, acc_id, str(key)) base_name = '%s.%s' % (namespace_prefix, metric) metric_id = None try: metric_id = all_base_names_with_ids[base_name] except: pass if metric_id: if metric_id in inactive_metric_ids: metric_ids_to_set_as_active_again.append(metric_id) different[acc_id][fetch_interval_timestamp][metric] = {} 
different[acc_id][fetch_interval_timestamp][metric]['current'] = value different[acc_id][fetch_interval_timestamp][metric]['previous'] = odata[key] if isinstance(value, float) and isinstance(odata[key], float): percent_different = None if odata[key] == 0 and str(value) != 'nan': percent_different = 100 if str(value) == 'nan' and odata[key] != 0: percent_different = 100 if str(value) == 'nan' and odata[key] == 0: percent_different = 0 if value == 0 and str(odata[key]) == 'nan': percent_different = 0 if value == 0 and str(odata[key]) != 'nan': percent_different = 100 if percent_different is None: percent_different = get_percent_different(value, odata[key], True) # print('get_percent_different', value, odata[key]) different[acc_id][fetch_interval_timestamp][metric]['percent_different'] = percent_different if percent_different: if percent_different > 15: if acc_id not in big_changes[d].keys(): big_changes[d][acc_id] = {fetch_interval_timestamp: {}} big_changes[d][acc_id][fetch_interval_timestamp][metric] = {'current': value, 'previous': odata[key], '%diff': percent_different} if len(different[acc_id][fetch_interval_timestamp]) == 0 and len(different[acc_id][fetch_interval_timestamp]) == 0: del different[acc_id][fetch_interval_timestamp] except Exception as err: logger.error('error :: bq_update :: failed to determine differences between archive and current data for %s, err: %s' % ( str(acc_id), err)) for acc_id in list(different.keys()): if len(different[acc_id]) == 0: del different[acc_id] significant_changes = len(big_changes[d]) if not significant_changes: del big_changes[d] logger.info('bq_update :: %s - new: %s, updated: %s, significant_changes: %s' % ( fetch_interval, str(len(new_data)), str(len(different)), str(significant_changes))) if len(new_data) > 0: updates[fetch_interval] = {'new': new_data} if len(different) > 0: try: updates[fetch_interval]['updates'] = different except: updates[fetch_interval] = {'updates': different} if 
len(metric_ids_to_set_as_active_again): logger.info('bq_update :: %s metrics to set as active' % str(len(metric_ids_to_set_as_active_again))) # nosec B608 set_metrics_as_active_count = None try: set_metrics_as_active_count = set_metric_ids_as_active(skyline_app, metric_ids_to_set_as_active_again) except Exception as err: logger.error('error :: bq_update :: set_metric_ids_as_active failed - %s' % ( str(err))) # nosec B608 logger.info('bq_update :: %s metrics set as active' % str(set_metrics_as_active_count)) # nosec B608 # Do all Graphite updates first and then sleep once to allow Graphite # write and relay to happen, rather than sleeping per metric for acc_id in new_data.keys(): for fetch_interval_timestamp in new_data[acc_id].keys(): for metric in new_data[acc_id][fetch_interval_timestamp]: base_name = '%s.%s' % (namespace_prefix, metric) value = new_data[acc_id][fetch_interval_timestamp][metric] # Update Graphite via the relay line protocol port graphite_updated = False try: graphite_updated = update_graphite_metric(skyline_app, base_name, fetch_interval_timestamp, value) except Exception as err: logger.error('error :: bq_update :: update_graphite_metric failed for %s, timestamp: %s, value: %s, err: %s' % ( base_name, str(fetch_interval_timestamp), str(value), err)) logger.info('bq_update :: new timestamp: %s and value: %s for %s, graphite_updated: %s' % ( str(fetch_interval_timestamp), str(value), base_name, str(graphite_updated))) if graphite_updated: # As to not swamp Graphite sleep(0.05) for acc_id in list(different.keys()): for fetch_interval_timestamp in different[acc_id].keys(): for metric in different[acc_id][fetch_interval_timestamp]: base_name = '%s.%s' % (namespace_prefix, metric) value = different[acc_id][fetch_interval_timestamp][metric]['current'] previous_value = different[acc_id][fetch_interval_timestamp][metric]['previous'] # Update Graphite via the relay line protocol port graphite_updated = False try: graphite_updated = 
update_graphite_metric(skyline_app, base_name, fetch_interval_timestamp, value) except Exception as err: logger.error('error :: bq_update :: update_graphite_metric failed for %s, timestamp: %s, value: %s, err: %s' % ( base_name, str(fetch_interval_timestamp), str(value), err)) logger.info('bq_update :: timestamp: %s, value: %s (previous value: %s) for %s, graphite_updated: %s' % ( str(fetch_interval_timestamp), str(value), str(previous_value), base_name, str(graphite_updated))) if graphite_updated: # As to not swamp Graphite sleep(0.05) # sleep for 5 to 60 seconds to allow Graphite write and the relay to # happen if there are many updates sleep_for = 0 if len(new_data) > 0 or len(different) > 0: sleep_for = 5 if len(new_data) > 10 or len(different) > 10: sleep_for = 10 if len(new_data) > 30 or len(different) > 30: sleep_for = 20 if len(new_data) > 100 or len(different) > 100: sleep_for = 60 if sleep_for > 1: # @added 20240703 - Allow for more update checks to be scheduled # but only run one per hour last_bq_update_key = 'vista.bq_update.%s.last_update' % vista_bq_account_key try: redis_conn_decoded.setex(last_bq_update_key, 3600, int(time())) logger.info('bq_update :: set %s Redis key' % str(last_bq_update_key)) # nosec B608 except Exception as err: logger.error('error :: bq_fetcher :: failed to set %s, err: %s' % ( str(last_bq_update_key), err)) logger.info('bq_update :: sleeping for %ss to allow Graphite to write and replicate' % str(sleep_for)) sleep(sleep_for) for acc_id in new_data.keys(): for fetch_interval_timestamp in new_data[acc_id].keys(): for metric in new_data[acc_id][fetch_interval_timestamp]: base_name = '%s.%s' % (namespace_prefix, metric) value = new_data[acc_id][fetch_interval_timestamp][metric] # Do not reprocess if the new data point has a timestamp # for the current period and will be analysed normally via # analyzer_batch because it is a new data point if int(fetch_interval_timestamp) == current_period_timestamp: continue # Repopulate Redis 
until_graphite_timestamp = int(started // interval * interval) from_graphite_timestamp = until_graphite_timestamp - full_duration # Surface the timeseries from the local Graphite because if this is # in a cluster it is possible that the remote graphite/webapp will # not have access to the metric until the remote carbon flushes it metric_timeseries = [] try: metric_timeseries = get_graphite_metric(skyline_app, base_name, from_graphite_timestamp, until_graphite_timestamp, 'list', 'object') except Exception as err: logger.error('error :: bq_update :: get_graphite_metric failed, err: %s' % err) replace_redis_timeseries = True try: populated_timeseries = populate_redis_metric(skyline_app, base_name, from_timestamp=from_graphite_timestamp, until_timestamp=until_graphite_timestamp, timeseries=metric_timeseries, replace=replace_redis_timeseries, verify_ssl=verify_ssl) if len(populated_timeseries) > 0: logger.info('bq_update :: repopulated Redis time series key for %s with %s data points, populated_timeseries[-1]: %s' % ( base_name, str(len(populated_timeseries)), str(populated_timeseries[-1]))) except Exception as err: logger.error('error :: bq_update :: populate_redis_metric failed for %s, job_id: %s, err: %s' % ( base_name, job_id, err)) if HORIZON_SHARDS: shard_metric = is_shard_metric(base_name) if shard_metric: # sleep to allow the shard host to finalise new # Redis timeseries sleep(1.0) # Reprocess new data post_data = { 'data': { 'namespaces': base_name, 'from_timestamp': fetch_interval_timestamp, 'until_timestamp': fetch_interval_timestamp, 'dry_run': False, 'oneshot': False, } } r = None try: headers = {"content-type": "application/json"} r = requests.post(reprocess_url, auth=(user, password), json=post_data, headers=headers, timeout=5, verify=verify_ssl) logger.info('bq_update :: reprocess API request for %s returned status_code: %s' % ( base_name, str(r.status_code))) logger.info('bq_update :: reprocess API request for %s at %s, for new value: %s, returned 
status_code: %s' % ( base_name, str(fetch_interval_timestamp), str(value), str(r.status_code))) except Exception as err: logger.error('error :: bq_update :: reprocess request failed for %s on %s, err: %s' % ( base_name, str(reprocess_url), err)) anomalies_updated = {} for acc_id in list(different.keys()): for fetch_interval_timestamp in different[acc_id].keys(): for metric in different[acc_id][fetch_interval_timestamp]: base_name = '%s.%s' % (namespace_prefix, metric) value = different[acc_id][fetch_interval_timestamp][metric]['current'] previous_value = different[acc_id][fetch_interval_timestamp][metric]['previous'] # Repopulate Redis with all the data until_graphite_timestamp = int(started // interval * interval) from_graphite_timestamp = until_graphite_timestamp - full_duration # Surface the timeseries from the local Graphite because if this is # in a cluster it is possible that the remote graphite/webapp will # not have access to the metric until the remote carbon flushes it metric_timeseries = [] try: metric_timeseries = get_graphite_metric(skyline_app, base_name, from_graphite_timestamp, until_graphite_timestamp, 'list', 'object') except Exception as err: logger.error('error :: bq_update :: get_graphite_metric failed, err: %s' % err) if len(metric_timeseries) > 0: logger.info('bq_update :: get_graphite_metric for %s at %s with previous_value: %s, new value: %s, metric_timeseries[-3:]: %s' % ( base_name, str(fetch_interval_timestamp), str(previous_value), str(value), str(metric_timeseries[-3:]))) else: logger.warning('warning :: bq_update :: get_graphite_metric for %s at %s with previous_value: %s, new value: %s, no time series from Graphite, len(metric_timeseries): %s' % ( base_name, str(fetch_interval_timestamp), str(previous_value), str(value), str(len(metric_timeseries)))) replace_redis_timeseries = True populated_timeseries = None if len(metric_timeseries) > 0: try: populated_timeseries = populate_redis_metric(skyline_app, base_name, 
from_timestamp=from_graphite_timestamp, until_timestamp=until_graphite_timestamp, timeseries=metric_timeseries, replace=replace_redis_timeseries, verify_ssl=verify_ssl) if len(populated_timeseries) > 0: logger.info('bq_update :: repopulated Redis time series key for %s with %s data points, populated_timeseries[-1]: %s' % ( base_name, str(len(populated_timeseries)), str(populated_timeseries[-1]))) except Exception as err: logger.error('error :: bq_update :: populate_redis_metric failed for %s, job_id: %s, err: %s' % ( base_name, job_id, err)) # Determine if there is an anomaly to update try: metric_id = get_metric_id_from_base_name(skyline_app, base_name) except Exception as err: logger.error('error :: bq_update :: get_metric_id_from_base_name failed with metric: %s, err: %s' % ( str(base_name), err)) anomalies_dict = {} try: anomalies_dict = get_anomalies_for_period(skyline_app, [metric_id], (fetch_interval_timestamp - 5), (fetch_interval_timestamp + 5)) except Exception as err: logger.error('error :: bq_update :: get_anomalies_for_period failed for metric_id %s, err: %s' % ( str(metric_id), err)) if len(anomalies_dict) > 0: for anomaly_id in anomalies_dict.keys(): logger.error('bq_update :: updating anomaly id: %s for %s with new value' % ( str(anomaly_id), base_name)) anomaly_updated = False try: field = 'anomalous_datapoint' anomaly_updated = update_anomaly(skyline_app, anomaly_id, field, value) if anomaly_updated: logger.info('bq_update :: anomaly_updated: %s' % ( str(anomaly_updated))) except Exception as err: logger.error('error :: bq_update :: update_anomaly failed for anomaly id: %s, err: %s' % ( str(anomaly_id), err)) if anomaly_updated: anomalies_updated[anomaly_id] = {'previous_value': anomalies_dict[anomaly_id]['anomalous_datapoint'], 'new_value': value} else: logger.info('bq_update :: no anomaly to update for %s at %s' % ( base_name, str(fetch_interval_timestamp))) # Reprocess if significantly different. What is a # significant difference? 
percent_different = different[acc_id][fetch_interval_timestamp][metric]['percent_different'] if percent_different > 15 and len(metric_timeseries) > 0: if HORIZON_SHARDS: shard_metric = is_shard_metric(base_name) if shard_metric: # sleep to allow the shard host to finalise new # Redis timeseries sleep(1.0) post_data = { 'data': { 'namespaces': base_name, 'from_timestamp': fetch_interval_timestamp, 'until_timestamp': fetch_interval_timestamp, 'dry_run': False, 'oneshot': False, } } r = None try: headers = {"content-type": "application/json"} r = requests.post(reprocess_url, auth=(user, password), json=post_data, headers=headers, timeout=5, verify=verify_ssl) logger.info('bq_update :: reprocess API request for %s at %s because percent_different: %s, previous_value: %s, new_value: %s, returned status_code: %s' % ( base_name, str(fetch_interval_timestamp), str(percent_different), str(previous_value), str(value), str(r.status_code))) except Exception as err: logger.error('error :: bq_update :: reprocess request failed for %s on %s, err: %s' % ( base_name, str(reprocess_url), err)) if len(anomalies_updated) > 0: try: updates[fetch_interval]['anomalies_updated'] = copy.deepcopy(anomalies_updated) except: updates[fetch_interval] = {'anomalies_updated': copy.deepcopy(anomalies_updated)} # Save the latest archive data # The date_folder must be the date and time it was downloaded not the # fetch_interval date_folder_ts = int(start_bq_update_job) date_folder = datetime.fromtimestamp(date_folder_ts, tz=timezone.utc).strftime('%Y%m%d%H%M%S') archive_data_path = '%s/vista/bq_archives/%s/%s' % ( settings.SKYLINE_DIR, vista_bq_account_key, date_folder) new_archive_csv = '%s/%s' % (str(archive_data_path), archive_csv_filename) # Only save a new archive if it was different from the last archive if df is not None and len(updates) > 0: if not os.path.isdir(archive_data_path): try: mkdir_p(archive_data_path) except Exception as err: err_msg = 'failed to create archive_data_path: %s' % 
archive_data_path logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) if not os.path.isfile(new_archive_csv): logger.info('bq_update :: archiving data for %s to %s' % (str(fetch_interval), new_archive_csv)) try: df.to_csv(new_archive_csv, index=False) except Exception as err: err_msg = 'df.to_csv failed to create archive_csv: %s' % new_archive_csv logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) if os.path.isfile(new_archive_csv): logger.info('bq_update :: archived data for %s to %s' % (str(fetch_interval), new_archive_csv)) else: logger.error('error :: bq_update :: df.to_csv failed to create archive_csv: %s' % new_archive_csv) if len(updates) > 0: hash_key = 'vista.bq_update.%s.%s.%s' % (vista_bq_account_key, str(fetch_interval), date_folder) changed_count = 0 try: changed_count = len(different) except: changed_count = 0 new_count = 0 try: new_count = len(new_data) except: new_count = 0 anomaly_updated_count = 0 try: anomaly_updated_count = len(anomalies_updated) except: anomaly_updated_count = 0 try: redis_conn_decoded.hset(hash_key, 'anomaly_updated_count', anomaly_updated_count) redis_conn_decoded.hset(hash_key, 'changed_count', changed_count) redis_conn_decoded.hset(hash_key, 'new_count', new_count) redis_conn_decoded.hset(hash_key, 'new', str(new_data)) redis_conn_decoded.hset(hash_key, 'changed', str(different)) redis_conn_decoded.hset(hash_key, 'anomalies_updated', str(anomalies_updated)) redis_conn_decoded.expire(hash_key, 2592000) logger.info('bq_update :: created %s' % hash_key) except Exception as err: logger.error('error :: bq_update :: failed to create Redis hash %s, err: %s' % ( hash_key, err)) if add_backfill_job: logger.info('bq_update :: no archive found for current date adding bq_backfill job for %s %s data' % ( vista_bq_account_key, add_backfill_job)) bq_backfill_url = '%s/bq_backfill' % (settings.SKYLINE_URL) bq_update_url = '%s/bq_update' % (settings.SKYLINE_URL) user = str(settings.WEBAPP_AUTH_USER) password 
= str(settings.WEBAPP_AUTH_USER_PASSWORD) # Submit a bq_backfill job try: interval = int(bq_account_settings['schedule_interval']) except Exception as err: logger.error('error :: bq_update :: could not determine schedule_interval from %s, err: %s' % ( vista_bq_account_key, err)) backfill_from_timestamp = int(time()) // interval * interval post_data = {} try: post_data = { 'data': { 'bq_backfill': True, 'vista_bq_account_key': vista_bq_account_key, 'from_timestamp': backfill_from_timestamp, # Also use the from_timestamp for the until_timestamp in a backfill job 'until_timestamp': backfill_from_timestamp, 'interval': interval, 'date_format': bq_account_settings['date_field_format'], 'replace_query_string': bq_account_settings['update_replace_query'], 'zero_fill': False, 'replace_redis_timeseries': False, 'max_creates_per_minute': 1500, 'dry_run': False, 'flux_test': False, 'added_by': 'vista_bq_update', } } except Exception as err: logger.error('error :: bq_update :: failed to build post_data for bq_backfill job, err: %s' % err) r = None try: headers = {"content-type": "application/json"} r = requests.post(bq_backfill_url, auth=(user, password), json=post_data, headers=headers, timeout=5, verify=settings.VERIFY_SSL) logger.info('bq_update :: bq_backfill API request made for %s, returned status_code: %s' % ( vista_bq_account_key, str(r.status_code))) except Exception as err: logger.error('error :: bq_update :: bq_backfill request failed for %s on %s, err: %s' % ( vista_bq_account_key, str(bq_backfill_url), err)) response = None if r: try: response = r.json() except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: bq_update :: failed to parse json response from %s, err: %s' % ( str(bq_backfill_url), err)) if response: logger.info('bq_update :: bq_backfill_job submitted for %s, response: %s' % ( vista_bq_account_key, str(response))) else: logger.error('error :: bq_update :: bq_backfill job submitted for %s but no response' % ( 
vista_bq_account_key)) try: redis_conn_decoded.delete('vista.bq_update.running') except Exception as err: logger.error('error :: bq_update :: failed to delete on vista.bq_update.running from Redis, err: %s' % err)