from __future__ import division
import logging
import uuid
import random
from time import time, sleep
from threading import Thread
from multiprocessing import Process
import os
from os import kill, getpid
import traceback
from sys import version_info
from sys import exit as sys_exit
# import os.path
import datetime
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import settings
from skyline_functions import get_redis_conn, get_redis_conn_decoded
# @added 20220429 - Feature #4536: Handle Redis failure
from functions.flux.get_last_metric_data import get_last_metric_data
if settings.MEMCACHE_ENABLED:
from functions.memcache.get_memcache_key import get_memcache_key
from functions.memcache.set_memcache_key import set_memcache_key
from functions.memcache.delete_memcache_key import delete_memcache_key
else:
get_memcache_key = None
set_memcache_key = None
delete_memcache_key = None
skyline_app = 'snab_flux_load_test'
skyline_app_logger = 'snab_flux_load_testLog'
logger = logging.getLogger(skyline_app_logger)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# Lock and wait sentinel files used by the bin/<skyline_app>.d log management
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Major version of the running Python interpreter
python_version = int(version_info[0])
this_host = str(os.uname()[1])

try:
    SERVER_METRIC_PATH = '.%s' % settings.SERVER_METRICS_NAME
    if SERVER_METRIC_PATH == '.':
        SERVER_METRIC_PATH = ''
except AttributeError:
    # SERVER_METRICS_NAME is not declared in settings.py
    SERVER_METRIC_PATH = ''

# Optional settings with defaults.  getattr with a default is equivalent to
# the try/except AttributeError pattern, but the bare excepts previously used
# here could also mask unrelated errors.
SNAB_FLUX_LOAD_TEST_ENABLED = getattr(settings, 'SNAB_FLUX_LOAD_TEST_ENABLED', True)
SNAB_FLUX_LOAD_TEST_METRICS = getattr(settings, 'SNAB_FLUX_LOAD_TEST_METRICS', 0)
SNAB_FLUX_LOAD_TEST_METRICS_PER_POST = getattr(settings, 'SNAB_FLUX_LOAD_TEST_METRICS_PER_POST', 100)
SNAB_FLUX_LOAD_TEST_NAMESPACE_PREFIX = getattr(
    settings, 'SNAB_FLUX_LOAD_TEST_NAMESPACE_PREFIX',
    'skyline.snab.%s.flux_load_test' % this_host)

FLUX_POST_URL = '%s/flux/metric_data_post' % settings.SKYLINE_URL
LOCAL_DEBUG = False

# Redis set names - .metrics holds the current load test metric population,
# .metrics.all holds every load test metric name that has been created
snab_flux_load_test_metrics_set = 'snab.flux_load_test.metrics'
snab_flux_load_test_metrics_all_set = 'snab.flux_load_test.metrics.all'

