from __future__ import division
# @modified 20191115 - Branch #3262: py3
# from os import kill, getpid
from os import kill
from redis import StrictRedis, WatchError
from multiprocessing import Process
from threading import Thread
from msgpack import Unpacker, packb
try:
from types import TupleType
except ImportError:
eliminated_in_python3 = True
from time import time, sleep
from math import ceil
# import traceback
import logging
import sys
import os.path
from os import remove as os_remove
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
# @modified 20191115 - Branch #3262: py3
# This prevents flake8 E402 - module level import not at top of file
if True:
import settings
# @added 20191030 - Bug #3266: py3 Redis binary objects not strings
# Branch #3262: py3
# Added a single functions to deal with Redis connection and the
# charset='utf-8', decode_responses=True arguments required in py3
from skyline_functions import get_redis_conn, get_redis_conn_decoded
# Application identity and logging setup. Roomba runs as a child app of
# horizon and writes to the horizon log.
parent_skyline_app = 'horizon'
child_skyline_app = 'roomba'
skyline_app_logger = '%sLog' % parent_skyline_app
logger = logging.getLogger(skyline_app_logger)
skyline_app = '%s.%s' % (parent_skyline_app, child_skyline_app)
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, parent_skyline_app)
# Sentinel files used by bin/<skyline_app>.d to coordinate log management
# with the running daemon (see Roomba.run).
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile

# Major Python version, used to select py2/py3 code paths below
python_version = int(sys.version_info[0])

# @added 20200727 - Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
#                   Feature #3480: batch_processing
#                   Feature #3486: analyzer_batch
# Optional batch processing settings - each falls back to a safe default
# when the variable is not declared in settings.py.
# @modified - narrowed the bare excepts to except Exception so that
# SystemExit/KeyboardInterrupt are not silently swallowed while the
# defaults are applied.
try:
    from settings import ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
except Exception:
    ROOMBA_DO_NOT_PROCESS_BATCH_METRICS = False
try:
    from settings import BATCH_PROCESSING
except Exception:
    BATCH_PROCESSING = None
try:
    BATCH_PROCESSING_NAMESPACES = list(settings.BATCH_PROCESSING_NAMESPACES)
except Exception:
    BATCH_PROCESSING_NAMESPACES = []
try:
    from settings import BATCH_PROCESSING_DEBUG
except Exception:
    BATCH_PROCESSING_DEBUG = None
