Source code for webapp.api_bq_query

"""
api_bq_query.py
"""
import copy
import logging
import os
import sys
import traceback

from contextlib import nullcontext
from datetime import datetime
from time import time
from flask import request

# For version pinning consider implementing a skyline-bq-py31013 environment
# and adding that to the path so that imports come from there rather than from
# the skyline-py31013 environment which has different versions of some libraries
# e.g. googleapis-common-protos
import settings
from functions.settings.get_bq_accounts_settings import get_bq_accounts_settings

try:
    VISTA_BQ_ACCOUNTS = copy.deepcopy(settings.VISTA_BQ_ACCOUNTS)
except:
    VISTA_BQ_ACCOUNTS = {}
try:
    VISTA_BQ_VIRTUALENV_PATH = settings.VISTA_BQ_VIRTUALENV_PATH
except:
    VISTA_BQ_VIRTUALENV_PATH = None
if VISTA_BQ_VIRTUALENV_PATH:
    sys.path.insert(0, VISTA_BQ_VIRTUALENV_PATH)
    from functions.bigquery.bq_df_to_metrics import bq_df_to_metrics
    from functions.bigquery.bq_credentials import get_bq_credentials
    import pandas_gbq
else:
    bq_df_to_metrics = nullcontext()
    get_bq_credentials = nullcontext()
    get_bq_credentials = nullcontext()
    pandas_gbq = nullcontext()


