diff --git a/DrQueue/client.py b/DrQueue/client.py index cbb5aeaa..837ac0ce 100644 --- a/DrQueue/client.py +++ b/DrQueue/client.py @@ -15,6 +15,7 @@ import time import pickle import datetime +import logging from IPython.parallel import Client as IPClient from IPython.parallel.util import unpack_apply_message from IPython.parallel import dependent @@ -23,6 +24,9 @@ from .computer import Computer as DrQueueComputer +log = logging.getLogger(__name__) + + class Client(): """DrQueue client actions""" def __init__(self): @@ -46,7 +50,6 @@ def job_run(self, job): # check job name if job['name'] in DrQueueJob.query_jobnames(): raise ValueError("Job name %s is already used!" % job['name']) - return False # save job in database job_id = DrQueueJob.store_db(job) @@ -72,21 +75,20 @@ def job_run(self, job): # check frame numbers if not (job['startframe'] >= 1): raise ValueError("Invalid value for startframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= 1): raise ValueError("Invalid value for endframe. Has to be equal or greater than 1.") - return False + if not (job['endframe'] >= job['startframe']): raise ValueError("Invalid value for endframe. Has be to equal or greater than startframe.") - return False + if job['endframe'] > job['startframe']: if not (job['endframe'] - job['startframe'] >= job['blocksize']): raise ValueError("Invalid value for blocksize. Has to be equal or lower than endframe-startframe.") - return False + if job['endframe'] == job['startframe']: if job['blocksize'] != 1: raise ValueError("Invalid value for blocksize. Has to be equal 1 if endframe equals startframe.") - return False task_frames = list(range(job['startframe'], job['endframe'] + 1, job['blocksize'])) ar = None @@ -242,22 +244,22 @@ def identify_computer(self, engine_id, cache_time, timeout=15): now = int(time.time()) # check existence and age of info if (engine != None) and (now <= engine['created_at'] + cache_time): - print("DEBUG: Engine %i was found in DB and info is up-to-date." % engine_id) + log.debug("Engine %i was found in DB and info is up-to-date." % engine_id) return engine # store new info else: if engine != None: - print("DEBUG: Engine %i was found in DB, but info needs to be updated." % engine_id) + log.debug("Engine %i was found in DB, but info needs to be updated." % engine_id) else: - print("DEBUG: Engine %i was not found in DB." % engine_id) + log.debug("Engine %i was not found in DB." % engine_id) # run command only on specific computer try: dview = self.ip_client[engine_id] except IndexError: - print("DEBUG: Engine with id %i unknown." % engine_id) + log.debug("Engine with id %i unknown." % engine_id) # delete old entry from database DrQueueComputer.delete_from_db_by_engine_id(engine_id) - print("DEBUG: Engine with id %i deleted from database." % engine_id) + log.debug("Engine with id %i deleted from database." % engine_id) new_engine = None else: # run command in async mode @@ -269,10 +271,10 @@ def identify_computer(self, engine_id, cache_time, timeout=15): ar.get(timeout) except Exception: if engine != None: - print("DEBUG: Update request for engine %i timed out. Using old information from DB." % engine_id) + log.debug("Update request for engine %i timed out. Using old information from DB." % engine_id) new_engine = engine else: - print("DEBUG: Information request for engine %i timed out." % engine_id) + log.debug("Information request for engine %i timed out." % engine_id) new_engine = None else: # get computer dict from engine namespace @@ -298,7 +300,7 @@ def computer_set_pools(self, computer, pool_list): # update database entry computer['pools'] = pool_list DrQueueComputer.store_db(computer) - print("DEBUG: Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") + log.debug("Engine " + str(computer['engine_id']) + " added to pools " + pool_str + ".") return computer diff --git a/DrQueue/computer_pool.py b/DrQueue/computer_pool.py index 45a1c867..cac2a5a4 100644 --- a/DrQueue/computer_pool.py +++ b/DrQueue/computer_pool.py @@ -10,17 +10,39 @@ """ import os -import getpass +import logging + + +log = logging.getLogger(__name__) + + +try: + import pymongo + import bson +except ImportError as err: + log.debug("Can't import pymongo/bson: %s" % err) + pymongo = bson = None + + + +def get_queue_pools(): + if pymongo is None: + raise RuntimeError("pymongo is needed, please install it!") + + connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) + db = connection['ipythondb'] + pools = db['drqueue_pools'] + return pools class ComputerPool(dict): """Subclass of dict for collecting Pool attribute values.""" - def __init__(self, name, engine_names=[]): + + def __init__(self, name, engine_names=None): dict.__init__(self) - if type(engine_names).__name__ != 'list': + if isinstance(engine_names, list): raise ValueError("argument is not of type list") - return False # mandatory elements pool = { @@ -29,101 +51,66 @@ def __init__(self, name, engine_names=[]): } self.update(pool) - @staticmethod def store_db(pool): - import pymongo """store pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.insert(pool) pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def update_db(pool): - import pymongo """update pool information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_id = pools.save(pool) pool['_id'] = str(pool['_id']) return pool_id - @staticmethod def query_db(pool_id): - import pymongo - import bson """query pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"_id": bson.ObjectId(pool_id)}) return pool - @staticmethod def delete_from_db(pool_id): - import pymongo - import bson """delete pool information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() return pools.remove({"_id": bson.ObjectId(pool_id)}) - @staticmethod def query_poolnames(): - import pymongo """query pool names from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() names = [] for pool in pools.find(): names.append(pool['name']) return names - @staticmethod def query_pool_by_name(pool_name): - import pymongo """query pool information from MongoDB by name""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) return pool - @staticmethod def query_pool_list(): - import pymongo """query list of pools from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool_arr = [] for pool in pools.find(): pool_arr.append(pool) return pool_arr - @staticmethod def query_pool_members(pool_name): - import pymongo """query list of members of pool from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - pools = db['drqueue_pools'] + pools = get_queue_pools() pool = pools.find_one({"name": pool_name}) if pool == None: return None else: return list(pool['engine_names']) - diff --git a/DrQueue/job.py b/DrQueue/job.py index 16f1f7b2..71813eee 100644 --- a/DrQueue/job.py +++ b/DrQueue/job.py @@ -9,11 +9,39 @@ Licensed under GNU General Public License version 3. See LICENSE for details. """ -import os, datetime +import os +import datetime import getpass + +import pymongo +import bson + import DrQueue +def connect_db(): + """ + :return: MongoDB connection object + """ + host=os.getenv('DRQUEUE_MONGODB', None) + if not host: + raise RuntimeError("Error: DRQUEUE_MONGODB not set!") + + print("Connect MongoDB on %s" % host) + connection = pymongo.Connection(host) + db = connection['ipythondb'] + return db + + +def get_jobs(): + """ + :return: return 'drqueue_jobs' + """ + db = connect_db() + jobs = db['drqueue_jobs'] + return jobs + + class Job(dict): """Subclass of dict for collecting Job attribute values.""" def __init__(self, name, startframe, endframe, blocksize, renderer, scenefile, retries=1, owner=getpass.getuser(), options={}, created_with=None, limits={}): @@ -33,21 +61,18 @@ def __init__(self, name, startframe, endframe, blocksize, renderer, scenefile, r 'enabled' : True, 'limits' : {} } + if name == "": raise ValueError("No name of job given!") - return False - if not (endframe >= startframe >= 1): - raise ValueError("Startframe and endframe need to be at least 1!") - return False + if not (endframe > startframe): + raise ValueError("Endframe must be bigger than startframe!") if blocksize < 1: raise ValueError("Blocksize needs to be at least 1!") - return False if DrQueue.check_renderer_support(renderer) == False: raise ValueError("Render called \"%s\" not supported!" % renderer) - return False if scenefile == "": raise ValueError("No scenefile given!") - return False + # optional elements if 'renderdir' in options: jb['renderdir'] = options['renderdir'] @@ -104,11 +129,8 @@ def __init__(self, name, startframe, endframe, blocksize, renderer, scenefile, r @staticmethod def store_db(job): - import pymongo """store job information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() job_id = jobs.insert(job) job['_id'] = str(job['_id']) return job_id @@ -116,11 +138,8 @@ def store_db(job): @staticmethod def update_db(job): - import pymongo """update job information in MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() job_id = jobs.save(job) job['_id'] = str(job['_id']) return job_id @@ -128,12 +147,8 @@ def update_db(job): @staticmethod def query_db(job_id): - import pymongo - import bson """query job information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() try: job = jobs.find_one({"_id": bson.ObjectId(job_id)}) except bson.errors.InvalidId: @@ -144,22 +159,15 @@ def query_db(job_id): @staticmethod def delete_from_db(job_id): - import pymongo - import bson """query job information from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() return jobs.remove({"_id": bson.ObjectId(job_id)}) @staticmethod def query_jobnames(): - import pymongo """query job names from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() names = [] for job in jobs.find(): names.append(job['name']) @@ -168,21 +176,15 @@ def query_jobnames(): @staticmethod def query_job_by_name(job_name): - import pymongo """query job information from MongoDB by name""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() job = jobs.find_one({"name": job_name}) return job @staticmethod def query_job_list(): - import pymongo """query list of jobs from MongoDB""" - connection = pymongo.Connection(os.getenv('DRQUEUE_MONGODB')) - db = connection['ipythondb'] - jobs = db['drqueue_jobs'] + jobs = get_jobs() return list(jobs.find()) diff --git a/bin/drqueue b/bin/drqueue index 78b3b7f9..03aeaca6 100644 --- a/bin/drqueue +++ b/bin/drqueue @@ -1,3 +1,4 @@ +#!/usr/bin/env python # -*- coding: utf-8 -*- """ @@ -942,8 +943,6 @@ def slave_daemon(args): ipengine_logpath = os.path.join(os.environ["DRQUEUE_ROOT"], "logs", "ipengine_" + slave_ip + ".log") ipengine_logfile = open(ipengine_logpath, "ab") - dist_egg = pkg_resources.get_distribution("DrQueueIPython") - startup_script = dist_egg.get_resource_filename(__name__, "EGG-INFO/scripts/get_slave_information.py") # register signal handler for SIGINT & SIGTERM signal.signal(signal.SIGTERM, slave_sig_handler) @@ -956,7 +955,7 @@ def slave_daemon(args): while True: # start IPython engine along with startup script print("Connecting to DrQueue master at " + master_ip + ".") - command = "ipengine -s " + startup_script + command = "ipengine -s get_slave_information.py" if (args.no_ssh == True) or LOCALHOST: command += " --ssh=" @@ -969,14 +968,16 @@ def slave_daemon(args): print("IPython engine started with PID " + str(IPENGINE_PID) + ". Logging to " + ipengine_logpath + ".") # wait for process to exit - os.waitpid(IPENGINE_PID, 0) + returncode = ipengine_daemon.wait() + print("IPython was shut down with returncode: %s" % returncode) # run only once if option given if args.no_restart == True: break else: - print("IPython was shut down. Restarting ...") - time.sleep(5) + sec = 5 + print("Restarting IPython after %s sec..." % sec) + time.sleep(sec) def process_exists(process_name): diff --git a/bin/drqueue_slave.py b/bin/drqueue_slave.py index 62d9a76a..2c08fe05 100644 --- a/bin/drqueue_slave.py +++ b/bin/drqueue_slave.py @@ -39,8 +39,6 @@ IPENGINE_PID = None IPENGINE_LOGPATH = os.path.join(os.environ["DRQUEUE_ROOT"], "logs", "ipengine_" + SLAVE_IP + ".log") IPENGINE_LOGFILE = open(IPENGINE_LOGPATH, "ab") -dist_egg = pkg_resources.get_distribution("DrQueueIPython") -STARTUP_SCRIPT = dist_egg.get_resource_filename(__name__, "EGG-INFO/scripts/get_slave_information.py") def sig_handler(signum, frame): @@ -82,7 +80,7 @@ def run_command(command): message = "OSError({0}) while executing command: {1}\n".format(errno, strerror) IPENGINE_LOGFILE.write(message) raise OSError(message) - return False + return p @@ -105,16 +103,18 @@ def main(): # restart ipengine if it was shut down by IPython while True: # start IPython engine along with startup script - command = "ipengine --url tcp://" + MASTER_IP + ":10101 -s " + STARTUP_SCRIPT + command = "ipengine --url tcp://" + MASTER_IP + ":10101 -s get_slave_information.py" ipengine_daemon = run_command(command) IPENGINE_PID = ipengine_daemon.pid print("IPython engine started with PID " + str(IPENGINE_PID) + ". Logging to " + IPENGINE_LOGPATH + ".") # wait for process to exit - os.waitpid(IPENGINE_PID, 0) + returncode = ipengine_daemon.wait() + print("IPython was shut down with returncode: %s" % returncode) - print("IPython was shut down. Restarting ...") - time.sleep(5) + sec = 5 + print("Restarting IPython after %s sec..." % sec) + time.sleep(sec) if __name__== "__main__": diff --git a/setup.py b/setup.py index ef230857..8ee96c41 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,17 @@ -import os, glob, shutil, sys, pwd, grp +#!/usr/bin/env python + +import os, glob, shutil, sys from setuptools import setup from distutils.core import setup, Command +try: + import pwd, grp +except ImportError: + # e.g. under windows + pwd = grp = None + + def read(fname): """Read file contents.""" return open(os.path.join(os.path.dirname(__file__), fname)).read() @@ -93,10 +102,10 @@ def run(self): # Windows lacks Unix functionality if not sys.platform.startswith("win"): # set to user-supplied user / group - if self.owner != None: + if self.owner != None and pwd is not None: uid = pwd.getpwnam(self.owner)[2] recursive_chown(drqueue_root, uid, -1) - if self.group != None: + if self.group != None and grp is not None: gid = grp.getgrnam(self.group)[2] recursive_chown(drqueue_root, -1, gid)