Source code for webapp.wind

"""
wind.py
"""
import gzip
import json
import logging
import traceback
from time import time

from flask import request

import settings

from skyline_functions import get_redis_conn_decoded

skyline_app = 'webapp'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)


[docs] def wind(results): """ Create a job or accept the results of a job and submit them to the appropriate work Redis hash. wind expects a json payload with the required_keys and will either post to results of the job to the originating source node or will accept the results from a job and post them to the appropriate work Redis hash. If results are present, results_url must be set to None, if this is a job being submitted results_url must be set and results must be an empty dict or None/null. post_data = { 'source_host': skyline_node, 'app': 'ionosphere', 'job': 'motif_annihilation', 'redis_work_hash': 'ionosphere.find_repetitive_patterns.motif_annihilation.work', 'redis_work_hash_key': '1712230013.1507778', 'data': {'metric_id': metric_id, 'metric': metric, 'anomaly_id': anomaly_id, 'anomaly_timestamp': anomaly_timestamp }, 'results_url': 'url_to_post_results_to or None/null', 'results': results_dict_or_None/null, } :param results: a dict with status_code, etc to populate and return :type results: dict :return: results :rtype: dict """ post_data = {} content_encoding = None try: content_encoding = request.headers.get('content-encoding', '') except Exception as err: logger.error('error :: wind :: an error occurred determining content-encoding, err: %s' % ( err)) if content_encoding == 'gzip': logger.info('wind :: handling gzip payload') post_data_obj = None try: postData_obj = request.data post_data_obj = gzip.decompress(postData_obj) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: wind :: gzip.decompress(postData_obj) failed, err: %s' % err) if post_data_obj: try: post_data = json.loads(post_data_obj) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: wind :: json.loads(post_data_obj) failed, err: %s' % err) if not post_data: try: post_data = request.get_json() except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: wind :: no POST data, err: %s' % ( err)) logger.info('wind :: return 400 no POST data') results['status_code'] = 400 results['reason'] = 'no POST data' return results if not post_data: logger.error('error :: wind :: no POST data') logger.info('wind :: return 400 no POST data') results['status_code'] = 400 results['reason'] = 'no POST data' return results required_keys = [ 'source_host', 'app', 'job', 'redis_work_hash', 'redis_work_hash_key', 'data', 'results', 'results_url' ] valid_jobs = { 'motif_annihilation': { 'app': 'ionosphere', 'redis_work_hash': 'ionosphere.find_repetitive_patterns.motif_annihilation.work', }, } results_dict = {} for key in required_keys: if key not in post_data.keys(): reason = 'missing parameter in POST data' logger.error('error :: wind :: no %s in POST data' % ( key)) logger.info('wind, return 400') results['status_code'] = 400 results['reason'] = reason return results if key == 'source_host': source_host = post_data[key] if key == 'app': app = post_data[key] if key == 'job': job = post_data[key] if job not in list(valid_jobs.keys()): reason = 'invalid job passed' logger.error('error :: wind :: %s, job: %s' % ( reason, str(job))) logger.info('wind :: return 400') results['status_code'] = 400 results['reason'] = reason return results if key == 'redis_work_hash': redis_work_hash = post_data[key] if key == 'redis_work_hash_key': redis_work_hash_key = post_data[key] if key == 'data': data = post_data[key] if not isinstance(data, dict): reason = 'invalid data passed' logger.error('error :: wind :: %s, data: %s' % ( reason, str(data))) logger.info('wind :: return 400') results['status_code'] = 400 results['reason'] = reason return results if key == 'results_url': results_url = post_data[key] work_host = None if key == 'results': results_dict = post_data[key] if isinstance(results_dict, dict): if len(results_dict) > 0: results_url = None try: work_host = results_dict['work_host'] except KeyError: work_host = None if not work_host: try: work_host = post_data['work_host'] except KeyError: work_host = None if job == 'motif_annihilation': if 'pw5_timeseries' not in post_data.keys(): reason = 'invalid data passed' logger.error('error :: wind :: no pw5_timeseries passed') logger.info('wind :: return 400') results['status_code'] = 400 results['reason'] = reason return results if results_url: logger.info('wind :: accepting %s job from %s' % ( job, source_host)) if results_dict: logger.info('wind :: accepting %s job results from %s' % ( job, str(work_host))) try: redis_conn_decoded = get_redis_conn_decoded(skyline_app) except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: wind :: get_redis_conn_decoded failed, err: %s' % ( err)) logger.info('wind, return 500') results['status_code'] = 500 results['reason'] = 'Redis failed' return results added_at = time() post_data['added_at'] = added_at new_redis_work_hash_key = str(float(redis_work_hash_key) + 1) post_data['redis_work_hash_key'] = new_redis_work_hash_key if results_dict: try: redis_conn_decoded.hset(redis_work_hash, new_redis_work_hash_key, str(post_data)) results['status_code'] = 200 results['reason'] = 'OK' results['job'] = job job_result = 'results submitted for %s to %s' % (redis_work_hash_key, redis_work_hash) results['action'] = job_result logger.info('wind :: request results: %s' % str(results)) return results except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: wind :: redis_conn_decoded failed to set %s in %s, err: %s' % ( str(redis_work_hash_key), str(redis_work_hash), err)) logger.info('wind, return 500') results['status_code'] = 500 results['reason'] = 'Redis failed add results' logger.info('wind :: request results: %s' % str(results)) return results if results_url: try: redis_conn_decoded.hset(redis_work_hash, new_redis_work_hash_key, str(post_data)) results['status_code'] = 200 results['reason'] = 'OK' results['job'] = job job_result = 'submitted job for %s to %s' % (new_redis_work_hash_key, redis_work_hash) results['action'] = job_result logger.info('wind :: request results: %s' % str(results)) return results except Exception as err: logger.error(traceback.format_exc()) logger.error('error :: wind :: redis_conn_decoded failed to set %s in %s, err: %s' % ( str(new_redis_work_hash_key), str(redis_work_hash), err)) logger.info('wind, return 500') results['status_code'] = 500 results['reason'] = 'Redis failed add results' logger.info('wind :: request results: %s' % str(results)) return results logger.info('wind :: request results: %s' % str(results)) return results