import logging
import time
import sys
from os import getpid
from os.path import dirname, abspath, isdir, join
from multiprocessing import Queue
from daemon import runner
from logging.handlers import TimedRotatingFileHandler, MemoryHandler
import os.path
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir))
sys.path.insert(0, os.path.dirname(__file__))
import settings
from validate_settings import validate_settings_variables
from listen import Listen
from roomba import Roomba
from worker import Worker
# Name of this Skyline application; used below to derive the logger name and
# the log file path, and by Horizon to derive the pid file path.
skyline_app = 'horizon'
# Logger name is the app name with a "Log" suffix, i.e. 'horizonLog'.
skyline_app_logger = '%sLog' % skyline_app
# Previous hard-coded form, kept for reference (note the different
# capitalisation of the logger name):
# logger = logging.getLogger("HorizonLog")
# TODO: http://stackoverflow.com/questions/6728236/exception-thrown-in-multiprocessing-pool-not-detected
# Module-level logger shared by Horizon.run() and the module-level run().
logger = logging.getLogger(skyline_app_logger)
# Path of the application log file: <LOG_PATH>/horizon.log
skyline_app_logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
# Companion '.lock' and '.wait' paths derived from the log file path.
# NOTE(review): neither name is referenced in the code visible here -
# presumably consumed elsewhere; confirm before removing.
skyline_app_loglock = '%s.lock' % skyline_app_logfile
skyline_app_logwait = '%s.wait' % skyline_app_logfile
# Same value as skyline_app_logfile; this is the name the module-level run()
# passes to the rotating file handler.
logfile = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
class Horizon():
    """
    The Horizon agent application object.

    An instance is either handed to `runner.DaemonRunner` (daemon mode) or
    has its :meth:`run` method called directly (foreground mode) by the
    module-level `run()` function.
    """

    def __init__(self):
        """
        Set the stdio and pid file attributes for the agent.

        NOTE(review): these attribute names are the ones python-daemon's
        `DaemonRunner` is documented to read from the app object it is
        given - confirm against the installed python-daemon version.
        """
        self.stdin_path = '/dev/null'
        # stdout and stderr are both pointed at the application log file.
        self.stdout_path = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
        self.stderr_path = '%s/%s.log' % (settings.LOG_PATH, skyline_app)
        self.pidfile_path = '%s/%s.pid' % (settings.PID_PATH, skyline_app)
        self.pidfile_timeout = 5

    def run(self):
        """
        Start all Horizon components and then idle forever.

        - Create the listen queue bounded by `settings.MAX_QUEUE_SIZE`.
        - Determine if horizon should populate the mini redis store for
          Oculus (skipped when `settings.OCULUS_HOST` is empty or not
          defined).
        - Start `settings.WORKER_PROCESSES` workers, with the first worker
          started with `canary=True`.
        - Start the pickle and UDP listen processes.
        - Start roomba.

        This method does not return; after starting everything it sleeps in
        an endless loop.
        """
        logger.info('agent starting skyline %s', skyline_app)
        listen_queue = Queue(maxsize=settings.MAX_QUEUE_SIZE)
        pid = getpid()

        # If we're not using oculus, don't bother writing to mini.  A
        # missing OCULUS_HOST setting is treated the same as an empty one.
        skip_mini = getattr(settings, 'OCULUS_HOST', '') == ''

        # Start the workers; only the first one is the canary worker.
        for i in range(settings.WORKER_PROCESSES):
            if i == 0:
                logger.info('%s :: starting Worker - canary', skyline_app)
                Worker(listen_queue, pid, skip_mini, canary=True).start()
            else:
                logger.info('%s :: starting Worker', skyline_app)
                Worker(listen_queue, pid, skip_mini).start()

        # Start the listeners, both feeding the same queue.
        logger.info('%s :: starting Listen - pickle', skyline_app)
        Listen(settings.PICKLE_PORT, listen_queue, pid, type="pickle").start()
        logger.info('%s :: starting Listen - udp', skyline_app)
        Listen(settings.UDP_PORT, listen_queue, pid, type="udp").start()

        # Start the roomba
        logger.info('%s :: starting Roomba', skyline_app)
        Roomba(pid, skip_mini).start()

        # Warn the Mac users: probe qsize() once purely to find out whether
        # the platform implements it; the returned value is not used.
        try:
            listen_queue.qsize()
        except NotImplementedError:
            logger.info('WARNING: Queue().qsize() not implemented on Unix platforms like Mac OS X. Queue size logging will be unavailable.')

        # Nothing else to do here; sleep forever so this process stays up.
        while True:
            time.sleep(100)
def run():
    """
    Start the Horizon agent and logger.

    Steps, in order:

    - Exit with status 1 if `settings.PID_PATH` or `settings.LOG_PATH` is
      not an existing directory.
    - Configure the module logger with a midnight-rotating file handler
      (5 backups kept) wrapped in a memory handler.
    - Exit with status 1 if `validate_settings_variables` reports invalid
      settings.
    - If the first command-line argument is ``run``, run Horizon in the
      foreground; otherwise hand it to `runner.DaemonRunner`.
    """
    if not isdir(settings.PID_PATH):
        print('pid directory does not exist at %s' % settings.PID_PATH)
        sys.exit(1)

    if not isdir(settings.LOG_PATH):
        print('log directory does not exist at %s' % settings.LOG_PATH)
        sys.exit(1)

    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        "%(asctime)s :: %(process)s :: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S")

    # Use the handler classes imported at the top of the file rather than
    # reaching through the `logging.handlers` attribute.
    handler = TimedRotatingFileHandler(
        logfile,
        when="midnight",
        interval=1,
        backupCount=5)
    handler.setFormatter(formatter)

    # NOTE(review): with flushLevel=DEBUG and the logger at DEBUG, every
    # record meets the flush level, so the 100-record buffer should never
    # actually fill - confirm this is intended.
    memory_handler = MemoryHandler(
        100,
        flushLevel=logging.DEBUG,
        target=handler)
    logger.addHandler(memory_handler)

    # Validate settings variables
    valid_settings = validate_settings_variables(skyline_app)
    if not valid_settings:
        print('error :: invalid variables in settings.py - cannot start')
        sys.exit(1)

    horizon = Horizon()

    if len(sys.argv) > 1 and sys.argv[1] == 'run':
        # Foreground mode: run in the current process.
        horizon.run()
    else:
        daemon_runner = runner.DaemonRunner(horizon)
        # Ask the daemon context to preserve the log file's stream so the
        # handler configured above can keep writing to it.
        daemon_runner.daemon_context.files_preserve = [handler.stream]
        daemon_runner.do_action()
# Start the agent only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()