import logging
import sys
import traceback
from os import getpid, uname
from os.path import isdir
from daemon import runner
from time import sleep
from logging.handlers import TimedRotatingFileHandler, MemoryHandler
# @added 20240626 - Feature #5352: vista - bigquery
# Feature #5372: vista - bq_update
from contextlib import nullcontext
import os.path
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
if True:
try:
import settings
from fetcher import Fetcher
from worker import Worker
from validate_settings import validate_settings_variables
# @added 20220328 - Feature #4018: thunder - skyline.errors
from functions.redis.RedisErrorLogHandler import RedisErrorLogHandler
except:
print(traceback.format_exc())
print('failed to import Skyline modules')
sys.exit(1)
# @added 20240626 - Feature #5352: vista - bigquery
# Feature #5372: vista - bq_update
# Only enable vista bq_fetcher on the node or cluster if BQ is enabled and this
# is the worker node
VISTA_BQ_WORKER = False
VISTA_BQ_VIRTUALENV_PATH = None
try:
VISTA_BQ_VIRTUALENV_PATH = settings.VISTA_BQ_VIRTUALENV_PATH
except:
VISTA_BQ_VIRTUALENV_PATH = None
if VISTA_BQ_VIRTUALENV_PATH:
VISTA_BQ_WORKER = True
try:
HORIZON_SHARDS = settings.HORIZON_SHARDS
except:
HORIZON_SHARDS = {}
this_host = str(uname()[1])
HORIZON_SHARD = 0
if HORIZON_SHARDS:
HORIZON_SHARD = HORIZON_SHARDS[this_host]
if HORIZON_SHARDS and VISTA_BQ_VIRTUALENV_PATH:
if HORIZON_SHARD != 0:
VISTA_BQ_WORKER = False
if VISTA_BQ_WORKER:
from bq_fetcher import Bq_Fetcher
else:
Bq_Fetcher = nullcontext()
skyline_app = 'vista'
skyline_app_logger = '%sLog' % skyline_app
logger = logging.getLogger(skyline_app_logger)
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
[docs]
class Vista():
"""
Initializes Vista
"""
def __init__(self):
self.stdin_path = '/dev/null'
self.stdout_path = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
self.stderr_path = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
self.pidfile_path = '%s/%s.pid' % (settings.PID_PATH, skyline_app)
self.pidfile_timeout = 5
[docs]
def run(self):
logger.info('%s :: agent starting' % skyline_app)
pid = getpid()
# Start the workers
for i in range(settings.VISTA_WORKER_PROCESSES):
if i == 0:
logger.info('%s :: agent :: starting Worker 1' % skyline_app)
Worker(pid).start()
else:
logger.info('%s :: agent :: starting Worker %s' % (skyline_app, str(i)))
Worker(pid).start()
# Start the fetcher
logger.info('%s :: agent :: starting Fetcher' % skyline_app)
Fetcher(pid).start()
# @added 20240626 - Feature #5352: vista - bigquery
# Feature #5372: vista - bq_update
if VISTA_BQ_WORKER:
logger.info('%s :: agent :: starting Bq_Fetcher' % skyline_app)
Bq_Fetcher(pid).start()
while 1:
sleep(100)
[docs]
def run():
"""
Start the Vista.
Start the logger.
"""
if not isdir(settings.PID_PATH):
print('pid directory does not exist at %s' % settings.PID_PATH)
sys.exit(1)
if not isdir(settings.LOG_PATH):
print('log directory does not exist at %s' % settings.LOG_PATH)
sys.exit(1)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s :: %(process)s :: %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
handler = logging.handlers.TimedRotatingFileHandler(
logfile,
when="midnight",
interval=1,
backupCount=5)
memory_handler = logging.handlers.MemoryHandler(256,
flushLevel=logging.DEBUG,
target=handler)
handler.setFormatter(formatter)
logger.addHandler(memory_handler)
# @added 20220328 - Feature #4018: thunder - skyline.errors
# For every error logged set a count in the app Redis key which is consumed
# by thunder and creates the sskyline.<hostname>.<skyline_app>.logged_errors
# metric
redis_error_log_handler = RedisErrorLogHandler(skyline_app)
redis_error_log_handler.setLevel(logging.ERROR)
redis_error_log_handler.setFormatter(formatter)
logger.addHandler(redis_error_log_handler)
# Validate settings variables
valid_settings = validate_settings_variables(skyline_app)
if not valid_settings:
print('error :: agent :: invalid variables in settings.py - cannot start')
sys.exit(1)
vista = Vista()
logger.info('agent :: starting vista')
memory_handler.flush
if len(sys.argv) > 1 and sys.argv[1] == 'run':
vista.run()
else:
daemon_runner = runner.DaemonRunner(vista)
daemon_runner.daemon_context.files_preserve = [handler.stream]
daemon_runner.do_action()
logger.info('stopping vista')
memory_handler.flush
if __name__ == '__main__':
run()