class Roomba(Thread):
    """
    The Roomba is responsible for deleting keys older than DURATION.

    It runs as a thread inside the horizon daemon and periodically spawns
    ``vacuum`` worker processes, one set per ROOMBA_PROCESSES, for the
    full namespace and (optionally) the mini namespace.
    """
    def __init__(self, parent_pid, skip_mini):
        """
        Initialise the Roomba thread.

        :param parent_pid: pid of the parent (horizon) process, used by
            check_if_parent_is_alive to exit if the parent dies
        :param skip_mini: if truthy, do not vacuum the MINI_NAMESPACE
        """
        super(Roomba, self).__init__()
        # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
        # @modified 20191030 - Bug #3266: py3 Redis binary objects not strings
        #                      Branch #3262: py3
        # if settings.REDIS_PASSWORD:
        #     self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
        # else:
        #     self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
        # @added 20191030 - Bug #3266: py3 Redis binary objects not strings
        #                   Branch #3262: py3
        # Added a single functions to deal with Redis connection and the
        # charset='utf-8', decode_responses=True arguments required in py3
        # redis_conn returns raw bytes (needed for the msgpack payloads),
        # redis_conn_decoded returns utf-8 strings (needed for set members)
        self.redis_conn = get_redis_conn(skyline_app)
        self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
        self.daemon = True
        self.parent_pid = parent_pid
        self.skip_mini = skip_mini

    def check_if_parent_is_alive(self):
        """
        Self explanatory.

        Sends signal 0 to the parent pid (a liveness probe - no signal is
        actually delivered) and exits this process if the parent is gone.
        """
        try:
            kill(self.parent_pid, 0)
        except:
            # @added 20201203 - Bug #3856: Handle boring sparsely populated metrics in derivative_metrics
            # Log warning
            logger.warning('warning :: parent process is dead')
            sys.exit(0)

    def vacuum(self, i, namespace, duration):
        """
        Trim metrics that are older than settings.FULL_DURATION and purge old
        metrics.

        Runs in a child Process. Operates only on this worker's slice of
        the namespace's unique_metrics set (slice ``i`` of
        settings.ROOMBA_PROCESSES).

        :param i: 1-based worker index used to partition the key space
        :param namespace: Redis key prefix, e.g. settings.FULL_NAMESPACE
        :param duration: retention window in seconds; datapoints older
            than now - duration are trimmed
        """
        begin = time()
        logger.info('%s :: started vacuum' % (skyline_app))

        # Discover assigned metrics
        namespace_unique_metrics = '%sunique_metrics' % str(namespace)
        # @modified 20191030 - Bug #3266: py3 Redis binary objects not strings
        #                      Branch #3262: py3
        # unique_metrics = list(self.redis_conn.smembers(namespace_unique_metrics))
        unique_metrics = list(self.redis_conn_decoded.smembers(namespace_unique_metrics))

        # @added 20200727 - Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
        #                   Feature #3480: batch_processing
        #                   Feature #3486: analyzer_batch
        # Batch processed metrics arrive with historic timestamps, so
        # trimming them here would discard data analyzer_batch still needs;
        # exclude them from this vacuum run when so configured.
        if ROOMBA_DO_NOT_PROCESS_BATCH_METRICS and BATCH_PROCESSING and BATCH_PROCESSING_NAMESPACES:
            try:
                # @modified 20211127 - Feature #4328: BATCH_METRICS_CUSTOM_FULL_DURATIONS
                # Ensure that known and new batch_processing_metrics are
                # accounted for
                # batch_metrics = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
                batch_metrics1 = list(self.redis_conn_decoded.smembers('aet.analyzer.batch_processing_metrics'))
                batch_metrics2 = list(self.redis_conn_decoded.smembers('analyzer.batch_processing_metrics'))
                all_batch_metrics = batch_metrics1 + batch_metrics2
                batch_metrics = list(set(all_batch_metrics))
            except:
                logger.error('error - failed to get Redis set aet.analyzer.batch_processing_metrics')
                batch_metrics = []
            if batch_metrics:
                # Batch metric set members are base names; prefix them so
                # they can be matched against the namespaced unique_metrics
                full_namespace_batch_metrics = []
                for base_name in batch_metrics:
                    metric = ''.join((settings.FULL_NAMESPACE, base_name))
                    full_namespace_batch_metrics.append(metric)
                del batch_metrics
                non_batch_unique_metrics = []
                for metric in unique_metrics:
                    if metric not in full_namespace_batch_metrics:
                        non_batch_unique_metrics.append(metric)
                # @modified 20200815 - Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
                # del after log
                # UnboundLocalError: local variable 'full_namespace_batch_metrics' referenced before assignment
                # del full_namespace_batch_metrics
                if non_batch_unique_metrics:
                    logger.info('horizon.roomba :: batch_processing :: removing %s batch metrics from unique_metrics' % str(len(full_namespace_batch_metrics)))
                    unique_metrics = non_batch_unique_metrics
                    del non_batch_unique_metrics
                # @added 20200815 - Feature #3650: ROOMBA_DO_NOT_PROCESS_BATCH_METRICS
                del full_namespace_batch_metrics

        # Partition the key space across ROOMBA_PROCESSES workers; the last
        # worker (i == ROOMBA_PROCESSES) takes any remainder.
        keys_per_processor = int(ceil(float(len(unique_metrics)) / float(settings.ROOMBA_PROCESSES)))
        if i == settings.ROOMBA_PROCESSES:
            assigned_max = len(unique_metrics)
        else:
            assigned_max = min(len(unique_metrics), i * keys_per_processor)
        assigned_min = (i - 1) * keys_per_processor
        assigned_keys = range(assigned_min, assigned_max)

        # Compile assigned metrics
        assigned_metrics = [unique_metrics[index] for index in assigned_keys]

        euthanized = 0
        blocked = 0
        trimmed_keys = 0
        active_keys = 0

        # @modified 20191016 - Task #3280: Handle py2 xange and py3 range
        #                      Branch #3262: py3
        # for i in xrange(len(assigned_metrics)):
        # NOTE(review): this loop variable i shadows the worker-index
        # parameter i (the parameter is no longer needed past this point,
        # but a different name would be clearer).
        # NOTE(review): range_list is snapshotted before the loop, so the
        # WatchError handler below appending to assigned_metrics no longer
        # causes a retry of blocked keys as the original
        # xrange(len(assigned_metrics)) loop appears to have intended -
        # TODO confirm whether blocked keys should be retried.
        range_list = []
        if python_version == 2:
            for i in xrange(len(assigned_metrics)):
                range_list.append(i)
        if python_version == 3:
            for i in range(len(assigned_metrics)):
                range_list.append(i)
        for i in range_list:
            self.check_if_parent_is_alive()
            pipe = self.redis_conn.pipeline()
            now = time()
            key = assigned_metrics[i]
            try:
                # WATCH the key
                pipe.watch(key)
                # Everything below NEEDS to happen before another datapoint
                # comes in. If your data has a very small resolution (<.1s),
                # this technique may not suit you.
                raw_series = pipe.get(key)
                # The value is a stream of concatenated msgpack-encoded
                # (timestamp, value) objects; feed and iterate to decode
                unpacker = Unpacker(use_list=False)
                unpacker.feed(raw_series)
                timeseries = sorted([unpacked for unpacked in unpacker])

                # Put pipe back in multi mode
                pipe.multi()

                # There's one value. Purge if it's too old
                try:
                    if python_version == 2:
                        if not isinstance(timeseries[0], TupleType):
                            if timeseries[0] < now - duration:
                                pipe.delete(key)
                                pipe.srem(namespace_unique_metrics, key)
                                pipe.execute()
                                euthanized += 1
                            continue
                    if python_version == 3:
                        if not isinstance(timeseries[0], tuple):
                            if timeseries[0] < now - duration:
                                pipe.delete(key)
                                pipe.srem(namespace_unique_metrics, key)
                                pipe.execute()
                                euthanized += 1
                            continue
                except IndexError:
                    # Empty timeseries - nothing to do for this key
                    continue

                # Check if the last value is too old and purge
                if timeseries[-1][0] < now - duration:
                    pipe.delete(key)
                    pipe.srem(namespace_unique_metrics, key)
                    pipe.execute()
                    euthanized += 1
                    continue

                # Remove old datapoints and duplicates from timeseries
                # (temp/temp_add is the set-seen dedup-by-timestamp idiom;
                # temp_add() returns None so "not temp_add(...)" is always
                # True and only serves to record the timestamp)
                # NOTE(review): "tuple" shadows the builtin here.
                temp = set()
                temp_add = temp.add
                delta = now - duration
                trimmed = [
                    tuple for tuple in timeseries
                    if tuple[0] > delta and
                    tuple[0] not in temp and not
                    temp_add(tuple[0])
                ]

                # Purge if everything was deleted, set key otherwise
                if len(trimmed) > 0:
                    # Serialize and turn key back into not-an-array:
                    # strip the msgpack array header (1 byte for fixarray
                    # <=15 items, 3 for array16, 5 for array32) so the
                    # stored value stays a plain concatenation of packed
                    # tuples that new datapoints can be appended to -
                    # presumably matching how horizon writes them; the
                    # Unpacker feed above reads exactly that stream form.
                    btrimmed = packb(trimmed)
                    if len(trimmed) <= 15:
                        value = btrimmed[1:]
                    elif len(trimmed) <= 65535:
                        value = btrimmed[3:]
                        trimmed_keys += 1
                    else:
                        value = btrimmed[5:]
                        trimmed_keys += 1
                    pipe.set(key, value)
                    active_keys += 1
                else:
                    pipe.delete(key)
                    pipe.srem(namespace_unique_metrics, key)
                    euthanized += 1
                pipe.execute()
            except WatchError:
                # Another writer touched the key between WATCH and EXEC
                blocked += 1
                assigned_metrics.append(key)
            except Exception as e:
                # If something bad happens, zap the key and hope it goes away
                pipe.delete(key)
                pipe.srem(namespace_unique_metrics, key)
                pipe.execute()
                euthanized += 1
                logger.info(e)
                logger.info('%s :: vacuum Euthanizing %s' % (skyline_app, key))
            finally:
                pipe.reset()

        logger.info(
            '%s :: vacuum operated on %s %d keys in %f seconds' %
            (skyline_app, namespace, len(assigned_metrics), time() - begin))
        logger.info('%s :: vaccum %s keyspace is now %d keys' % (skyline_app, namespace, (len(assigned_metrics) - euthanized)))
        logger.info('%s :: vaccum blocked %d times' % (skyline_app, blocked))
        logger.info('%s :: vacuum euthanized %d geriatric keys' % (skyline_app, euthanized))
        logger.info('%s :: vacuum processed %d active keys' % (skyline_app, active_keys))
        logger.info('%s :: vacuum potentially trimmed %d keys' % (skyline_app, trimmed_keys))

        # sleeping in the main process is more CPU efficient than sleeping
        # in the vacuum def
        # if (time() - begin < 30):
        #     logger.info(skyline_app + ' :: sleeping due to low run time...')
        #     sleep(10)

    def run(self):
        """
        Called when process initializes.

        Waits for bin/<skyline_app>.d log management to settle, then loops
        forever: verify Redis is reachable, spawn vacuum worker processes,
        wait up to ROOMBA_TIMEOUT for them, kill stragglers, and sleep to
        pad each cycle out to roughly ROOMBA_OPTIMUM_RUN_DURATION seconds.
        """
        # Log management to prevent overwriting
        # Allow the bin/<skyline_app>.d to manage the log
        if os.path.isfile(skyline_app_logwait):
            try:
                os_remove(skyline_app_logwait)
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_logwait)

        # Give the init script up to 5 seconds to release the log lock
        now = time()
        log_wait_for = now + 5
        while now < log_wait_for:
            if os.path.isfile(skyline_app_loglock):
                sleep(.1)
                now = time()
            else:
                now = log_wait_for + 1

        logger.info('starting %s run' % skyline_app)
        if os.path.isfile(skyline_app_loglock):
            logger.error('error - bin/%s.d log management seems to have failed, continuing' % skyline_app)
            try:
                os_remove(skyline_app_loglock)
                logger.info('log lock file removed')
            except OSError:
                logger.error('error - failed to remove %s, continuing' % skyline_app_loglock)
        else:
            logger.info('bin/%s.d log management done' % skyline_app)
        logger.info('%s :: started roomba' % skyline_app)

        while 1:
            now = time()

            # Make sure Redis is up
            try:
                self.redis_conn.ping()
            except:
                logger.error(
                    '%s :: roomba can\'t connect to redis at socket path %s' %
                    (skyline_app, settings.REDIS_SOCKET_PATH))
                sleep(10)
                # @modified 20180519 - Feature #2378: Add redis auth to Skyline and rebrow
                # @modified 20191115 - Bug #3266: py3 Redis binary objects not strings
                #                      Branch #3262: py3
                # NOTE(review): these StrictRedis connections are immediately
                # overwritten by the get_redis_conn* calls below - presumably
                # superseded code kept during the py3 migration.
                if settings.REDIS_PASSWORD:
                    self.redis_conn = StrictRedis(password=settings.REDIS_PASSWORD, unix_socket_path=settings.REDIS_SOCKET_PATH)
                else:
                    self.redis_conn = StrictRedis(unix_socket_path=settings.REDIS_SOCKET_PATH)
                # @added 20191115 - Bug #3266: py3 Redis binary objects not strings
                #                   Branch #3262: py3
                self.redis_conn = get_redis_conn(skyline_app)
                self.redis_conn_decoded = get_redis_conn_decoded(skyline_app)
                continue

            # Spawn processes
            pids = []
            for i in range(1, settings.ROOMBA_PROCESSES + 1):
                if not self.skip_mini:
                    logger.info('%s :: starting vacuum process on mini namespace' % skyline_app)
                    p = Process(target=self.vacuum, args=(i, settings.MINI_NAMESPACE, settings.MINI_DURATION + settings.ROOMBA_GRACE_TIME))
                    pids.append(p)
                    p.start()
                logger.info('%s :: starting vacuum process' % skyline_app)
                p = Process(target=self.vacuum, args=(i, settings.FULL_NAMESPACE, settings.FULL_DURATION + settings.ROOMBA_GRACE_TIME))
                pids.append(p)
                p.start()

            # Send wait signal to zombie processes
            # for p in pids:
            #     p.join()
            # deroomba - kill any lingering vacuum processes
            # Changed to manage Roomba processes as edge cases related to I/O
            # wait have been experienced that resulted in Roomba stalling so a
            # ROOMBA_TIMEOUT setting was added and here we use the pattern
            # described by http://stackoverflow.com/users/2073595/dano at
            # http://stackoverflow.com/a/26064238 to monitor and kill any
            # stalled processes rather than using p.join(TIMEOUT) - 20160505
            # @earthgecko ref 1342
            logger.info('%s :: allowing vacuum process/es %s seconds to run' % (
                skyline_app, str(settings.ROOMBA_TIMEOUT)))
            start = time()
            # while/else: the else only runs if the while condition expires
            # without a break, i.e. the timeout was hit
            while time() - start <= settings.ROOMBA_TIMEOUT:
                if any(p.is_alive() for p in pids):
                    # Just to avoid hogging the CPU
                    sleep(.1)
                else:
                    # All the processes are done, break now.
                    time_to_run = time() - start
                    logger.info('%s :: vacuum processes completed in %.2f' % (skyline_app, time_to_run))
                    break
            else:
                # We only enter this if we didn't 'break' above.
                logger.info('%s :: timed out, killing all Roomba processes' % (skyline_app))
                for p in pids:
                    p.terminate()
                    p.join()

            # sleeping in the main process is more CPU efficient than sleeping
            # in the vacuum def also roomba is quite CPU intensive so we only
            # what to run roomba once every minute
            process_runtime = time() - now
            # @added 20210513 - Feature #4066: ROOMBA_OPTIMUM_RUN_DURATION
            try:
                ROOMBA_OPTIMUM_RUN_DURATION = int(settings.ROOMBA_OPTIMUM_RUN_DURATION)
            except Exception as e:
                logger.warning('%s :: roomba failed to determine ROOMBA_OPTIMUM_RUN_DURATION from settings, defaulting to 60 - %s' % (
                    skyline_app, e))
                ROOMBA_OPTIMUM_RUN_DURATION = 60
            # @modified 20210513 - Feature #4066: ROOMBA_OPTIMUM_RUN_DURATION
            # roomba_optimum_run_duration = 60
            roomba_optimum_run_duration = ROOMBA_OPTIMUM_RUN_DURATION
            if process_runtime < roomba_optimum_run_duration:
                sleep_for = (roomba_optimum_run_duration - process_runtime)
                logger.info('%s :: sleeping %.2f for due to low run time' % (skyline_app, sleep_for))
                sleep(sleep_for)