#! /usr/bin/python

# Eric LaForest, November 2008
# laforest AT eecg.utoronto.ca

"""Batch job server published over Pyro.

Clients submit a shell command together with their process parameters
(login name, password, uid, gid, cwd, environment).  After the security
check, at most one job per login name is queued, and queued jobs are run
one at a time in a forked child that switches to the submitted uid/gid.
"""

import crypt
import os
import pwd
from subprocess import Popen

# Python Remote Objects
import Pyro.core
import Pyro.naming
from Pyro.errors import PyroError, NamingError

# FIXME
# I should derive uid/gid from /etc/passwd, but I added authentication late.
# Is it at all possible to have a user run a shell under a forged uid/gid?
# If so, then checking the uid/gid reported by the client shell against
# /etc/passwd is a good thing, else it's a waste.

# FIXME
# Should merge the command into the client parameters

# FIXME
# Someone hammering this server could cause a Denial-Of-Service, or at least
# load the system enough so as to pollute the measurements of the running job.


class server_security_manager:
    """Authenticates a client and validates its reported process parameters."""

    def __init__(self):
        pass

    def get_shadow_passwd(self, username):
        """Return the password-hash field of `username` from /etc/shadow.

        Returns None when no entry matches.
        """
        with open("/etc/shadow", "r") as shadow_file:
            for entry in shadow_file:
                fields = entry.split(":")
                if len(fields) > 1 and fields[0] == username:
                    return fields[1]
        return None

    def check_password(self, given_password, shadow_password):
        """Return True if `given_password` hashes to `shadow_password`.

        See http://linux.die.net/man/3/crypt for salting details.
        """
        # An unknown user (None) or an empty hash field can never be
        # authenticated; passing None to crypt.crypt() would also raise.
        if not shadow_password:
            return False
        return crypt.crypt(given_password, shadow_password) == shadow_password

    def authenticate_user(self, username, password):
        """Return True if `password` is valid for `username`."""
        shadow_password = self.get_shadow_passwd(username)
        return self.check_password(password, shadow_password)

    def verify_process_parameters(self, process_parameters):
        """Check the client-reported uid, gid and cwd against /etc/passwd.

        The uid and gid must equal the passwd entry of 'logname', and 'cwd'
        must be the user's home directory or a path below it.
        """
        username = process_parameters['logname']
        uid = process_parameters['uid']
        gid = process_parameters['gid']
        cwd = process_parameters['cwd']
        try:
            passwd_entry = pwd.getpwnam(username)
        except KeyError:
            # No such user in the password database.
            return False
        if uid != passwd_entry.pw_uid:
            return False
        if gid != passwd_entry.pw_gid:
            return False
        # Compare normalised paths on a component boundary: a plain substring
        # test would accept e.g. /tmp/home/bob for the home /home/bob, and
        # un-normalised paths could escape the home directory through "..".
        home = os.path.normpath(passwd_entry.pw_dir)
        cwd = os.path.normpath(cwd)
        if cwd == home:
            return True
        return cwd.startswith(home.rstrip(os.sep) + os.sep)

    def verify(self, process_parameters):
        """Return True only if the client both authenticates and reports
        process parameters consistent with its passwd entry."""
        username = process_parameters['logname']
        password = process_parameters['passwd']
        if not self.authenticate_user(username, password):
            return False
        return self.verify_process_parameters(process_parameters)


class batch_queue_manager:
    """FIFO queue of tasks; only the head of the queue is ever run."""

    def __init__(self):
        # Per-instance list: a class-level list would be shared by every
        # instance of this class.
        self.task_list = []

    def create_task_entry(self, parameters, command):
        """Build the queue record for a task that has not started yet."""
        entry = {
            # pid == 0 -> not running yet
            'pid': 0,
            'parameters': parameters,
            'command': command,
        }
        return entry

    def add_task(self, parameters, command):
        """Append a new, not-yet-running task to the end of the queue."""
        entry = self.create_task_entry(parameters, command)
        self.task_list.append(entry)

    def is_allowed(self, parameters):
        """Return False if this login name already has a task in the queue."""
        for entry in self.task_list:
            if entry['parameters']['logname'] == parameters['logname']:
                return False
        return True

    def no_tasks(self):
        """Return True when the queue is empty."""
        return len(self.task_list) == 0

    def delete_task(self):
        """Remove the head of the queue, if there is one."""
        if not self.no_tasks():
            del self.task_list[0]

    def set_task_running(self, pid):
        """Record the pid of the process running the head task."""
        self.task_list[0]['pid'] = pid

    def running_pid(self):
        """Return the pid recorded for the head task, or 0 if the queue is
        empty or the head task has not been started."""
        if self.no_tasks():
            return 0
        return self.task_list[0]['pid']

    def task_still_running(self):
        """Return True if the head task has been started (pid != 0)."""
        return self.running_pid() != 0

    def get_task(self):
        """Return (parameters, command) of the head task."""
        task = self.task_list[0]
        parameters = task['parameters']
        command = task['command']
        return parameters, command

    def get_queue_info(self):
        """Return a list of (logname, command) pairs, in queue order."""
        return [(task['parameters']['logname'], task['command'])
                for task in self.task_list]


