#----------------------------------------------------------- # Threaded, Gevent and Prefork Servers #----------------------------------------------------------- import datetime import errno import logging import os import os.path import platform import random import select import signal import socket import subprocess import sys import threading import time import unittest from itertools import chain import psutil import werkzeug.serving from werkzeug.debug import DebuggedApplication from ..tests import loader if os.name == 'posix': # Unix only for workers import fcntl import resource try: import inotify from inotify.adapters import InotifyTrees from inotify.constants import IN_MODIFY, IN_CREATE, IN_MOVED_TO INOTIFY_LISTEN_EVENTS = IN_MODIFY | IN_CREATE | IN_MOVED_TO except ImportError: inotify = None else: # Windows shim signal.SIGHUP = -1 inotify = None if not inotify: try: import watchdog from watchdog.observers import Observer from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileMovedEvent except ImportError: watchdog = None # Optional process names for workers try: from setproctitle import setproctitle except ImportError: setproctitle = lambda x: None import odoo from odoo.modules import get_modules from odoo.modules.registry import Registry from odoo.release import nt_service_name from odoo.tools import config from odoo.tools import stripped_sys_argv, dumpstacks, log_ormcache_stats _logger = logging.getLogger(__name__) SLEEP_INTERVAL = 60 # 1 min def memory_info(process): """ :return: the relevant memory usage according to the OS in bytes. """ # psutil < 2.0 does not have memory_info, >= 3.0 does not have get_memory_info pmem = (getattr(process, 'memory_info', None) or process.get_memory_info)() # MacOSX allocates very large vms to all processes so we only monitor the rss usage. if platform.system() == 'Darwin': return pmem.rss return pmem.vms def set_limit_memory_hard(): if platform.system() == 'Linux' and config['limit_memory_hard']: rlimit = resource.RLIMIT_AS soft, hard = resource.getrlimit(rlimit) resource.setrlimit(rlimit, (config['limit_memory_hard'], hard)) def empty_pipe(fd): try: while os.read(fd, 1): pass except OSError as e: if e.errno not in [errno.EAGAIN]: raise #---------------------------------------------------------- # Werkzeug WSGI servers patched #---------------------------------------------------------- class LoggingBaseWSGIServerMixIn(object): def handle_error(self, request, client_address): t, e, _ = sys.exc_info() if t == socket.error and e.errno == errno.EPIPE: # broken pipe, ignore error return _logger.exception('Exception happened during processing of request from %s', client_address) class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer): """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer use this class, sets the socket and calls the process_request() manually """ def __init__(self, app): werkzeug.serving.BaseWSGIServer.__init__(self, "127.0.0.1", 0, app) # Directly close the socket. It will be replaced by WorkerHTTP when processing requests if self.socket: self.socket.close() def server_activate(self): # dont listen as we use PreforkServer#socket pass class RequestHandler(werkzeug.serving.WSGIRequestHandler): def setup(self): # timeout to avoid chrome headless preconnect during tests if config['test_enable'] or config['test_file']: self.timeout = 5 # flag the current thread as handling a http request super(RequestHandler, self).setup() me = threading.current_thread() me.name = 'odoo.service.http.request.%s' % (me.ident,) def make_environ(self): environ = super().make_environ() # Add the TCP socket to environ in order for the websocket # connections to use it. environ['socket'] = self.connection if self.headers.get('Upgrade') == 'websocket': # Since the upgrade header is introduced in version 1.1, Firefox # won't accept a websocket connection if the version is set to # 1.0. self.protocol_version = "HTTP/1.1" return environ def send_header(self, keyword, value): # Prevent `WSGIRequestHandler` from sending the connection close header (compatibility with werkzeug >= 2.1.1 ) # since it is incompatible with websocket. if self.headers.get('Upgrade') == 'websocket' and keyword == 'Connection' and value == 'close': # Do not keep processing requests. self.close_connection = True return super().send_header(keyword, value) class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer): """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket given by the environment, this is used by autoreload to keep the listen socket open when a reload happens. """ def __init__(self, host, port, app): # The ODOO_MAX_HTTP_THREADS environment variable allows to limit the amount of concurrent # socket connections accepted by a threaded server, implicitly limiting the amount of # concurrent threads running for http requests handling. self.max_http_threads = os.environ.get("ODOO_MAX_HTTP_THREADS") if self.max_http_threads: try: self.max_http_threads = int(self.max_http_threads) except ValueError: # If the value can't be parsed to an integer then it's computed in an automated way to # half the size of db_maxconn because while most requests won't borrow cursors concurrently # there are some exceptions where some controllers might allocate two or more cursors. self.max_http_threads = config['db_maxconn'] // 2 self.http_threads_sem = threading.Semaphore(self.max_http_threads) super(ThreadedWSGIServerReloadable, self).__init__(host, port, app, handler=RequestHandler) # See https://github.com/pallets/werkzeug/pull/770 # This allow the request threads to not be set as daemon # so the server waits for them when shutting down gracefully. self.daemon_threads = False def server_bind(self): SD_LISTEN_FDS_START = 3 if os.environ.get('LISTEN_FDS') == '1' and os.environ.get('LISTEN_PID') == str(os.getpid()): self.reload_socket = True self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM) _logger.info('HTTP service (werkzeug) running through socket activation') else: self.reload_socket = False super(ThreadedWSGIServerReloadable, self).server_bind() _logger.info('HTTP service (werkzeug) running on %s:%s', self.server_name, self.server_port) def server_activate(self): if not self.reload_socket: super(ThreadedWSGIServerReloadable, self).server_activate() def process_request(self, request, client_address): """ Start a new thread to process the request. Override the default method of class socketserver.ThreadingMixIn to be able to get the thread object which is instantiated and set its start time as an attribute """ t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.type = 'http' t.start_time = time.time() t.start() # TODO: Remove this method as soon as either of the revision # - python/cpython@8b1f52b5a93403acd7d112cd1c1bc716b31a418a for Python 3.6, # - python/cpython@908082451382b8b3ba09ebba638db660edbf5d8e for Python 3.7, # is included in all Python 3 releases installed on all operating systems supported by Odoo. # These revisions are included in Python from releases 3.6.8 and Python 3.7.2 respectively. def _handle_request_noblock(self): """ In the python module `socketserver` `process_request` loop, the __shutdown_request flag is not checked between select and accept. Thus when we set it to `True` thanks to the call `httpd.shutdown`, a last request is accepted before exiting the loop. We override this function to add an additional check before the accept(). """ if self._BaseServer__shutdown_request: return if self.max_http_threads and not self.http_threads_sem.acquire(timeout=0.1): # If the semaphore is full we will return immediately to the upstream (most probably # socketserver.BaseServer's serve_forever loop which will retry immediately as the # selector will find a pending connection to accept on the socket. There is a 100 ms # penalty in such case in order to avoid cpu bound loop while waiting for the semaphore. return # upstream _handle_request_noblock will handle errors and call shutdown_request in any cases super(ThreadedWSGIServerReloadable, self)._handle_request_noblock() def shutdown_request(self, request): if self.max_http_threads: # upstream is supposed to call this function no matter what happens during processing self.http_threads_sem.release() super().shutdown_request(request) #---------------------------------------------------------- # FileSystem Watcher for autoreload and cache invalidation #---------------------------------------------------------- class FSWatcherBase(object): def handle_file(self, path): if path.endswith('.py') and not os.path.basename(path).startswith('.~'): try: source = open(path, 'rb').read() + b'\n' compile(source, path, 'exec') except IOError: _logger.error('autoreload: python code change detected, IOError for %s', path) except SyntaxError: _logger.error('autoreload: python code change detected, SyntaxError in %s', path) else: if not getattr(odoo, 'phoenix', False): _logger.info('autoreload: python code updated, autoreload activated') restart() return True class FSWatcherWatchdog(FSWatcherBase): def __init__(self): self.observer = Observer() for path in odoo.addons.__path__: _logger.info('Watching addons folder %s', path) self.observer.schedule(self, path, recursive=True) def dispatch(self, event): if isinstance(event, (FileCreatedEvent, FileModifiedEvent, FileMovedEvent)): if not event.is_directory: path = getattr(event, 'dest_path', '') or event.src_path self.handle_file(path) def start(self): self.observer.start() _logger.info('AutoReload watcher running with watchdog') def stop(self): self.observer.stop() self.observer.join() class FSWatcherInotify(FSWatcherBase): def __init__(self): self.started = False # ignore warnings from inotify in case we have duplicate addons paths. inotify.adapters._LOGGER.setLevel(logging.ERROR) # recreate a list as InotifyTrees' __init__ deletes the list's items paths_to_watch = [] for path in odoo.addons.__path__: paths_to_watch.append(path) _logger.info('Watching addons folder %s', path) self.watcher = InotifyTrees(paths_to_watch, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=.5) def run(self): _logger.info('AutoReload watcher running with inotify') dir_creation_events = set(('IN_MOVED_TO', 'IN_CREATE')) while self.started: for event in self.watcher.event_gen(timeout_s=0, yield_nones=False): (_, type_names, path, filename) = event if 'IN_ISDIR' not in type_names: # despite not having IN_DELETE in the watcher's mask, the # watcher sends these events when a directory is deleted. if 'IN_DELETE' not in type_names: full_path = os.path.join(path, filename) if self.handle_file(full_path): return elif dir_creation_events.intersection(type_names): full_path = os.path.join(path, filename) for root, _, files in os.walk(full_path): for file in files: if self.handle_file(os.path.join(root, file)): return def start(self): self.started = True self.thread = threading.Thread(target=self.run, name="odoo.service.autoreload.watcher") self.thread.daemon = True self.thread.start() def stop(self): self.started = False self.thread.join() del self.watcher # ensures inotify watches are freed up before reexec #---------------------------------------------------------- # Servers: Threaded, Gevented and Prefork #---------------------------------------------------------- class CommonServer(object): _on_stop_funcs = [] def __init__(self, app): self.app = app # config self.interface = config['http_interface'] or '0.0.0.0' self.port = config['http_port'] # runtime self.pid = os.getpid() def close_socket(self, sock): """ Closes a socket instance cleanly :param sock: the network socket to close :type sock: socket.socket """ try: sock.shutdown(socket.SHUT_RDWR) except socket.error as e: if e.errno == errno.EBADF: # Werkzeug > 0.9.6 closes the socket itself (see commit # https://github.com/mitsuhiko/werkzeug/commit/4d8ca089) return # On OSX, socket shutdowns both sides if any side closes it # causing an error 57 'Socket is not connected' on shutdown # of the other side (or something), see # http://bugs.python.org/issue4397 # note: stdlib fixed test, not behavior if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']: raise sock.close() @classmethod def on_stop(cls, func): """ Register a cleanup function to be executed when the server stops """ cls._on_stop_funcs.append(func) def stop(self): for func in self._on_stop_funcs: try: _logger.debug("on_close call %s", func) func() except Exception: _logger.warning("Exception in %s", func.__name__, exc_info=True) class ThreadedServer(CommonServer): def __init__(self, app): super(ThreadedServer, self).__init__(app) self.main_thread_id = threading.current_thread().ident # Variable keeping track of the number of calls to the signal handler defined # below. This variable is monitored by ``quit_on_signals()``. self.quit_signals_received = 0 #self.socket = None self.httpd = None self.limits_reached_threads = set() self.limit_reached_time = None def signal_handler(self, sig, frame): if sig in [signal.SIGINT, signal.SIGTERM]: # shutdown on kill -INT or -TERM self.quit_signals_received += 1 if self.quit_signals_received > 1: # logging.shutdown was already called at this point. sys.stderr.write("Forced shutdown.\n") os._exit(0) # interrupt run() to start shutdown raise KeyboardInterrupt() elif hasattr(signal, 'SIGXCPU') and sig == signal.SIGXCPU: sys.stderr.write("CPU time limit exceeded! Shutting down immediately\n") sys.stderr.flush() os._exit(0) elif sig == signal.SIGHUP: # restart on kill -HUP odoo.phoenix = True self.quit_signals_received += 1 # interrupt run() to start shutdown raise KeyboardInterrupt() def process_limit(self): memory = memory_info(psutil.Process(os.getpid())) if config['limit_memory_soft'] and memory > config['limit_memory_soft']: _logger.warning('Server memory limit (%s) reached.', memory) self.limits_reached_threads.add(threading.current_thread()) for thread in threading.enumerate(): thread_type = getattr(thread, 'type', None) if not thread.daemon and thread_type != 'websocket' or thread_type == 'cron': # We apply the limits on cron threads and HTTP requests, # websocket requests excluded. if getattr(thread, 'start_time', None): thread_execution_time = time.time() - thread.start_time thread_limit_time_real = config['limit_time_real'] if (getattr(thread, 'type', None) == 'cron' and config['limit_time_real_cron'] and config['limit_time_real_cron'] > 0): thread_limit_time_real = config['limit_time_real_cron'] if thread_limit_time_real and thread_execution_time > thread_limit_time_real: _logger.warning( 'Thread %s virtual real time limit (%d/%ds) reached.', thread, thread_execution_time, thread_limit_time_real) self.limits_reached_threads.add(thread) # Clean-up threads that are no longer alive # e.g. threads that exceeded their real time, # but which finished before the server could restart. for thread in list(self.limits_reached_threads): if not thread.is_alive(): self.limits_reached_threads.remove(thread) if self.limits_reached_threads: self.limit_reached_time = self.limit_reached_time or time.time() else: self.limit_reached_time = None def cron_thread(self, number): # Steve Reich timing style with thundering herd mitigation. # # On startup, all workers bind on a notification channel in # postgres so they can be woken up at will. At worst they wake # up every SLEEP_INTERVAL with a jitter. The jitter creates a # chorus effect that helps distribute on the timeline the moment # when individual worker wake up. # # On NOTIFY, all workers are awaken at the same time, sleeping # just a bit prevents they all poll the database at the exact # same time. This is known as the thundering herd effect. from odoo.addons.base.models.ir_cron import ir_cron conn = odoo.sql_db.db_connect('postgres') with conn.cursor() as cr: pg_conn = cr._cnx # LISTEN / NOTIFY doesn't work in recovery mode cr.execute("SELECT pg_is_in_recovery()") in_recovery = cr.fetchone()[0] if not in_recovery: cr.execute("LISTEN cron_trigger") else: _logger.warning("PG cluster in recovery mode, cron trigger not activated") cr.commit() while True: select.select([pg_conn], [], [], SLEEP_INTERVAL + number) time.sleep(number / 100) pg_conn.poll() registries = odoo.modules.registry.Registry.registries _logger.debug('cron%d polling for jobs', number) for db_name, registry in registries.d.items(): if registry.ready: thread = threading.current_thread() thread.start_time = time.time() try: ir_cron._process_jobs(db_name) except Exception: _logger.warning('cron%d encountered an Exception:', number, exc_info=True) thread.start_time = None def cron_spawn(self): """ Start the above runner function in a daemon thread. The thread is a typical daemon thread: it will never quit and must be terminated when the main process exits - with no consequence (the processing threads it spawns are not marked daemon). """ # Force call to strptime just before starting the cron thread # to prevent time.strptime AttributeError within the thread. # See: http://bugs.python.org/issue7980 datetime.datetime.strptime('2012-01-01', '%Y-%m-%d') for i in range(odoo.tools.config['max_cron_threads']): def target(): self.cron_thread(i) t = threading.Thread(target=target, name="odoo.service.cron.cron%d" % i) t.daemon = True t.type = 'cron' t.start() _logger.debug("cron%d started!" % i) def http_thread(self): self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, self.app) self.httpd.serve_forever() def http_spawn(self): t = threading.Thread(target=self.http_thread, name="odoo.service.httpd") t.daemon = True t.start() def start(self, stop=False): _logger.debug("Setting signal handlers") set_limit_memory_hard() if os.name == 'posix': signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGCHLD, self.signal_handler) signal.signal(signal.SIGHUP, self.signal_handler) signal.signal(signal.SIGXCPU, self.signal_handler) signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGUSR1, log_ormcache_stats) elif os.name == 'nt': import win32api win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1) test_mode = config['test_enable'] or config['test_file'] if test_mode or (config['http_enable'] and not stop): # some tests need the http daemon to be available... self.http_spawn() def stop(self): """ Shutdown the WSGI server. Wait for non daemon threads. """ if getattr(odoo, 'phoenix', None): _logger.info("Initiating server reload") else: _logger.info("Initiating shutdown") _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.") stop_time = time.time() if self.httpd: self.httpd.shutdown() super().stop() # Manually join() all threads before calling sys.exit() to allow a second signal # to trigger _force_quit() in case some non-daemon threads won't exit cleanly. # threading.Thread.join() should not mask signals (at least in python 2.5). me = threading.current_thread() _logger.debug('current thread: %r', me) for thread in threading.enumerate(): _logger.debug('process %r (%r)', thread, thread.daemon) if (thread != me and not thread.daemon and thread.ident != self.main_thread_id and thread not in self.limits_reached_threads): while thread.is_alive() and (time.time() - stop_time) < 1: # We wait for requests to finish, up to 1 second. _logger.debug('join and sleep') # Need a busyloop here as thread.join() masks signals # and would prevent the forced shutdown. thread.join(0.05) time.sleep(0.05) odoo.sql_db.close_all() _logger.debug('--') logging.shutdown() def run(self, preload=None, stop=False): """ Start the http server and the cron thread then wait for a signal. The first SIGINT or SIGTERM signal will initiate a graceful shutdown while a second one if any will force an immediate exit. """ self.start(stop=stop) rc = preload_registries(preload) if stop: if config['test_enable']: logger = odoo.tests.result._logger with Registry.registries._lock: for db, registry in Registry.registries.d.items(): report = registry._assertion_report log = logger.error if not report.wasSuccessful() \ else logger.warning if not report.testsRun \ else logger.info log("%s when loading database %r", report, db) self.stop() return rc self.cron_spawn() # Wait for a first signal to be handled. (time.sleep will be interrupted # by the signal handler) try: while self.quit_signals_received == 0: self.process_limit() if self.limit_reached_time: has_other_valid_requests = any( not t.daemon and t not in self.limits_reached_threads for t in threading.enumerate() if getattr(t, 'type', None) == 'http') if (not has_other_valid_requests or (time.time() - self.limit_reached_time) > SLEEP_INTERVAL): # We wait there is no processing requests # other than the ones exceeding the limits, up to 1 min, # before asking for a reload. _logger.info('Dumping stacktrace of limit exceeding threads before reloading') dumpstacks(thread_idents=[thread.ident for thread in self.limits_reached_threads]) self.reload() # `reload` increments `self.quit_signals_received` # and the loop will end after this iteration, # therefore leading to the server stop. # `reload` also sets the `phoenix` flag # to tell the server to restart the server after shutting down. else: time.sleep(1) else: time.sleep(SLEEP_INTERVAL) except KeyboardInterrupt: pass self.stop() def reload(self): os.kill(self.pid, signal.SIGHUP) class GeventServer(CommonServer): def __init__(self, app): super(GeventServer, self).__init__(app) self.port = config['gevent_port'] self.httpd = None def process_limits(self): restart = False if self.ppid != os.getppid(): _logger.warning("Gevent Parent changed: %s", self.pid) restart = True memory = memory_info(psutil.Process(self.pid)) if config['limit_memory_soft'] and memory > config['limit_memory_soft']: _logger.warning('Gevent virtual memory limit reached: %s', memory) restart = True if restart: # suicide !! os.kill(self.pid, signal.SIGTERM) def watchdog(self, beat=4): import gevent self.ppid = os.getppid() while True: self.process_limits() gevent.sleep(beat) def start(self): import gevent try: from gevent.pywsgi import WSGIServer, WSGIHandler except ImportError: from gevent.wsgi import WSGIServer, WSGIHandler class ProxyHandler(WSGIHandler): """ When logging requests, try to get the client address from the environment so we get proxyfix's modifications (if any). Derived from werzeug.serving.WSGIRequestHandler.log / werzeug.serving.WSGIRequestHandler.address_string """ def _connection_upgrade_requested(self): if self.headers.get('Connection', '').lower() == 'upgrade': return True if self.headers.get('Upgrade', '').lower() == 'websocket': return True return False def format_request(self): old_address = self.client_address if getattr(self, 'environ', None): self.client_address = self.environ['REMOTE_ADDR'] elif not self.client_address: self.client_address = '' # other cases are handled inside WSGIHandler try: return super().format_request() finally: self.client_address = old_address def finalize_headers(self): # We need to make gevent.pywsgi stop dealing with chunks when the connection # Is being upgraded. see https://github.com/gevent/gevent/issues/1712 super().finalize_headers() if self.code == 101: # Switching Protocols. Disable chunked writes. self.response_use_chunked = False def get_environ(self): # Add the TCP socket to environ in order for the websocket # connections to use it. environ = super().get_environ() environ['socket'] = self.socket # Disable support for HTTP chunking on reads which cause # an issue when the connection is being upgraded, see # https://github.com/gevent/gevent/issues/1712 if self._connection_upgrade_requested(): environ['wsgi.input'] = self.rfile environ['wsgi.input_terminated'] = False return environ set_limit_memory_hard() if os.name == 'posix': # Set process memory limit as an extra safeguard signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGUSR1, log_ormcache_stats) gevent.spawn(self.watchdog) self.httpd = WSGIServer( (self.interface, self.port), self.app, log=logging.getLogger('longpolling'), error_log=logging.getLogger('longpolling'), handler_class=ProxyHandler, ) _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port) try: self.httpd.serve_forever() except: _logger.exception("Evented Service (longpolling): uncaught error during main loop") raise def stop(self): import gevent self.httpd.stop() super().stop() gevent.shutdown() def run(self, preload, stop): self.start() self.stop() class PreforkServer(CommonServer): """ Multiprocessing inspired by (g)unicorn. PreforkServer (aka Multicorn) currently uses accept(2) as dispatching method between workers but we plan to replace it by a more intelligent dispatcher to will parse the first HTTP request line. """ def __init__(self, app): super().__init__(app) # config self.population = config['workers'] self.timeout = config['limit_time_real'] self.limit_request = config['limit_request'] self.cron_timeout = config['limit_time_real_cron'] or None if self.cron_timeout == -1: self.cron_timeout = self.timeout # working vars self.beat = 4 self.socket = None self.workers_http = {} self.workers_cron = {} self.workers = {} self.generation = 0 self.queue = [] self.long_polling_pid = None def pipe_new(self): pipe = os.pipe() for fd in pipe: # non_blocking flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flags) # close_on_exec flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC fcntl.fcntl(fd, fcntl.F_SETFD, flags) return pipe def pipe_ping(self, pipe): try: os.write(pipe[1], b'.') except IOError as e: if e.errno not in [errno.EAGAIN, errno.EINTR]: raise def signal_handler(self, sig, frame): if len(self.queue) < 5 or sig == signal.SIGCHLD: self.queue.append(sig) self.pipe_ping(self.pipe) else: _logger.warning("Dropping signal: %s", sig) def worker_spawn(self, klass, workers_registry): self.generation += 1 worker = klass(self) pid = os.fork() if pid != 0: worker.pid = pid self.workers[pid] = worker workers_registry[pid] = worker return worker else: worker.run() sys.exit(0) def long_polling_spawn(self): nargs = stripped_sys_argv() cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:] popen = subprocess.Popen(cmd) self.long_polling_pid = popen.pid def worker_pop(self, pid): if pid == self.long_polling_pid: self.long_polling_pid = None if pid in self.workers: _logger.debug("Worker (%s) unregistered", pid) try: self.workers_http.pop(pid, None) self.workers_cron.pop(pid, None) u = self.workers.pop(pid) u.close() except OSError: return def worker_kill(self, pid, sig): try: os.kill(pid, sig) except OSError as e: if e.errno == errno.ESRCH: self.worker_pop(pid) def process_signals(self): while len(self.queue): sig = self.queue.pop(0) if sig in [signal.SIGINT, signal.SIGTERM]: raise KeyboardInterrupt elif sig == signal.SIGHUP: # restart on kill -HUP odoo.phoenix = True raise KeyboardInterrupt elif sig == signal.SIGQUIT: # dump stacks on kill -3 dumpstacks() elif sig == signal.SIGUSR1: # log ormcache stats on kill -SIGUSR1 log_ormcache_stats() elif sig == signal.SIGTTIN: # increase number of workers self.population += 1 elif sig == signal.SIGTTOU: # decrease number of workers self.population -= 1 def process_zombie(self): # reap dead workers while 1: try: wpid, status = os.waitpid(-1, os.WNOHANG) if not wpid: break if (status >> 8) == 3: msg = "Critial worker error (%s)" _logger.critical(msg, wpid) raise Exception(msg % wpid) self.worker_pop(wpid) except OSError as e: if e.errno == errno.ECHILD: break raise def process_timeout(self): now = time.time() for (pid, worker) in self.workers.items(): if worker.watchdog_timeout is not None and \ (now - worker.watchdog_time) >= worker.watchdog_timeout: _logger.error("%s (%s) timeout after %ss", worker.__class__.__name__, pid, worker.watchdog_timeout) self.worker_kill(pid, signal.SIGKILL) def process_spawn(self): if config['http_enable']: while len(self.workers_http) < self.population: self.worker_spawn(WorkerHTTP, self.workers_http) if not self.long_polling_pid: self.long_polling_spawn() while len(self.workers_cron) < config['max_cron_threads']: self.worker_spawn(WorkerCron, self.workers_cron) def sleep(self): try: # map of fd -> worker fds = {w.watchdog_pipe[0]: w for w in self.workers.values()} fd_in = list(fds) + [self.pipe[0]] # check for ping or internal wakeups ready = select.select(fd_in, [], [], self.beat) # update worker watchdogs for fd in ready[0]: if fd in fds: fds[fd].watchdog_time = time.time() empty_pipe(fd) except select.error as e: if e.args[0] not in [errno.EINTR]: raise def start(self): # wakeup pipe, python doesn't throw EINTR when a syscall is interrupted # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the # signal handler to overcome this behaviour self.pipe = self.pipe_new() # set signal handlers signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGHUP, self.signal_handler) signal.signal(signal.SIGCHLD, self.signal_handler) signal.signal(signal.SIGTTIN, self.signal_handler) signal.signal(signal.SIGTTOU, self.signal_handler) signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGUSR1, log_ormcache_stats) if config['http_enable']: # listen to socket _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port) family = socket.AF_INET if ':' in self.interface: family = socket.AF_INET6 self.socket = socket.socket(family, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setblocking(0) self.socket.bind((self.interface, self.port)) self.socket.listen(8 * self.population) def stop(self, graceful=True): if self.long_polling_pid is not None: # FIXME make longpolling process handle SIGTERM correctly self.worker_kill(self.long_polling_pid, signal.SIGKILL) self.long_polling_pid = None if self.socket: self.socket.close() if graceful: _logger.info("Stopping gracefully") super().stop() limit = time.time() + self.timeout for pid in self.workers: self.worker_kill(pid, signal.SIGINT) while self.workers and time.time() < limit: try: self.process_signals() except KeyboardInterrupt: _logger.info("Forced shutdown.") break self.process_zombie() time.sleep(0.1) else: _logger.info("Stopping forcefully") for pid in self.workers: self.worker_kill(pid, signal.SIGTERM) def run(self, preload, stop): self.start() rc = preload_registries(preload) if stop: self.stop() return rc # Empty the cursor pool, we dont want them to be shared among forked workers. odoo.sql_db.close_all() _logger.debug("Multiprocess starting") while 1: try: #_logger.debug("Multiprocess beat (%s)",time.time()) self.process_signals() self.process_zombie() self.process_timeout() self.process_spawn() self.sleep() except KeyboardInterrupt: _logger.debug("Multiprocess clean stop") self.stop() break except Exception as e: _logger.exception(e) self.stop(False) return -1 class Worker(object): """ Workers """ def __init__(self, multi): self.multi = multi self.watchdog_time = time.time() self.watchdog_pipe = multi.pipe_new() self.eintr_pipe = multi.pipe_new() self.wakeup_fd_r, self.wakeup_fd_w = self.eintr_pipe # Can be set to None if no watchdog is desired. self.watchdog_timeout = multi.timeout self.ppid = os.getpid() self.pid = None self.alive = True # should we rename into lifetime ? self.request_max = multi.limit_request self.request_count = 0 def setproctitle(self, title=""): setproctitle('odoo: %s %s %s' % (self.__class__.__name__, self.pid, title)) def close(self): os.close(self.watchdog_pipe[0]) os.close(self.watchdog_pipe[1]) os.close(self.eintr_pipe[0]) os.close(self.eintr_pipe[1]) def signal_handler(self, sig, frame): self.alive = False def signal_time_expired_handler(self, n, stack): # TODO: print actual RUSAGE_SELF (since last check_limits) instead of # just repeating the config setting _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu']) # We dont suicide in such case raise Exception('CPU time limit exceeded.') def sleep(self): try: select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat) # clear wakeup pipe if we were interrupted empty_pipe(self.wakeup_fd_r) except select.error as e: if e.args[0] not in [errno.EINTR]: raise def check_limits(self): # If our parent changed suicide if self.ppid != os.getppid(): _logger.info("Worker (%s) Parent changed", self.pid) self.alive = False # check for lifetime if self.request_count >= self.request_max: _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count) self.alive = False # Reset the worker if it consumes too much memory (e.g. caused by a memory leak). memory = memory_info(psutil.Process(os.getpid())) if config['limit_memory_soft'] and memory > config['limit_memory_soft']: _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, memory) self.alive = False # Commit suicide after the request. set_limit_memory_hard() # update RLIMIT_CPU so limit_time_cpu applies per unit of work r = resource.getrusage(resource.RUSAGE_SELF) cpu_time = r.ru_utime + r.ru_stime soft, hard = resource.getrlimit(resource.RLIMIT_CPU) resource.setrlimit(resource.RLIMIT_CPU, (int(cpu_time + config['limit_time_cpu']), hard)) def process_work(self): pass def start(self): self.pid = os.getpid() self.setproctitle() _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid) # Reseed the random number generator random.seed() if self.multi.socket: # Prevent fd inheritance: close_on_exec flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags) # reset blocking status self.multi.socket.setblocking(0) signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGXCPU, self.signal_time_expired_handler) signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGHUP, signal.SIG_DFL) signal.signal(signal.SIGCHLD, signal.SIG_DFL) signal.signal(signal.SIGTTIN, signal.SIG_DFL) signal.signal(signal.SIGTTOU, signal.SIG_DFL) signal.set_wakeup_fd(self.wakeup_fd_w) def stop(self): pass def run(self): try: self.start() t = threading.Thread(name="Worker %s (%s) workthread" % (self.__class__.__name__, self.pid), target=self._runloop) t.daemon = True t.start() t.join() _logger.info("Worker (%s) exiting. request_count: %s, registry count: %s.", self.pid, self.request_count, len(odoo.modules.registry.Registry.registries)) self.stop() except Exception: _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid) # should we use 3 to abort everything ? sys.exit(1) def _runloop(self): signal.pthread_sigmask(signal.SIG_BLOCK, { signal.SIGXCPU, signal.SIGINT, signal.SIGQUIT, signal.SIGUSR1, }) try: while self.alive: self.check_limits() self.multi.pipe_ping(self.watchdog_pipe) self.sleep() if not self.alive: break self.process_work() except: _logger.exception("Worker %s (%s) Exception occurred, exiting...", self.__class__.__name__, self.pid) sys.exit(1) class WorkerHTTP(Worker): """ HTTP Request workers """ def __init__(self, multi): super(WorkerHTTP, self).__init__(multi) # The ODOO_HTTP_SOCKET_TIMEOUT environment variable allows to control socket timeout for # extreme latency situations. It's generally better to use a good buffering reverse proxy # to quickly free workers rather than increasing this timeout to accommodate high network # latencies & b/w saturation. This timeout is also essential to protect against accidental # DoS due to idle HTTP connections. sock_timeout = os.environ.get("ODOO_HTTP_SOCKET_TIMEOUT") self.sock_timeout = float(sock_timeout) if sock_timeout else 2 def process_request(self, client, addr): client.setblocking(1) client.settimeout(self.sock_timeout) client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # Prevent fd inherientence close_on_exec flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC fcntl.fcntl(client, fcntl.F_SETFD, flags) # do request using BaseWSGIServerNoBind monkey patched with socket self.server.socket = client # tolerate broken pipe when the http client closes the socket before # receiving the full reply try: self.server.process_request(client, addr) except IOError as e: if e.errno != errno.EPIPE: raise self.request_count += 1 def process_work(self): try: client, addr = self.multi.socket.accept() self.process_request(client, addr) except socket.error as e: if e.errno not in (errno.EAGAIN, errno.ECONNABORTED): raise def start(self): Worker.start(self) self.server = BaseWSGIServerNoBind(self.multi.app) class WorkerCron(Worker): """ Cron workers """ def __init__(self, multi): super(WorkerCron, self).__init__(multi) # process_work() below process a single database per call. # The variable db_index is keeping track of the next database to # process. self.db_index = 0 self.watchdog_timeout = multi.cron_timeout # Use a distinct value for CRON Worker def sleep(self): # Really sleep once all the databases have been processed. if self.db_index == 0: interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect # simulate interruptible sleep with select(wakeup_fd, timeout) try: select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval) # clear pg_conn/wakeup pipe if we were interrupted time.sleep(self.pid / 100 % .1) self.dbcursor._cnx.poll() empty_pipe(self.wakeup_fd_r) except select.error as e: if e.args[0] != errno.EINTR: raise def _db_list(self): if config['db_name']: db_names = config['db_name'].split(',') else: db_names = odoo.service.db.list_dbs(True) return db_names def process_work(self): _logger.debug("WorkerCron (%s) polling for jobs", self.pid) db_names = self._db_list() if len(db_names): self.db_index = (self.db_index + 1) % len(db_names) db_name = db_names[self.db_index] self.setproctitle(db_name) from odoo.addons import base base.models.ir_cron.ir_cron._process_jobs(db_name) # dont keep cursors in multi database mode if len(db_names) > 1: odoo.sql_db.close_db(db_name) self.request_count += 1 if self.request_count >= self.request_max and self.request_max < len(db_names): _logger.error("There are more dabatases to process than allowed " "by the `limit_request` configuration variable: %s more.", len(db_names) - self.request_max) else: self.db_index = 0 def start(self): os.nice(10) # mommy always told me to be nice with others... Worker.start(self) if self.multi.socket: self.multi.socket.close() dbconn = odoo.sql_db.db_connect('postgres') self.dbcursor = dbconn.cursor() # LISTEN / NOTIFY doesn't work in recovery mode self.dbcursor.execute("SELECT pg_is_in_recovery()") in_recovery = self.dbcursor.fetchone()[0] if not in_recovery: self.dbcursor.execute("LISTEN cron_trigger") else: _logger.warning("PG cluster in recovery mode, cron trigger not activated") self.dbcursor.commit() def stop(self): super().stop() self.dbcursor.close() #---------------------------------------------------------- # start/stop public api #---------------------------------------------------------- server = None def load_server_wide_modules(): server_wide_modules = {'base', 'web'} | set(odoo.conf.server_wide_modules) for m in server_wide_modules: try: odoo.modules.module.load_openerp_module(m) except Exception: msg = '' if m == 'web': msg = """ The `web` module is provided by the addons found in the `openerp-web` project. Maybe you forgot to add those addons in your addons_path configuration.""" _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) def _reexec(updated_modules=None): """reexecute openerp-server process with (nearly) the same arguments""" if odoo.tools.osutil.is_running_as_nt_service(): subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True) exe = os.path.basename(sys.executable) args = stripped_sys_argv() if updated_modules: args += ["-u", ','.join(updated_modules)] if not args or args[0] != exe: args.insert(0, exe) # We should keep the LISTEN_* environment variabled in order to support socket activation on reexec os.execve(sys.executable, args, os.environ) def load_test_file_py(registry, test_file): # pylint: disable=import-outside-toplevel from odoo.tests.suite import OdooSuite threading.current_thread().testing = True try: test_path, _ = os.path.splitext(os.path.abspath(test_file)) for mod in [m for m in get_modules() if '%s%s%s' % (os.path.sep, m, os.path.sep) in test_file]: for mod_mod in loader.get_test_modules(mod): mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', '')) if test_path == config._normalize(mod_path): tests = loader.unwrap_suite( unittest.TestLoader().loadTestsFromModule(mod_mod)) suite = OdooSuite(tests) _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__) suite(registry._assertion_report) if not registry._assertion_report.wasSuccessful(): _logger.error('%s: at least one error occurred in a test', test_file) return finally: threading.current_thread().testing = False def preload_registries(dbnames): """ Preload a registries, possibly run a test file.""" # TODO: move all config checks to args dont check tools.config here dbnames = dbnames or [] rc = 0 for dbname in dbnames: try: update_module = config['init'] or config['update'] registry = Registry.new(dbname, update_module=update_module) # run test_file if provided if config['test_file']: test_file = config['test_file'] if not os.path.isfile(test_file): _logger.warning('test file %s cannot be found', test_file) elif not test_file.endswith('py'): _logger.warning('test file %s is not a python file', test_file) else: _logger.info('loading test file %s', test_file) load_test_file_py(registry, test_file) # run post-install tests if config['test_enable']: t0 = time.time() t0_sql = odoo.sql_db.sql_counter module_names = (registry.updated_modules if update_module else sorted(registry._init_modules)) _logger.info("Starting post tests") tests_before = registry._assertion_report.testsRun post_install_suite = loader.make_suite(module_names, 'post_install') if post_install_suite.has_http_case(): with registry.cursor() as cr: env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {}) env['ir.qweb']._pregenerate_assets_bundles() result = loader.run_suite(post_install_suite) registry._assertion_report.update(result) _logger.info("%d post-tests in %.2fs, %s queries", registry._assertion_report.testsRun - tests_before, time.time() - t0, odoo.sql_db.sql_counter - t0_sql) registry._assertion_report.log_stats() if not registry._assertion_report.wasSuccessful(): rc += 1 except Exception: _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True) return -1 return rc def start(preload=None, stop=False): """ Start the odoo http server and cron processor. """ global server load_server_wide_modules() if odoo.evented: server = GeventServer(odoo.http.root) elif config['workers']: if config['test_enable'] or config['test_file']: _logger.warning("Unit testing in workers mode could fail; use --workers 0.") server = PreforkServer(odoo.http.root) # Workaround for Python issue24291, fixed in 3.6 (see Python issue26721) if sys.version_info[:2] == (3,5): # turn on buffering also for wfile, to avoid partial writes (Default buffer = 8k) werkzeug.serving.WSGIRequestHandler.wbufsize = -1 else: if platform.system() == "Linux" and sys.maxsize > 2**32 and "MALLOC_ARENA_MAX" not in os.environ: # glibc's malloc() uses arenas [1] in order to efficiently handle memory allocation of multi-threaded # applications. This allows better memory allocation handling in case of multiple threads that # would be using malloc() concurrently [2]. # Due to the python's GIL, this optimization have no effect on multithreaded python programs. # Unfortunately, a downside of creating one arena per cpu core is the increase of virtual memory # which Odoo is based upon in order to limit the memory usage for threaded workers. # On 32bit systems the default size of an arena is 512K while on 64bit systems it's 64M [3], # hence a threaded worker will quickly reach it's default memory soft limit upon concurrent requests. # We therefore set the maximum arenas allowed to 2 unless the MALLOC_ARENA_MAX env variable is set. # Note: Setting MALLOC_ARENA_MAX=0 allow to explicitly set the default glibs's malloc() behaviour. # # [1] https://sourceware.org/glibc/wiki/MallocInternals#Arenas_and_Heaps # [2] https://www.gnu.org/software/libc/manual/html_node/The-GNU-Allocator.html # [3] https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c;h=00ce48c;hb=0a8262a#l862 try: import ctypes libc = ctypes.CDLL("libc.so.6") M_ARENA_MAX = -8 assert libc.mallopt(ctypes.c_int(M_ARENA_MAX), ctypes.c_int(2)) except Exception: _logger.warning("Could not set ARENA_MAX through mallopt()") server = ThreadedServer(odoo.http.root) watcher = None if 'reload' in config['dev_mode'] and not odoo.evented: if inotify: watcher = FSWatcherInotify() watcher.start() elif watchdog: watcher = FSWatcherWatchdog() watcher.start() else: if os.name == 'posix' and platform.system() != 'Darwin': module = 'inotify' else: module = 'watchdog' _logger.warning("'%s' module not installed. Code autoreload feature is disabled", module) rc = server.run(preload, stop) if watcher: watcher.stop() # like the legend of the phoenix, all ends with beginnings if getattr(odoo, 'phoenix', False): _reexec() return rc if rc else 0 def restart(): """ Restart the server """ if os.name == 'nt': # run in a thread to let the current thread return response to the caller. threading.Thread(target=_reexec).start() else: os.kill(server.pid, signal.SIGHUP)