# Suppress urllib3 warnings for unverified HTTPS requests made to flux
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class SNAB_flux_load_test(Thread):
    """
    The SNAB class which controls the snab thread and spawned
    processes.
    """

    def __init__(self, parent_pid):
        """
        Initialize the SNAB_flux_load_test

        :param parent_pid: pid of the parent process, used by
            check_if_parent_is_alive to detect when the parent has exited
        """
        super(SNAB_flux_load_test, self).__init__()
        # A daemon thread does not block interpreter shutdown
        self.daemon = True
        self.parent_pid = parent_pid
        self.current_pid = getpid()
        # Two Redis connections - one returning bytes, one returning
        # decoded (str) responses
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
[docs] def check_if_parent_is_alive(self):
"""
Self explanatory
"""
try:
kill(self.current_pid, 0)
kill(self.parent_pid, 0)
except:
# @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
# Log warning
logger.warning('warning :: parent or current process dead')
sys_exit(0)
[docs] def spin_snab_flux_load_test_process(self, current_timestamp):
"""
Push metrics to flux.
:param i: python process id
:return: True
:rtype: boolean
"""
spin_start = time()
# snab_flux_load_test_metrics_set = 'snab.flux_load_test.metrics'
# snab_flux_load_test_metrics_all_set = 'snab.flux_load_test.metrics.all'
spin_snab_flux_load_test_process_pid = os.getpid()
logger.info('spin_snab_flux_load_test_process - pid %s, sending %s metrics to flux at %s metrics per POST' % (
str(spin_snab_flux_load_test_process_pid),
str(SNAB_FLUX_LOAD_TEST_METRICS),
str(SNAB_FLUX_LOAD_TEST_METRICS_PER_POST)))
if not SNAB_FLUX_LOAD_TEST_METRICS:
logger.info('nothing to do')
return
snab_flux_load_test_metrics = []
try:
snab_flux_load_test_metrics = sorted(list(self.redis_conn_decoded.smembers(snab_flux_load_test_metrics_set)))
except Exception as e:
logger.error('error :: could not query Redis for set %s - %s' % (snab_flux_load_test_metrics_set, e))
# @added 20220429 - Feature #4536: Handle Redis failure
if settings.MEMCACHE_ENABLED:
try:
snab_flux_load_test_metrics = get_memcache_key('snab_flux_load_test', snab_flux_load_test_metrics_set)
if not snab_flux_load_test_metrics:
snab_flux_load_test_metrics = []
else:
snab_flux_load_test_metrics = sorted(snab_flux_load_test_metrics)
logger.info('failed over to memcache - got data from memcache key %s' % snab_flux_load_test_metrics_set)
except Exception as err:
logger.error('error :: get_memcache_key %s failed - %s' % (
snab_flux_load_test_metrics_set, err))
snab_flux_load_test_metrics = []
logger.info('snab_flux_load_test_metrics determined %s test metrics from Redis' % (
str(len(snab_flux_load_test_metrics))))
if snab_flux_load_test_metrics:
try:
self.redis_conn.sadd(snab_flux_load_test_metrics_all_set, *set(snab_flux_load_test_metrics))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add multiple members to the %s Redis set' % snab_flux_load_test_metrics_all_set)
# @added 20220429 - Feature #4536: Handle Redis failure
if settings.MEMCACHE_ENABLED:
success = False
try:
success = set_memcache_key('snab_flux_load_test', snab_flux_load_test_metrics_all_set, snab_flux_load_test_metrics)
except Exception as err:
logger.error('error :: failed to add %s metrics to memcache %s key - %s' % (
str(len(snab_flux_load_test_metrics)), snab_flux_load_test_metrics_set, err))
if success:
logger.info('added %s metrics to memcache %s key' % (
str(len(snab_flux_load_test_metrics)), snab_flux_load_test_metrics_set))
snab_flux_load_test_metrics_all = []
try:
snab_flux_load_test_metrics_all = sorted(list(self.redis_conn_decoded.smembers(snab_flux_load_test_metrics_all_set)))
except Exception as e:
logger.error('error :: could not query Redis for set %s - %s' % (snab_flux_load_test_metrics_all_set, e))
# @added 20220429 - Feature #4536: Handle Redis failure
if settings.MEMCACHE_ENABLED:
try:
snab_flux_load_test_metrics_all = get_memcache_key('snab_flux_load_test', snab_flux_load_test_metrics_all_set)
if not snab_flux_load_test_metrics_all:
snab_flux_load_test_metrics_all = []
else:
snab_flux_load_test_metrics_all = sorted(snab_flux_load_test_metrics_all)
logger.info('failed over to memcache - got data from memcache key %s' % snab_flux_load_test_metrics_all_set)
except Exception as err:
logger.error('error :: get_memcache_key %s failed - %s' % (
snab_flux_load_test_metrics_all_set, err))
snab_flux_load_test_metrics_all = []
logger.info('snab_flux_load_test_metrics_all determined %s known test metrics from Redis' % (
str(len(snab_flux_load_test_metrics_all))))
check_for_removals = True
if len(snab_flux_load_test_metrics) != len(snab_flux_load_test_metrics_all):
check_for_removals = False
if len(snab_flux_load_test_metrics) > SNAB_FLUX_LOAD_TEST_METRICS:
check_for_removals = True
if check_for_removals:
logger.info('checking what snab test metrics need to be removed')
remove_from_snab_set = False
if len(snab_flux_load_test_metrics) > SNAB_FLUX_LOAD_TEST_METRICS:
remove_from_snab_set = True
metrics_to_remove = []
if remove_from_snab_set:
metrics_to_remove = snab_flux_load_test_metrics_all[SNAB_FLUX_LOAD_TEST_METRICS:]
snab_flux_load_test_metrics = snab_flux_load_test_metrics[0:SNAB_FLUX_LOAD_TEST_METRICS]
if remove_from_snab_set:
if metrics_to_remove:
logger.info('removing %s metrics from %s Redis set' % (
str(len(metrics_to_remove)), snab_flux_load_test_metrics_set))
try:
self.redis_conn.srem(snab_flux_load_test_metrics_set, *set(metrics_to_remove))
except:
logger.info(traceback.format_exc())
logger.error('error :: failed to remove multiple members from %s Redis set' % snab_flux_load_test_metrics_set)
logger.info('getting list of metrics.unique_metrics from Redis set')
full_uniques = '%sunique_metrics' % settings.FULL_NAMESPACE
try:
unique_metrics = list(self.redis_conn_decoded.smembers(full_uniques))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to generate a list from %s Redis set' % full_uniques)
unique_metrics = []
logger.info('checking if any snab test metrics need to be removed from metrics.unique_metrics')
for metric in unique_metrics:
if SNAB_FLUX_LOAD_TEST_NAMESPACE_PREFIX in metric:
if metric.startswith(settings.FULL_NAMESPACE):
base_name = metric.replace(settings.FULL_NAMESPACE, '', 1)
else:
base_name = metric
if base_name not in snab_flux_load_test_metrics:
try:
self.redis_conn.srem(full_uniques, str(metric))
except:
pass
del unique_metrics
del metrics_to_remove
flux_last_key_prefix = 'flux.last.%s.*' % SNAB_FLUX_LOAD_TEST_NAMESPACE_PREFIX
logger.info('generating list of all possible %s Redis keys' % flux_last_key_prefix)
snab_remove_flux_last_keys = []
for base_name in snab_flux_load_test_metrics_all:
if base_name not in snab_flux_load_test_metrics:
try:
flux_last_key = 'flux.last.%s' % base_name
snab_remove_flux_last_keys.append(flux_last_key)
except:
pass
del snab_flux_load_test_metrics_all
logger.info('getting list of flux.last Redis keys')
try:
flux_last_keys = list(self.redis_conn_decoded.scan_iter(match=flux_last_key_prefix))
except Exception as err:
logger.error('error :: scan_iter failed on flux.last Redis keys - %s' % err)
flux_last_keys = []
logger.info('there are potentially %s flux.last keys that need to be removed from Redis for not in use snab test metrics' % str(len(snab_remove_flux_last_keys)))
logger.info('checking if any of the %s flux.last keys need to be removed from Redis' % str(len(flux_last_keys)))
snab_flux_last_keys_to_remove = []
for flux_last_key in snab_remove_flux_last_keys:
if flux_last_key in flux_last_keys:
snab_flux_last_keys_to_remove.append(flux_last_key)
del flux_last_keys
del snab_remove_flux_last_keys
if snab_flux_last_keys_to_remove:
for flux_last_key in snab_flux_last_keys_to_remove:
try:
self.redis_conn.delete(flux_last_key)
except:
continue
logger.info('deleted %s flux.last keys for not in use snab test metrics' % str(len(snab_flux_last_keys_to_remove)))
else:
logger.info('there are no flux.last keys for not in use snab test metrics to delete')
del snab_flux_last_keys_to_remove
adding_metrics = 0
if len(snab_flux_load_test_metrics) < SNAB_FLUX_LOAD_TEST_METRICS:
adding_metrics = SNAB_FLUX_LOAD_TEST_METRICS - len(snab_flux_load_test_metrics)
logger.info('adding %s metrics to snab_flux_load_test_metrics and %s Redis set' % (
str(adding_metrics), snab_flux_load_test_metrics_set))
if len(snab_flux_load_test_metrics) < SNAB_FLUX_LOAD_TEST_METRICS:
snab_flux_load_test_metrics_all = []
known_snab_metrics_to_added = 0
try:
snab_flux_load_test_metrics_all = list(self.redis_conn_decoded.smembers(snab_flux_load_test_metrics_all_set))
except Exception as e:
logger.error('error :: could not query Redis for set %s - %s' % (snab_flux_load_test_metrics_all_set, e))
snab_flux_load_test_metrics_all = []
# @added 20220429 - Feature #4536: Handle Redis failure
if settings.MEMCACHE_ENABLED:
try:
snab_flux_load_test_metrics_all = get_memcache_key('snab_flux_load_test', snab_flux_load_test_metrics_all_set)
if not snab_flux_load_test_metrics_all:
snab_flux_load_test_metrics_all = []
else:
snab_flux_load_test_metrics_all = sorted(snab_flux_load_test_metrics_all)
logger.info('failed over to memcache - got data from memcache key %s' % snab_flux_load_test_metrics_all_set)
except Exception as err:
logger.error('error :: get_memcache_key %s failed - %s' % (
snab_flux_load_test_metrics_all_set, err))
snab_flux_load_test_metrics_all = []
logger.info('snab_flux_load_test_metrics_all determined %s test metrics from Redis' % (
str(len(snab_flux_load_test_metrics_all))))
if snab_flux_load_test_metrics_all:
unique_snab_flux_load_test_metrics_set = set(snab_flux_load_test_metrics)
unique_snab_flux_load_test_metrics_all_set = set(snab_flux_load_test_metrics_all)
known_snab_metrics_to_add = []
set_difference = unique_snab_flux_load_test_metrics_all_set.difference(unique_snab_flux_load_test_metrics_set)
for metric in set_difference:
known_snab_metrics_to_add.append(metric)
if known_snab_metrics_to_add:
known_snab_metrics_to_add = list(set(known_snab_metrics_to_add))
for metric in known_snab_metrics_to_add:
if len(snab_flux_load_test_metrics) < SNAB_FLUX_LOAD_TEST_METRICS:
snab_flux_load_test_metrics.append(metric)
known_snab_metrics_to_added += 1
logger.info('%s known_snab_metrics added snab_flux_load_test_metrics' % (
str(known_snab_metrics_to_added)))
del snab_flux_load_test_metrics_all
while len(snab_flux_load_test_metrics) < SNAB_FLUX_LOAD_TEST_METRICS:
new_uuid = str(uuid.uuid4())
new_metric_uuid = new_uuid.replace('-', '.')
slot = str(round(random.random(), 2)) # nosec
new_metric = '%s.%s.%s' % (SNAB_FLUX_LOAD_TEST_NAMESPACE_PREFIX, slot, new_metric_uuid)
snab_flux_load_test_metrics.append(new_metric)
# Add to the snab_flux_load_test_metrics_set Redis set
try:
self.redis_conn.sadd(snab_flux_load_test_metrics_set, new_metric)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add item to Redis set %s' % (
snab_flux_load_test_metrics_set))
if adding_metrics:
logger.info('snab_flux_load_test_metrics now has %s metrics' % (
str(len(snab_flux_load_test_metrics))))
logger.info('snab_flux_load_test_metrics has %s metrics' % (
str(len(snab_flux_load_test_metrics))))
logger.info('snab_flux_load_test_metrics has %s unique metrics' % (
str(len(set(snab_flux_load_test_metrics)))))
try:
self.redis_conn.sadd(snab_flux_load_test_metrics_set, *set(snab_flux_load_test_metrics))
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to add multiple members to the %s Redis set' % snab_flux_load_test_metrics_set)
# @added 20220429 - Feature #4536: Handle Redis failure
if settings.MEMCACHE_ENABLED:
success = False
try:
success = set_memcache_key('snab_flux_load_test', snab_flux_load_test_metrics_set, snab_flux_load_test_metrics)
except Exception as err:
logger.error('error :: failed to add %s metrics to memcache %s key - %s' % (
str(len(snab_flux_load_test_metrics)), snab_flux_load_test_metrics_set, err))
if success:
logger.info('added %s metrics to memcache %s key' % (
str(len(snab_flux_load_test_metrics)), snab_flux_load_test_metrics_set))
epoch_Y = datetime.datetime.today().year
epoch_m = datetime.datetime.today().month
epoch_d = datetime.datetime.today().day
epoch_H = datetime.datetime.today().hour
epoch_M = datetime.datetime.today().minute
epoch_S = 0
initial_datetime = datetime.datetime(epoch_Y, epoch_m, epoch_d, epoch_H, epoch_M, epoch_S)
one_minute = datetime.timedelta(minutes=1)
epoch_datetime = initial_datetime - one_minute
# epoch_timestamp = int(epoch_datetime.strftime('%s'))
now_epoch_timestamp = int(initial_datetime.strftime('%s'))
epoch_timestamp = int(now_epoch_timestamp // 60 * 60)
connect_timeout = 5
read_timeout = 5
use_timeout = (int(connect_timeout), int(read_timeout))
if settings.WEBAPP_AUTH_ENABLED:
user = str(settings.WEBAPP_AUTH_USER)
password = str(settings.WEBAPP_AUTH_USER_PASSWORD)
post_count = 0
posted_count = 0
for metric in snab_flux_load_test_metrics:
if not post_count:
post_data_dict = {
'key': settings.FLUX_SELF_API_KEY,
'metrics': []
}
if post_count < SNAB_FLUX_LOAD_TEST_METRICS_PER_POST:
post_data_dict['metrics'].append({'metric': metric, 'timestamp': str(epoch_timestamp), 'value': str(round(random.random(), 2))}) # nosec
post_count += 1
if post_count == SNAB_FLUX_LOAD_TEST_METRICS_PER_POST:
response = None
try:
response = requests.post(FLUX_POST_URL, auth=(user, password), json=post_data_dict, timeout=use_timeout, verify=settings.VERIFY_SSL)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to post %s metrics, sleeping for 1 second' % (
str(post_count)))
response = None
sleep(1)
if response:
logger.info('posted %s metrics to flux with status code %s returned' % (str(post_count), str(response.status_code)))
posted_count += post_count
post_count = 0
running_for = int(time()) - current_timestamp
if running_for > 55:
logger.info('load test has run for longer than 55 seconds, stopping')
post_count = 0
break
if post_count:
response = None
try:
response = requests.post(FLUX_POST_URL, auth=(user, password), json=post_data_dict, timeout=use_timeout, verify=settings.VERIFY_SSL)
except:
logger.error(traceback.format_exc())
logger.error('error :: failed to post %s metrics' % (
str(post_count)))
response = None
if response:
posted_count += post_count
spin_end = time() - spin_start
logger.info('spin_snab_flux_load_test_process posted %s metrics to flux in %.2f seconds' % (str(posted_count), spin_end))
return
    def run(self):
        """
        Called when the process initializes.

        - Manage the log file handover with bin/<skyline_app>.d on start up.
        - Check the Redis connections are up, reconnecting (with a 10 second
          back off) if not.
        - Loop forever, spawning one spin_snab_flux_load_test_process child
          process per ~60 second cycle and terminating it if it runs for
          longer than 58 seconds.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os.remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)
        now = time()
        log_wait_for = now + 5
        # Wait up to 5 seconds for the daemon to release the log lock file
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                # Lock released - exit the wait loop on the next test
                now = log_wait_for + 1
        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os.remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)
        logger.info('starting SNAB_flux_load_test')
        while 1:
            now = time()
            # Make sure Redis is up
            try:
                self.redis_conn.ping()
                logger.info('pinged Redis via get_redis_conn')
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: cannot connect to redis at socket path %s' % settings.REDIS_SOCKET_PATH)
                sleep(10)
                try:
                    self.redis_conn = get_redis_conn(skyline_app)
                    logger.info('connected via get_redis_conn')
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: not connected via get_redis_conn')
                # @added 20220429 - Feature #4536: Handle Redis failure
                # Without memcache there is no fallback datastore, so keep
                # retrying the Redis connection before running a load test
                if not settings.MEMCACHE_ENABLED:
                    continue
            try:
                self.redis_conn_decoded.ping()
                logger.info('pinged Redis via get_redis_conn_decoded')
            except:
                logger.error(traceback.format_exc())
                logger.error('error :: not connected via get_redis_conn_decoded')
                sleep(10)
                try:
                    self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
                    logger.info('connected via get_redis_conn_decoded')
                except:
                    logger.error(traceback.format_exc())
                    logger.error('error :: cannot connect to get_redis_conn_decoded')
                # @added 20220429 - Feature #4536: Handle Redis failure
                if not settings.MEMCACHE_ENABLED:
                    continue
            # Run load test
            # NOTE(review): this inner loop has no break, so the Redis ping
            # checks above only run once at start up - confirm intended
            while True:
                current_timestamp = int(time())
                logger.info('snab_flux_load_test - running load test')
                # Spawn processes
                pids = []
                spawned_pids = []
                pid_count = 0
                p = Process(target=self.spin_snab_flux_load_test_process, args=(current_timestamp,))
                pids.append(p)
                pid_count += 1
                logger.info('starting 1 of %s spin_snab_process' % (str(pid_count)))
                p.start()
                spawned_pids.append(p.pid)
                # Send wait signal to zombie processes
                # for p in pids:
                #     p.join()
                # Self monitor processes and terminate if any spin_snab_process
                # that has run for longer than 58 seconds
                p_starts = time()
                while time() - p_starts <= 58:
                    if any(p.is_alive() for p in pids):
                        # Just to avoid hogging the CPU
                        sleep(.1)
                    else:
                        # All the processes are done, break now.
                        time_to_run = time() - p_starts
                        logger.info('1 spin_snab_flux_load_test_process completed in %.2f seconds' % (time_to_run))
                        break
                else:
                    # We only enter this if we didn't 'break' above.
                    logger.info('timed out, killing spin_snab_flux_load_test_process process')
                    for p in pids:
                        p.terminate()
                        # p.join()
                # Reap any child that survived terminate()
                for p in pids:
                    if p.is_alive():
                        logger.info('stopping spin_snab_flux_load_test_process - %s' % (str(p.is_alive())))
                        p.join()
                # Sleep out the remainder of the 60 second cycle
                process_runtime = time() - current_timestamp
                if process_runtime < 60:
                    sleep_for = (60 - process_runtime)
                    logger.info('sleeping for %.2f seconds' % sleep_for)
                    sleep(sleep_for)
                    try:
                        del sleep_for
                    except:
                        pass