# @added 20240725 - Feature #5372: vista - bq_update
#                   Feature #5352: vista - bigquery
[docs] def api_bq_query(current_skyline_app): """ Return a dict with the backfill job details. :param current_skyline_app: the app calling the function :type current_skyline_app: str :return: bq_query_results :rtype: dict """ bq_query_results = {'status_code': 500, 'archive_csv': None, 'archive_csv_url': None} start = time() current_skyline_app_logger = current_skyline_app + 'Log' current_logger = logging.getLogger(current_skyline_app_logger) vista_bq_account_key = None # Test whether form or json POST form_data = False try: vista_bq_account_key = request.form['vista_bq_account_key'] if vista_bq_account_key: form_data = True current_logger.info('api_bq_query :: form POST') current_logger.info('api_bq_query - vista_bq_account_key passed: %s' % str(vista_bq_account_key)) except: current_logger.info('api_bq_query no form data trying json') post_data = {} if not form_data: try: post_data = request.get_json() except Exception as err: current_logger.error('error :: api_bq_query - no POST data - %s' % ( err)) current_logger.info('api_bq_query :: return 400 no POST data') bq_query_results['status_code'] = 400 bq_query_results['error'] = 'no post data' return bq_query_results if post_data: current_logger.info('api_bq_query :: post_data: %s' % str(post_data)) try: vista_bq_account_key = post_data['data']['vista_bq_account_key'] if vista_bq_account_key: current_logger.info('api_bq_query - vista_bq_account_key passed: %s' % str(vista_bq_account_key)) except KeyError: vista_bq_account_key = None except Exception as err: vista_bq_account_key = None current_logger.error('error :: api_bq_query - evaluation of post_data[\'data\'][\'vista_bq_account_key\'] failed - %s' % ( err)) if not vista_bq_account_key: current_logger.info('api_bq_query :: return 400 no vista_bq_account_key determined') bq_query_results['status_code'] = 400 bq_query_results['error'] = 'no vista_bq_account_key argument' return bq_query_results sql = None try: if form_data: sql = str(request.form['sql']) else: sql = str(post_data['data']['sql']) current_logger.info('api_bq_query :: with sql: %s' % sql) except Exception as err: current_logger.error('api_bq_query :: failed to determine sql, err: %s' % err) current_logger.info('api_bq_query :: return 400 no sql passed') bq_query_results['status_code'] = 400 bq_query_results['error'] = 'no sql argument' return bq_query_results vista_bq_accounts = {} try: vista_bq_accounts = get_bq_accounts_settings(current_skyline_app) except Exception as err: current_logger.error('error :: api_bq_query :: get_bq_accounts_settings failed, err: %s' % err) current_logger.info('api_bq_query :: returning 500') bq_query_results['status_code'] = 500 bq_query_results['error'] = 'get_bq_accounts_settings failed' return bq_query_results bq_account_settings = {} try: bq_account_settings = vista_bq_accounts[vista_bq_account_key] except Exception as err: current_logger.error('error :: api_bq_query :: failed to determine settings for %s, err: %s' % ( vista_bq_account_key, err)) if not bq_account_settings: err_msg = 'failed to determine settings for %s' % vista_bq_account_key current_logger.error('error :: api_bq_query :: %s' % err_msg) current_logger.info('api_bq_query :: returning 500') bq_query_results['status_code'] = 500 bq_query_results['error'] = err_msg return bq_query_results # Credentials can be passed in a json file or as a dict (as from # external_settings) credentials = None use_credentials = None oauth_credentials = {} service_credentials = {} oauth_credentials_file = None service_credentials_file = None project_id = None if 'project_id' in bq_account_settings.keys(): project_id = bq_account_settings['project_id'] if 'oauth_credentials' in bq_account_settings.keys(): oauth_credentials = bq_account_settings['oauth_credentials'] if 'oauth_credentials_file' in bq_account_settings.keys(): oauth_credentials_file = bq_account_settings['oauth_credentials_file'] if 'service_credentials' in bq_account_settings.keys(): service_credentials = bq_account_settings['service_credentials'] if 'service_credentials_file' in bq_account_settings.keys(): service_credentials_file = bq_account_settings['service_credentials_file'] try: credentials = get_bq_credentials( current_skyline_app, credentials=use_credentials, project_id=project_id, oauth_credentials=oauth_credentials, oauth_credentials_file=oauth_credentials_file, service_credentials=service_credentials, service_credentials_file=service_credentials_file) except Exception as err: current_logger.error('error :: api_bq_query :: get_bq_credentials failed, err: %s' % ( err)) if not credentials: current_logger.error('error :: api_bq_query :: no credentials retrieved for %s' % vista_bq_account_key) err_msg = 'failed to get credentials for %s' % vista_bq_account_key current_logger.error('error :: api_bq_query :: %s' % err_msg) current_logger.info('api_bq_query :: returning 500') bq_query_results['status_code'] = 500 bq_query_results['error'] = err_msg return bq_query_results else: use_credentials = credentials configuration = {'jobTimeoutMs': 120000, 'timeoutMs': 120000} df = None try: df = pandas_gbq.read_gbq(sql, project_id=bq_account_settings['project_id'], credentials=credentials, progress_bar_type=None, configuration=configuration) except Exception as err: err_msg = 'pandas_gbq.read_gbq failed with sql: %s' % sql current_logger.error(traceback.format_exc()) current_logger.error('error :: api_bq_query :: %s, err: %s' % ( err_msg, err)) current_logger.info('api_bq_query :: returning 500') bq_query_results['status_code'] = 500 bq_query_results['error'] = err_msg return bq_query_results if df is None: err_msg = 'pandas_gbq.read_gbq returned no data df is None' current_logger.error('error :: api_bq_query :: %s' % ( err_msg)) current_logger.info('api_bq_query :: returning 500') bq_query_results['status_code'] = 500 bq_query_results['error'] = err_msg return bq_query_results current_logger.info('api_bq_query :: fetched %s rows' % str(len(df))) vista_bq_archive_path = '%s/vista/bq_archives' % settings.SKYLINE_DIR new_archive_csv = '%s/%s.%s.csv' % (vista_bq_archive_path, str(int(start)), bq_account_settings['project_id']) try: df.to_csv(new_archive_csv, index=False) current_logger.info('api_bq_query :: created %s' % new_archive_csv) except Exception as err: err_msg = 'df.to_csv failed to create archive_csv: %s' % new_archive_csv current_logger.error('error :: bq_update :: %s, err: %s' % ( err_msg, err)) if os.path.isfile(new_archive_csv): archive_csv_url = '%s/vista_bq_archive_file?file=%s' % (settings.SKYLINE_URL, new_archive_csv) bq_query_results = {'status_code': 200, 'archive_csv': new_archive_csv, 'archive_csv_url': archive_csv_url} current_logger.info('api_bq_query :: returning bq_query_results: %s' % str(bq_query_results)) return bq_query_results