Source code for webapp.add_bq_backfill_job

"""
add_bq_backfill_job.py
"""
import copy
import logging
import os

from datetime import datetime
from time import time
from flask import request

import settings
from skyline_functions import get_redis_conn_decoded, mkdir_p
from functions.settings.get_bq_accounts_settings import get_bq_accounts_settings


# @added 20240515 - Feature #5352: vista - bigquery
[docs] def add_bq_backfill_job(current_skyline_app): """ Return a dict with the backfill job details. :param current_skyline_app: the app calling the function :type current_skyline_app: str :return: bq_backfill_job_dict :rtype: dict """ function_str = 'add_bq_backfill_job' bq_backfill_job_dict = {} current_skyline_app_logger = current_skyline_app + 'Log' current_logger = logging.getLogger(current_skyline_app_logger) vista_bq_account_key = None # Test whether form or json POST form_data = False try: vista_bq_account_key = request.form['vista_bq_account_key'] if vista_bq_account_key: form_data = True current_logger.info('add_bq_backfill_job, form POST') current_logger.info('add_bq_backfill_job - vista_bq_account_key passed: %s' % str(vista_bq_account_key)) except: current_logger.info('add_bq_backfill_job no form data trying json') post_data = {} if not form_data: try: post_data = request.get_json() except Exception as err: current_logger.error('error :: add_bq_backfill_job - no POST data - %s' % ( err)) current_logger.info('add_bq_backfill_job, return 400 no POST data') bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no post data' return bq_backfill_job_dict try: vista_bq_account_key = post_data['data']['vista_bq_account_key'] if vista_bq_account_key: current_logger.info('add_bq_backfill_job - vista_bq_account_key passed: %s' % str(vista_bq_account_key)) except KeyError: vista_bq_account_key = None except Exception as err: vista_bq_account_key = None current_logger.error('error :: add_bq_backfill_job - evaluation of post_data[\'data\'][\'vista_bq_account_key\'] failed - %s' % ( err)) if not vista_bq_account_key: current_logger.info('add_bq_backfill_job, return 400 no vista_bq_account_key determined') bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no vista_bq_account_key argument' return bq_backfill_job_dict from_timestamp = None try: if form_data: from_timestamp = int(request.form['from_timestamp']) else: from_timestamp = int(post_data['data']['from_timestamp']) current_logger.info('add_bq_backfill_job :: with from_timestamp: %s' % str(from_timestamp)) bq_backfill_job_dict['from_timestamp'] = from_timestamp except Exception as err: current_logger.error('error :: add_bq_backfill_job :: no valid from_timestamp argument, err: %s' % err) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no valid from_timestamp argument' return bq_backfill_job_dict until_timestamp = None try: if form_data: until_timestamp = int(request.form['until_timestamp']) else: until_timestamp = int(post_data['data']['until_timestamp']) current_logger.info('add_bq_backfill_job :: with until_timestamp: %s' % str(until_timestamp)) bq_backfill_job_dict['until_timestamp'] = until_timestamp except Exception as err: current_logger.error('error :: add_bq_backfill_job :: no valid until_timestamp argument, err: %s' % err) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no valid until_timestamp argument' return bq_backfill_job_dict interval = None try: if form_data: interval = int(request.form['interval']) else: interval = int(post_data['data']['interval']) current_logger.info('add_bq_backfill_job :: with interval: %s' % str(interval)) bq_backfill_job_dict['interval'] = interval except Exception as err: current_logger.error('error :: add_bq_backfill_job :: no valid interval argument, err: %s' % err) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no valid interval argument' return bq_backfill_job_dict date_format = None try: if form_data: date_format = request.form['date_format'] else: date_format = post_data['data']['date_format'] current_logger.info('add_bq_backfill_job :: with date_format: %s' % str(date_format)) bq_backfill_job_dict['date_format'] = date_format except Exception as err: current_logger.error('error :: add_bq_backfill_job :: no valid date_format argument, err: %s' % err) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no valid date_format argument' return bq_backfill_job_dict # Validate date_format valid_date_format = False try: now = datetime.now() valid_date_format = now.strftime(date_format) except Exception as err: current_logger.error('error :: add_bq_backfill_job :: invalid date_format argument, err: %s' % err) valid_date_format = False if not valid_date_format: bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'an invalid date_format argument was passed' return bq_backfill_job_dict replace_query_string = None try: if form_data: replace_query_string = request.form['replace_query_string'] else: replace_query_string = post_data['data']['replace_query_string'] current_logger.info('add_bq_backfill_job :: with replace_query_string: %s' % str(replace_query_string)) bq_backfill_job_dict['replace_query_string'] = replace_query_string except Exception as err: current_logger.error('error :: add_bq_backfill_job :: no valid replace_query_string argument, err: %s' % err) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no valid replace_query_string argument' return bq_backfill_job_dict zero_fill = True try: if form_data: zero_fill_string = request.form['zero_fill'] else: zero_fill_string = post_data['data']['zero_fill'] if zero_fill_string == 'false': zero_fill = False current_logger.info('add_bq_backfill_job :: with zero_fill: %s' % str(zero_fill)) bq_backfill_job_dict['zero_fill'] = zero_fill except: zero_fill = True replace_redis_timeseries = False try: if form_data: replace_redis_timeseries_string = request.form['replace_redis_timeseries'] else: replace_redis_timeseries_string = post_data['data']['replace_redis_timeseries'] if replace_redis_timeseries_string == 'true': replace_redis_timeseries = False current_logger.info('add_bq_backfill_job :: with replace_redis_timeseries: %s' % str(replace_redis_timeseries)) bq_backfill_job_dict['replace_redis_timeseries'] = replace_redis_timeseries except: dry_run = True max_creates_per_minute = 500 try: if form_data: max_creates_per_minute = int(request.form['max_creates_per_minute']) else: max_creates_per_minute = int(post_data['data']['max_creates_per_minute']) current_logger.info('add_bq_backfill_job :: with max_creates_per_minute: %s' % str(max_creates_per_minute)) bq_backfill_job_dict['max_creates_per_minute'] = max_creates_per_minute except: max_creates_per_minute = 500 dry_run = True try: if form_data: dry_run_string = request.form['dry_run'] else: dry_run_string = post_data['data']['dry_run'] if dry_run_string == 'false': dry_run = False current_logger.info('add_bq_backfill_job :: with dry_run: %s' % str(dry_run)) bq_backfill_job_dict['dry_run'] = dry_run except: dry_run = True flux_test = False try: if form_data: flux_test_string = request.form['flux_test'] else: flux_test_string = post_data['data']['flux_test'] if flux_test_string == 'true': flux_test = True current_logger.info('add_bq_backfill_job :: with flux_test: %s' % str(flux_test)) bq_backfill_job_dict['flux_test'] = flux_test except: flux_test = False external_progress_url = None try: if form_data: external_progress_url = request.form['external_progress_url'] else: external_progress_url = post_data['data']['external_progress_url'] if external_progress_url: current_logger.info('add_bq_backfill_job :: with external_progress_url: %s' % str(external_progress_url)) bq_backfill_job_dict['external_progress_url'] = external_progress_url except: external_progress_url = None verify_ssl = settings.VERIFY_SSL try: if form_data: verify_ssl_string = request.form['verify_ssl'] else: verify_ssl_string = post_data['data']['verify_ssl'] if verify_ssl_string == 'false': verify_ssl = False current_logger.info('add_bq_backfill_job :: with verify_ssl: %s' % str(verify_ssl)) bq_backfill_job_dict['verify_ssl'] = verify_ssl except: verify_ssl = settings.VERIFY_SSL try: if form_data: added_by = request.form['added_by'] else: added_by = post_data['data']['added_by'] current_logger.info('add_bq_backfill_job :: with added_by: %s' % str(added_by)) bq_backfill_job_dict['added_by'] = added_by except Exception as err: current_logger.warning('warning :: add_bq_backfill_job :: added_by was not passed, err: %s' % ( err)) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'no valid added_by argument' return bq_backfill_job_dict use_archive_data = True try: if form_data: use_archive_data_string = request.form['use_archive_data'] else: use_archive_data_string = post_data['data']['use_archive_data'] if use_archive_data_string == 'false': use_archive_data = False current_logger.info('add_bq_backfill_job :: with use_archive_data: %s' % str(use_archive_data)) bq_backfill_job_dict['use_archive_data'] = use_archive_data except: use_archive_data = True vista_bq_accounts = {} try: vista_bq_accounts = get_bq_accounts_settings(current_skyline_app) except Exception as err: current_logger.error('error :: add_bq_backfill_job :: get_bq_accounts_settings failed, err: %s' % err) # Validate the vista_bq_account_key vista_bq_account = {} try: vista_bq_account = copy.deepcopy(vista_bq_accounts[vista_bq_account_key]) if vista_bq_account: bq_backfill_job_dict['vista_bq_account'] = vista_bq_account_key except Exception as err: current_logger.error('error :: add_bq_backfill_job :: invalid vista_bq_account_key parameter, does not exist, err: %s' % err) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'invalid vista_bq_account_key passed, does not exist' return bq_backfill_job_dict # Validate that BigQuery account is disabled in Vista if 'schedule' not in vista_bq_account.keys(): current_logger.error('error :: add_bq_backfill_job :: schedule is not defined in the bq_account settings for %s' % vista_bq_account_key) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'schedule is not defined in the bq_account settings' return bq_backfill_job_dict schedule_disabled = True for key, value in vista_bq_account['schedule'].items(): if value != 0: schedule_disabled = False if not schedule_disabled and added_by not in ['vista_bq_update','webapp']: current_logger.error('error :: add_bq_backfill_job :: schedule is not disabeld in the bq_account settings for %s' % vista_bq_account_key) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'the schedule is not disabled in the bq_account settings all values must be 0' return bq_backfill_job_dict if 'schedule_interval' not in vista_bq_account.keys(): current_logger.error('error :: add_bq_backfill_job :: schedule_interval is not defined in the bq_account settings for %s' % vista_bq_account_key) bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'schedule_interval is not defined in the bq_account settings' return bq_backfill_job_dict # Check that replace_query_string is in the query if replace_query_string not in vista_bq_account['query']: current_logger.error('error :: add_bq_backfill_job :: the replace_query_string string was not found in the account settings query') bq_backfill_job_dict['status_code'] = 400 bq_backfill_job_dict['error'] = 'the replace_query_string string was not found in the account settings query' return bq_backfill_job_dict # Add job to bq_backfill Redis queue job_id = '%s.%s' % (str(int(time())), vista_bq_account_key) status_url = '%s/api?bq_backfill_job_status=true&job_id=%s' % ( settings.SKYLINE_URL, job_id ) if settings.REMOTE_SKYLINE_INSTANCES: status_url = status_url + '&cluster_data=true' data = {} try: data = { 'job_id': job_id, 'vista_bq_account_key': vista_bq_account_key, 'from_timestamp': from_timestamp, 'until_timestamp': until_timestamp, 'interval': interval, 'date_format': date_format, 'replace_query_string': replace_query_string, 'zero_fill': zero_fill, 'dry_run': dry_run, 'replace_redis_timeseries': replace_redis_timeseries, 'max_creates_per_minute': max_creates_per_minute, 'use_archive_data': use_archive_data, 'flux_test': flux_test, 'verify_ssl': verify_ssl, 'status': 'pending', 'status_url': status_url, 'percent_complete': 0, 'intervals_to_fetch': [], 'intervals_fetched': [], 'external_progress_url': external_progress_url, 'added_by': added_by, } except Exception as err: current_logger.error('error :: add_bq_backfill_job :: failed to create data dict, err: %s' % ( err)) bq_backfill_job_dict['status_code'] = 500 bq_backfill_job_dict['error'] = 'failed to create backfill job data dict' return bq_backfill_job_dict try: redis_conn_decoded = get_redis_conn_decoded(current_skyline_app) except Exception as err: current_logger.error('error :: add_bq_backfill_job :: get_redis_conn_decoded failed, err: %s' % ( err)) bq_backfill_job_dict['status_code'] = 500 bq_backfill_job_dict['error'] = 'failed to connect to Redis' return bq_backfill_job_dict try: redis_conn_decoded.hset('vista.bq_backfill.work', job_id, str(data)) except Exception as err: current_logger.error('error :: add_bq_backfill_job :: failed to add item to vista.bq_backfill.work hash, data: %s, err: %s' % ( str(data), err)) bq_backfill_job_dict['status_code'] = 500 bq_backfill_job_dict['error'] = 'failed to add backfill job to vista.bq_backfill.work' return bq_backfill_job_dict try: redis_conn_decoded.hset('vista.bq_backfill.job_status', job_id, str(data)) except Exception as err: current_logger.error('error :: add_bq_backfill_job :: failed to add item to vista.bq_backfill.job_status hash, data: %s, err: %s' % ( str(data), err)) if len(data) > 0: bq_backfill_job_dict = copy.deepcopy(data) bq_backfill_job_dict['status_code'] = 200 current_logger.info('%s :: returning bq_backfill_job_dict: %s' % ( function_str, str(bq_backfill_job_dict))) return bq_backfill_job_dict