Index: src/supervisor/options.py =================================================================== --- src/supervisor/options.py (revision 881) +++ src/supervisor/options.py (working copy) @@ -681,8 +681,7 @@ path = normalize_path(path) return UnixStreamSocketConfig(path) - tcp_re = re.compile(r'^tcp://([^\s:]+):(\d+)$') - m = tcp_re.match(sock) + m = re.match(r'tcp://([^\s:]+):(\d+)$', sock) if m: host = m.group(1) port = int(m.group(2)) @@ -1557,6 +1556,7 @@ return dispatchers, p class FastCGIProcessConfig(ProcessConfig): + def make_process(self, group=None): if group is None: raise NotImplementedError('FastCGI programs require a group') @@ -1634,9 +1634,18 @@ self.process_configs = process_configs self.socket_manager = socket_manager + def __eq__(self, other): + if not isinstance(other, FastCGIGroupConfig): + return False + + if self.socket_manager.config() != other.socket_manager.config(): + return False + + return ProcessGroupConfig.__eq__(self, other) + def after_setuid(self): ProcessGroupConfig.after_setuid(self) - self.socket_manager.prepare_socket() + self.socket_manager.logger = self.options.logger def readFile(filename, offset, length): """ Read length bytes from the file named by filename starting at Index: src/supervisor/datatypes.py =================================================================== --- src/supervisor/datatypes.py (revision 881) +++ src/supervisor/datatypes.py (working copy) @@ -184,7 +184,22 @@ return '<%s at %s for %s>' % (self.__class__, id(self), self.url) + + def __str__(self): + return str(self.url) + + def __eq__(self, other): + if not isinstance(other, SocketConfig): + return False + if self.url != other.url: + return False + + return True + + def __ne__(self, other): + return not self.__eq__(other) + def addr(self): raise NotImplementedError Index: src/supervisor/tests/test_options.py =================================================================== --- src/supervisor/tests/test_options.py (revision 881) +++ src/supervisor/tests/test_options.py (working copy) @@ -1300,7 +1300,7 @@ def test_ctor(self): options = DummyOptions() - sock_manager = DummySocketManager(6) + sock_manager = DummySocketManager(6, 'conf') instance = self._makeOne(options, 'whatever', 999, [], sock_manager) self.assertEqual(instance.options, options) self.assertEqual(instance.name, 'whatever') @@ -1310,13 +1310,35 @@ def test_after_setuid(self): options = DummyOptions() - sock_manager = DummySocketManager(6) + sock_manager = DummySocketManager(6, 'conf') pconfigs = [DummyPConfig(options, 'process1', '/bin/process1')] instance = self._makeOne(options, 'whatever', 999, pconfigs, sock_manager) instance.after_setuid() self.assertTrue(pconfigs[0].autochildlogs_created) - self.assertTrue(instance.socket_manager.prepare_socket_called) + self.assertTrue(sock_manager.logger is not None) + def test_same_sockets_are_equal(self): + options = DummyOptions() + sock_manager1 = DummySocketManager(6, 'conf') + instance1 = self._makeOne(options, 'whatever', 999, [], sock_manager1) + + sock_manager2 = DummySocketManager(7, 'conf') + instance2 = self._makeOne(options, 'whatever', 999, [], sock_manager2) + + self.assertTrue(instance1 == instance2) + self.assertFalse(instance1 != instance2) + + def test_diff_sockets_are_not_equal(self): + options = DummyOptions() + sock_manager1 = DummySocketManager(6, 'conf') + instance1 = self._makeOne(options, 'whatever', 999, [], sock_manager1) + + sock_manager2 = DummySocketManager(6, 'different conf') + instance2 = self._makeOne(options, 'whatever', 999, [], sock_manager2) + + self.assertTrue(instance1 != instance2) + self.assertFalse(instance1 == instance2) + class UtilFunctionsTests(unittest.TestCase): def test_make_namespec(self): from supervisor.options import make_namespec Index: src/supervisor/tests/test_datatypes.py =================================================================== --- src/supervisor/tests/test_datatypes.py (revision 881) +++ src/supervisor/tests/test_datatypes.py (working copy) @@ -183,6 +183,25 @@ self.assertTrue(reuse) sock.close + def test_same_urls_are_equal(self): + conf1 = self._makeOne('localhost', '5001') + conf2 = self._makeOne('localhost', '5001') + self.assertTrue(conf1 == conf2) + self.assertFalse(conf1 != conf2) + + def test_diff_urls_are_not_equal(self): + conf1 = self._makeOne('localhost', '5001') + conf2 = self._makeOne('localhost', '5002') + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) + + def test_diff_objs_are_not_equal(self): + conf1 = self._makeOne('localhost', '5001') + conf2 = 'blah' + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) + + class UnixStreamSocketConfigTests(unittest.TestCase): def _getTargetClass(self): return datatypes.UnixStreamSocketConfig @@ -213,6 +232,24 @@ sock = conf.create() self.assertFalse(os.path.exists(tf_name)) sock.close + + def test_same_paths_are_equal(self): + conf1 = self._makeOne('/tmp/foo.sock') + conf2 = self._makeOne('/tmp/foo.sock') + self.assertTrue(conf1 == conf2) + self.assertFalse(conf1 != conf2) + + def test_diff_paths_are_not_equal(self): + conf1 = self._makeOne('/tmp/foo.sock') + conf2 = self._makeOne('/tmp/bar.sock') + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) + + def test_diff_objs_are_not_equal(self): + conf1 = self._makeOne('/tmp/foo.sock') + conf2 = 'blah' + self.assertTrue(conf1 != conf2) + self.assertFalse(conf1 == conf2) def test_suite(): return unittest.findTestCases(sys.modules[__name__]) Index: src/supervisor/tests/base.py =================================================================== --- src/supervisor/tests/base.py (revision 881) +++ src/supervisor/tests/base.py (working copy) @@ -1016,13 +1016,13 @@ return DummySocket(self.fd) class DummySocketManager: - def __init__(self, sock_fd): + def __init__(self, sock_fd, config): self.sock_fd = sock_fd - self.prepare_socket_called = False - - def prepare_socket(self): - self.prepare_socket_called = True - + self._config = config + + def config(self): + return self._config + def get_socket(self): return DummySocket(self.sock_fd) Index: src/supervisor/tests/test_process.py =================================================================== --- src/supervisor/tests/test_process.py (revision 881) +++ src/supervisor/tests/test_process.py (working copy) @@ -1185,7 +1185,7 @@ options.forkpid = 0 config = DummyPConfig(options, 'good', '/good/filename', uid=1) instance = self._makeOne(config) - sock_manager = DummySocketManager(7) + sock_manager = DummySocketManager(7, 'conf') gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, sock_manager) instance.group = DummyProcessGroup(gconfig) @@ -1203,7 +1203,7 @@ config = DummyPConfig(options, 'good', '/good/filename', uid=1) config.redirect_stderr = True instance = self._makeOne(config) - sock_manager = DummySocketManager(13) + sock_manager = DummySocketManager(13, 'conf') gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, sock_manager) instance.group = DummyProcessGroup(gconfig) @@ -1212,6 +1212,26 @@ self.assertEqual(len(options.duped), 2) self.assertEqual(options.duped[13], 0) self.assertEqual(len(options.fds_closed), options.minfds - 3) + + def test_before_spawn_gets_socket_ref(self): + options = DummyOptions() + config = DummyPConfig(options, 'good', '/good/filename', uid=1) + instance = self._makeOne(config) + sock_manager = DummySocketManager(7, 'conf') + gconfig = DummyFCGIGroupConfig(options, 'whatever', 999, None, + sock_manager) + instance.group = DummyProcessGroup(gconfig) + self.assertTrue(instance.fcgi_sock is None) + instance.before_spawn() + self.assertFalse(instance.fcgi_sock is None) + + def test_after_finish_removes_socket_ref(self): + options = DummyOptions() + config = DummyPConfig(options, 'good', '/good/filename', uid=1) + instance = self._makeOne(config) + instance.fcgi_sock = 'hello' + instance.after_finish() + self.assertTrue(instance.fcgi_sock is None) class ProcessGroupBaseTests(unittest.TestCase): def _getTargetClass(self): Index: src/supervisor/tests/test_socket_manager.py =================================================================== --- src/supervisor/tests/test_socket_manager.py (revision 881) +++ src/supervisor/tests/test_socket_manager.py (working copy) @@ -10,6 +10,80 @@ from supervisor.datatypes import UnixStreamSocketConfig from supervisor.datatypes import InetStreamSocketConfig +class TestObject(): + + def __init__(self): + self.value = 5 + + def getValue(self): + return self.value + + def setValue(self, val): + self.value = val + +class ProxyTest(unittest.TestCase): + + def setUp(self): + self.onDeleteCalled = False + + def _getTargetClass(self): + from supervisor.socket_manager import Proxy + return Proxy + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def setOnDeleteCalled(self): + self.onDeleteCalled = True + + def test_proxy_getattr(self): + proxy = self._makeOne(TestObject()) + self.assertEquals(5, proxy.getValue()) + + def test_onDelete(self): + proxy = self._makeOne(TestObject(), onDelete=self.setOnDeleteCalled) + self.assertEquals(5, proxy.getValue()) + proxy = None + self.assertTrue(self.onDeleteCalled) + +class ReferenceCounterTest(unittest.TestCase): + + def setUp(self): + self.running = False + + def start(self): + self.running = True + + def stop(self): + self.running = False + + def _getTargetClass(self): + from supervisor.socket_manager import ReferenceCounter + return ReferenceCounter + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_incr_and_decr(self): + ctr = self._makeOne(onZero=self.stop,onNonZero=self.start) + self.assertFalse(self.running) + ctr.increment() + self.assertTrue(self.running) + self.assertEquals(1, ctr.refCount) + ctr.increment() + self.assertTrue(self.running) + self.assertEquals(2, ctr.refCount) + ctr.decrement() + self.assertTrue(self.running) + self.assertEquals(1, ctr.refCount) + ctr.decrement() + self.assertFalse(self.running) + self.assertEquals(0, ctr.refCount) + + def test_decr_at_zero_raises_error(self): + ctr = self._makeOne(onZero=self.stop,onNonZero=self.start) + self.assertRaises(Exception, ctr.decrement) + class SocketManagerTest(unittest.TestCase): def _getTargetClass(self): from supervisor.socket_manager import SocketManager @@ -29,7 +103,6 @@ self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345)) - sock_manager.close() def test_tcp_w_ip(self): conf = InetStreamSocketConfig('127.0.0.1', 12345) @@ -37,7 +110,6 @@ self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), ('127.0.0.1', 12345)) - sock_manager.close() def test_unix(self): (tf_fd, tf_name) = tempfile.mkstemp(); @@ -46,53 +118,57 @@ self.assertEqual(sock_manager.socket_config, conf) sock = sock_manager.get_socket() self.assertEqual(sock.getsockname(), tf_name) - sock_manager.close() + sock = None os.close(tf_fd) def test_get_socket(self): conf = DummySocketConfig(2) sock_manager = self._makeOne(conf) + self.assertFalse(sock_manager.is_prepared()) sock = sock_manager.get_socket() + sock_id = id(sock._get()) + self.assertTrue(sock_manager.is_prepared()) sock2 = sock_manager.get_socket() - self.assertEqual(sock, sock2) - sock_manager.close() + sock2_id = id(sock2._get()) + self.assertNotEqual(sock, sock2) + self.assertEqual(sock_id, sock2_id) + sock = None + self.assertTrue(sock_manager.is_prepared()) + self.assertFalse(sock_manager.socket.close_called) + sock2 = None + self.assertFalse(sock_manager.is_prepared()) + self.assertTrue(sock_manager.socket.close_called) + sock3 = sock_manager.get_socket() - self.assertNotEqual(sock, sock3) + sock3_id = id(sock3._get()) + self.assertTrue(sock_manager.is_prepared()) + del sock3 + self.assertFalse(sock_manager.is_prepared()) + self.assertNotEqual(sock_id, sock3_id) def test_prepare_socket(self): conf = DummySocketConfig(1) sock_manager = self._makeOne(conf) sock = sock_manager.get_socket() - self.assertTrue(sock_manager.prepared) + self.assertTrue(sock_manager.is_prepared()) self.assertTrue(sock.bind_called) self.assertEqual(sock.bind_addr, 'dummy addr') self.assertTrue(sock.listen_called) self.assertEqual(sock.listen_backlog, socket.SOMAXCONN) self.assertFalse(sock.close_called) - - def test_close(self): - conf = DummySocketConfig(6) - sock_manager = self._makeOne(conf) - sock = sock_manager.get_socket() - self.assertFalse(sock.close_called) - self.assertTrue(sock_manager.prepared) - sock_manager.close() - self.assertFalse(sock_manager.prepared) - self.assertTrue(sock.close_called) def test_tcp_socket_already_taken(self): conf = InetStreamSocketConfig('127.0.0.1', 12345) sock_manager = self._makeOne(conf) - sock_manager.get_socket() + sock = sock_manager.get_socket() sock_manager2 = self._makeOne(conf) - self.assertRaises(socket.error, sock_manager2.prepare_socket) - sock_manager.close() + self.assertRaises(socket.error, sock_manager2.get_socket) + sock = None def test_unix_bad_sock(self): conf = UnixStreamSocketConfig('/notthere/foo.sock') sock_manager = self._makeOne(conf) self.assertRaises(socket.error, sock_manager.get_socket) - sock_manager.close() def test_suite(): return unittest.findTestCases(sys.modules[__name__]) Index: src/supervisor/process.py =================================================================== --- src/supervisor/process.py (revision 881) +++ src/supervisor/process.py (working copy) @@ -233,7 +233,7 @@ self._assertInState(ProcessStates.STARTING) self.change_state(ProcessStates.BACKOFF) return - + try: pid = options.fork() except OSError, why: @@ -557,7 +557,14 @@ class FastCGISubprocess(Subprocess): """Extends Subprocess class to handle FastCGI subprocesses""" - def _prepare_child_fds(self): + def __init__(self, config): + Subprocess.__init__(self, config) + self.fcgi_sock = None + + def before_spawn(self): + """ + The FastCGI socket needs to be created by the parent before we fork + """ if self.group is None: raise NotImplementedError('No group set for FastCGISubprocess') if not hasattr(self.group, 'config'): @@ -566,9 +573,36 @@ if not hasattr(self.group.config, 'socket_manager'): raise NotImplementedError('No SocketManager set for ' 'FastCGISubprocess group') - sock = self.group.config.socket_manager.get_socket() - sock_fd = sock.fileno() + self.fcgi_sock = self.group.config.socket_manager.get_socket() + + def spawn(self): + """ + Overrides Subprocess.spawn() so we can hook in before it happens + """ + self.before_spawn() + Subprocess.spawn(self) + def after_finish(self): + """ + Releases reference to FastCGI socket when process is reaped + """ + #Remove object reference to decrement the reference count + self.fcgi_sock = None + + def finish(self, pid, sts): + """ + Overrides Subprocess.finish() so we can hook in after it happens + """ + Subprocess.finish(self, pid, sts) + self.after_finish() + + def _prepare_child_fds(self): + """ + Overrides Subprocess._prepare_child_fds() + The FastCGI socket needs to be set to file descriptor 0 in the child + """ + sock_fd = self.fcgi_sock.fileno() + options = self.config.options options.dup2(sock_fd, 0) options.dup2(self.pipes['child_stdout'], 1) @@ -578,7 +612,7 @@ options.dup2(self.pipes['child_stderr'], 2) for i in range(3, options.minfds): options.close_fd(i) - + class ProcessGroupBase: def __init__(self, config): self.config = config Index: src/supervisor/socket_manager.py =================================================================== --- src/supervisor/socket_manager.py (revision 881) +++ src/supervisor/socket_manager.py (working copy) @@ -14,6 +14,46 @@ import socket +class Proxy(): + """ Class for wrapping a shared resource object and getting + notified when it's deleted + """ + + def __init__(self, object, **kwargs): + self.object = object + self.onDelete = kwargs.get('onDelete', None) + + def __del__(self): + if self.onDelete: + self.onDelete() + + def __getattr__(self, name): + return getattr(self.object, name) + + def _get(self): + return self.object + +class ReferenceCounter(): + """ Class for tracking references to a shared resource + """ + + def __init__(self, **kwargs): + self.onNonZero = kwargs['onNonZero'] + self.onZero = kwargs['onZero'] + self.refCount = 0 + + def increment(self): + if self.refCount == 0: + self.onNonZero() + self.refCount = self.refCount + 1 + + def decrement(self): + if self.refCount <= 0: + raise Exception('Illegal operation: cannot decrement below zero') + self.refCount -= 1 + if self.refCount == 0: + self.onZero() + class SocketManager: """ Class for managing sockets in servers that create/bind/listen before forking multiple child processes to accept() """ @@ -21,9 +61,12 @@ socket_config = None #SocketConfig object socket = None #Socket being managed prepared = False + logger = None - def __init__(self, socket_config): + def __init__(self, socket_config, **kwargs): + self.logger = kwargs.get('logger', None) self.socket_config = socket_config + self.refCtr = ReferenceCounter(onZero=self._close, onNonZero=self._prepare_socket) def __repr__(self): return '<%s at %s for %s>' % (self.__class__, @@ -33,17 +76,27 @@ def config(self): return self.socket_config - def prepare_socket(self): + def is_prepared(self): + return self.prepared + + def get_socket(self): + self.refCtr.increment() + if not self.prepared: + raise Exception('Socket has not been prepared') + return Proxy(self.socket, onDelete=self.refCtr.decrement) + + def _prepare_socket(self): + if self.logger: + self.logger.info('Creating socket %s' % self.socket_config) self.socket = self.socket_config.create() self.socket.bind(self.socket_config.addr()) self.socket.listen(socket.SOMAXCONN) self.prepared = True - def get_socket(self): + def _close(self): if not self.prepared: - self.prepare_socket() - return self.socket - - def close(self): + raise Exception('Socket has not been prepared') + if self.logger: + self.logger.info('Closing socket %s' % self.socket_config) self.socket.close() self.prepared = False