class client_task_manager(Pyro.core.ObjBase):
    # These aren't really needed. Here for documentation.
    queue_client_job = None
    cleanup_child_processes = None
    run_next_job = None
    get_queue = None
    show_queue = None

    def __init__(self):
        """Since an instance of this class is published by the server,
        (and thus its methods are visible to the client) we want to expose
        only selected methods to the whole class. Everything else is private
        to the instance by being in the scope of the constructor only.
        Exposure to the class is done via object references.
        See http://en.literateprograms.org/Private_class_variables_(Python)"""
        Pyro.core.ObjBase.__init__(self)

        queue_manager = batch_queue_manager()
        security_manager = server_security_manager()

        # XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
        # BE DEAD SURE THESE VALUES ARE CHECKED, ELSE KISS YOUR SECURITY GOODBYE!!!
        # XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX
        def set_client_process_parameters(parameters):
            gid = parameters['gid']
            # Reset the supplementary groups before changing gid/uid, so the
            # job does not keep the groups of the server process.
            if hasattr(os, 'initgroups'):
                os.initgroups(parameters['logname'], gid)
            else:
                os.setgroups([gid])
            # gid must be changed first: it cannot be changed any more once
            # the uid has been given up.
            os.setgid(gid)
            os.setuid(parameters['uid'])
            os.chdir(parameters['cwd'])

        def spawn_client_task(parameters, command):
            pid = os.fork()
            if pid != 0:
                # parent
                return pid
            # child
            try:
                set_client_process_parameters(parameters)
                # leave the argument parsing to the final shell
                shell_command = [parameters['env']['SHELL'], "-c", command]
                os.execvpe(shell_command[0], shell_command, parameters['env'])
            finally:
                # Only reached when something above failed (a successful exec
                # never returns).  The child must never fall back into the
                # server loop as a second copy of the server.
                os._exit(127)

        def spawn_killer_task(client_task_pid, kill_delay=30):
            # FIXME the killer still fires after a job that ended early; by
            # then the pid may belong to some other process.
            command = "sleep %d ; /bin/kill %d > /dev/null" % (kill_delay, client_task_pid)
            Popen(command, shell=True)

        def cleanup_child_processes():
            while True:
                try:
                    (pid, exit_status) = os.waitpid(0, os.WNOHANG)
                except OSError:
                    # No child processes in existence
                    return
                if pid == 0:
                    # No child processes finished yet
                    return
                # Killer tasks are children too.  Only the exit of the running
                # job itself may remove the head of the queue, otherwise a
                # late killer would drop a job that is queued or running.
                if pid == queue_manager.running_pid():
                    queue_manager.delete_task()
        self.cleanup_child_processes = cleanup_child_processes

        def queue_client_job(parameters, command):
            """Once a job is on the queue, its data is assumed to be safe."""
            # XXX XXX XXX Security check happens here! XXX XXX XXX
            if not security_manager.verify(parameters):
                return False
            if queue_manager.is_allowed(parameters):
                queue_manager.add_task(parameters, command)
                return True
            return False
        self.queue_client_job = queue_client_job

        def run_next_job():
            if queue_manager.task_still_running() or queue_manager.no_tasks():
                return
            parameters, command = queue_manager.get_task()
            pid = spawn_client_task(parameters, command)
            spawn_killer_task(pid)
            queue_manager.set_task_running(pid)
        self.run_next_job = run_next_job

        def get_queue():
            return queue_manager.get_queue_info()
        self.get_queue = get_queue
        # NOTE(review): the class-level placeholder is called show_queue while
        # the implementation is get_queue; both names are bound so either
        # works -- confirm which one the batch client actually calls.
        self.show_queue = get_queue


def unregister_server_name(name_server, batch_server_name):
    """In case the batch server previously died horribly and didn't
    unregister from the Pyro Name Server first."""
    try:
        name_server.unregister(batch_server_name)
    except NamingError:
        # If not registered
        pass


def locate_name_server(server_address):
    """Return the Pyro Name Server found at `server_address`.

    Raises PyroError with a hint on how to start the name server when it
    cannot be located.
    See http://pyro.sourceforge.net/manual/10-errors.html"""
    locator = Pyro.naming.NameServerLocator()
    try:
        return locator.getNS(host=server_address)
    except PyroError:
        message = """Pyro Name Server not running. Please do 'pyro-ns -n %s' as root.""" % server_address
        raise PyroError(message)


def server_daemon(server_name, server_address, server=None):
    """Create a Pyro daemon and publish `server` on it as `server_name`.

    `server` is the object to publish.  It defaults to None only to keep the
    original two-argument signature importable; it must be supplied, and a
    ValueError is raised otherwise.
    See http://pyro.sourceforge.net/manual/4-usage.html"""
    if server is None:
        raise ValueError("server_daemon() needs the object to publish")
    Pyro.core.initServer(banner=0)
    ns = locate_name_server(server_address)
    daemon = Pyro.core.Daemon()
    daemon.useNameServer(ns)
    unregister_server_name(ns, server_name)
    daemon.connect(server, server_name)
    return daemon


def handle_requests(daemon, server):
    """Serve forever: reap finished children, start the next queued job,
    then handle client requests.

    See batch client to see which methods are being called."""
    while True:
        server.cleanup_child_processes()
        server.run_next_job()
        # FIXME is 3s too much/little timeout?
        # check the load created by the server
        daemon.handleRequests(3)


def main():
    server_name = 'batch_server'
    server_address = 'localhost'
    # The server object has to exist before the daemon can publish it.
    server = client_task_manager()
    daemon = server_daemon(server_name, server_address, server)
    handle_requests(daemon, server)


if __name__ == "__main__":
    main()