# -*- coding: utf-8 -*- """ The module :mod:`odoo.tests.common` provides unittest test cases and a few helpers and classes to write tests. """ import base64 import concurrent.futures import contextlib import difflib import importlib import inspect import itertools import json import logging import os import pathlib import platform import pprint import re import shutil import signal import subprocess import sys import tempfile import threading import time import unittest import warnings from collections import defaultdict, deque from concurrent.futures import Future, CancelledError, wait try: from concurrent.futures import InvalidStateError except ImportError: InvalidStateError = NotImplementedError from contextlib import contextmanager, ExitStack from datetime import datetime from functools import lru_cache from itertools import zip_longest as izip_longest from unittest.mock import patch, _patch from xmlrpc import client as xmlrpclib import requests import werkzeug.urls from lxml import etree, html from requests import PreparedRequest, Session import odoo from odoo import api from odoo.models import BaseModel from odoo.exceptions import AccessError from odoo.modules.registry import Registry from odoo.service import security from odoo.sql_db import BaseCursor, Cursor from odoo.tools import float_compare, single_email_re, profiler, lower_logging, SQL from odoo.tools.misc import find_in_path, mute_logger from . import case try: # the behaviour of decorator changed in 5.0.5 changing the structure of the traceback when # an error is raised inside a method using a decorator. # this is not a hudge problem for test execution but this makes error message # more difficult to read and breaks test_with_decorators # This also changes the error format making runbot error matching fail # This also breaks the first frame meaning that the module detection will also fail on runbot # In 5.1 decoratorx was introduced and it looks like it has the same behaviour of old decorator from decorator import decoratorx as decorator except ImportError: from decorator import decorator try: import websocket except ImportError: # chrome headless tests will be skipped websocket = None _logger = logging.getLogger(__name__) # backward compatibility: Form was defined in this file def __getattr__(name): # pylint: disable=import-outside-toplevel if name != 'Form': raise AttributeError(name) from .form import Form warnings.warn( "Since 17.0: odoo.tests.common.Form is deprecated, use odoo.tests.Form", category=PendingDeprecationWarning, stacklevel=2, ) return Form # The odoo library is supposed already configured. ADDONS_PATH = odoo.tools.config['addons_path'] HOST = '127.0.0.1' # Useless constant, tests are aware of the content of demo data ADMIN_USER_ID = odoo.SUPERUSER_ID CHECK_BROWSER_SLEEP = 0.1 # seconds CHECK_BROWSER_ITERATIONS = 100 BROWSER_WAIT = CHECK_BROWSER_SLEEP * CHECK_BROWSER_ITERATIONS # seconds def get_db_name(): db = odoo.tools.config['db_name'] # If the database name is not provided on the command-line, # use the one on the thread (which means if it is provided on # the command-line, this will break when installing another # database from XML-RPC). if not db and hasattr(threading.current_thread(), 'dbname'): return threading.current_thread().dbname return db standalone_tests = defaultdict(list) def standalone(*tags): """ Decorator for standalone test functions. This is somewhat dedicated to tests that install, upgrade or uninstall some modules, which is currently forbidden in regular test cases. The function is registered under the given ``tags`` and the corresponding Odoo module name. """ def register(func): # register func by odoo module name if func.__module__.startswith('odoo.addons.'): module = func.__module__.split('.')[2] standalone_tests[module].append(func) # register func with aribitrary name, if any for tag in tags: standalone_tests[tag].append(func) standalone_tests['all'].append(func) return func return register # For backwards-compatibility - get_db_name() should be used instead DB = get_db_name() def new_test_user(env, login='', groups='base.group_user', context=None, **kwargs): """ Helper function to create a new test user. It allows to quickly create users given its login and groups (being a comma separated list of xml ids). Kwargs are directly propagated to the create to further customize the created user. User creation uses a potentially customized environment using the context parameter allowing to specify a custom context. It can be used to force a specific behavior and/or simplify record creation. An example is to use mail-related context keys in mail tests to speedup record creation. Some specific fields are automatically filled to avoid issues * groups_id: it is filled using groups function parameter; * name: "login (groups)" by default as it is required; * email: it is either the login (if it is a valid email) or a generated string 'x.x@example.com' (x being the first login letter). This is due to email being required for most odoo operations; """ if not login: raise ValueError('New users require at least a login') if not groups: raise ValueError('New users require at least user groups') if context is None: context = {} groups_id = [(6, 0, [env.ref(g.strip()).id for g in groups.split(',')])] create_values = dict(kwargs, login=login, groups_id=groups_id) # automatically generate a name as "Login (groups)" to ease user comprehension if not create_values.get('name'): create_values['name'] = '%s (%s)' % (login, groups) # automatically give a password equal to login if not create_values.get('password'): create_values['password'] = login + 'x' * (8 - len(login)) # generate email if not given as most test require an email if 'email' not in create_values: if single_email_re.match(login): create_values['email'] = login else: create_values['email'] = '%s.%s@example.com' % (login[0], login[0]) # ensure company_id + allowed company constraint works if not given at create if 'company_id' in create_values and 'company_ids' not in create_values: create_values['company_ids'] = [(4, create_values['company_id'])] return env['res.users'].with_context(**context).create(create_values) def loaded_demo_data(env): return bool(env.ref('base.user_demo', raise_if_not_found=False)) class RecordCapturer: def __init__(self, model, domain): self._model = model self._domain = domain def __enter__(self): self._before = self._model.search(self._domain, order='id') self._after = None return self def __exit__(self, exc_type, exc_value, exc_traceback): if exc_type is None: self._after = self._model.search(self._domain, order='id') - self._before @property def records(self): if self._after is None: return self._model.search(self._domain, order='id') - self._before return self._after class MetaCase(type): """ Metaclass of test case classes to assign default 'test_tags': 'standard', 'at_install' and the name of the module. """ def __init__(cls, name, bases, attrs): super(MetaCase, cls).__init__(name, bases, attrs) # assign default test tags if cls.__module__.startswith('odoo.addons.'): if getattr(cls, 'test_tags', None) is None: cls.test_tags = {'standard', 'at_install'} cls.test_module = cls.__module__.split('.')[2] cls.test_class = cls.__name__ cls.test_sequence = 0 def _normalize_arch_for_assert(arch_string, parser_method="xml"): """Takes some xml and normalize it to make it comparable to other xml in particular, blank text is removed, and the output is pretty-printed :param str arch_string: the string representing an XML arch :param str parser_method: an string representing which lxml.Parser class to use when normalizing both archs. Takes either "xml" or "html" :return: the normalized arch :rtype str: """ Parser = None if parser_method == 'xml': Parser = etree.XMLParser elif parser_method == 'html': Parser = etree.HTMLParser parser = Parser(remove_blank_text=True) arch_string = etree.fromstring(arch_string, parser=parser) return etree.tostring(arch_string, pretty_print=True, encoding='unicode') class BlockedRequest(requests.exceptions.ConnectionError): pass _super_send = requests.Session.send class BaseCase(case.TestCase, metaclass=MetaCase): """ Subclass of TestCase for Odoo-specific code. This class is abstract and expects self.registry, self.cr and self.uid to be initialized by subclasses. """ longMessage = True # more verbose error message by default: https://www.odoo.com/r/Vmh warm = True # False during warm-up phase (see :func:`warmup`) _python_version = sys.version_info def __init__(self, methodName='runTest'): super().__init__(methodName) self.addTypeEqualityFunc(etree._Element, self.assertTreesEqual) self.addTypeEqualityFunc(html.HtmlElement, self.assertTreesEqual) @classmethod def _request_handler(cls, s: Session, r: PreparedRequest, /, **kw): # allow localhost requests # TODO: also check port? url = werkzeug.urls.url_parse(r.url) if url.host in (HOST, 'localhost'): return _super_send(s, r, **kw) if url.scheme == 'file': return _super_send(s, r, **kw) _logger.getChild('requests').info( "Blocking un-mocked external HTTP request %s %s", r.method, r.url) raise BlockedRequest(f"External requests verboten (was {r.method} {r.url})") def run(self, result): testMethod = getattr(self, self._testMethodName) if getattr(testMethod, '_retry', True) and getattr(self, '_retry', True): tests_run_count = int(os.environ.get('ODOO_TEST_FAILURE_RETRIES', 0)) + 1 else: tests_run_count = 1 _logger.info('Auto retry disabled for %s', self) failure = False for retry in range(tests_run_count): if retry: _logger.runbot(f'Retrying a failed test: {self}') if retry < tests_run_count-1: with warnings.catch_warnings(), \ result.soft_fail(), \ lower_logging(25, logging.INFO) as quiet_log: super().run(result) failure = result.had_failure or quiet_log.had_error_log else: # last try super().run(result) if not failure: break @classmethod def setUpClass(cls): def check_remaining_patchers(): for patcher in _patch._active_patches: _logger.warning("A patcher (targeting %s.%s) was remaining active at the end of %s, disabling it...", patcher.target, patcher.attribute, cls.__name__) patcher.stop() cls.addClassCleanup(check_remaining_patchers) super().setUpClass() if 'standard' in cls.test_tags: # if the method is passed directly `patch` discards the session # object which we need # pylint: disable=unnecessary-lambda patcher = patch.object( requests.sessions.Session, 'send', lambda s, r, **kwargs: cls._request_handler(s, r, **kwargs), ) patcher.start() cls.addClassCleanup(patcher.stop) def cursor(self): return self.registry.cursor() @property def uid(self): """ Get the current uid. """ return self.env.uid @uid.setter def uid(self, user): """ Set the uid by changing the test's environment. """ self.env = self.env(user=user) def ref(self, xid): """ Returns database ID for the provided :term:`external identifier`, shortcut for ``_xmlid_lookup`` :param xid: fully-qualified :term:`external identifier`, in the form :samp:`{module}.{identifier}` :raise: ValueError if not found :returns: registered id """ return self.browse_ref(xid).id def browse_ref(self, xid): """ Returns a record object for the provided :term:`external identifier` :param xid: fully-qualified :term:`external identifier`, in the form :samp:`{module}.{identifier}` :raise: ValueError if not found :returns: :class:`~odoo.models.BaseModel` """ assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'" return self.env.ref(xid) def patch(self, obj, key, val): """ Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """ patcher = patch.object(obj, key, val) # this is unittest.mock.patch patcher.start() self.addCleanup(patcher.stop) @classmethod def classPatch(cls, obj, key, val): """ Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """ patcher = patch.object(obj, key, val) # this is unittest.mock.patch patcher.start() cls.addClassCleanup(patcher.stop) def startPatcher(self, patcher): mock = patcher.start() self.addCleanup(patcher.stop) return mock @classmethod def startClassPatcher(cls, patcher): mock = patcher.start() cls.addClassCleanup(patcher.stop) return mock @contextmanager def with_user(self, login): """ Change user for a given test, like with self.with_user() ... """ old_uid = self.uid try: user = self.env['res.users'].sudo().search([('login', '=', login)]) assert user, "Login %s not found" % login # switch user self.uid = user.id self.env = self.env(user=self.uid) yield finally: # back self.uid = old_uid self.env = self.env(user=self.uid) @contextmanager def debug_mode(self): """ Enable the effects of group 'base.group_no_one'; mainly useful with :class:`Form`. """ origin_user_has_groups = BaseModel.user_has_groups def user_has_groups(self, groups): group_set = set(groups.split(',')) if '!base.group_no_one' in group_set: return False elif 'base.group_no_one' in group_set: group_set.remove('base.group_no_one') return not group_set or origin_user_has_groups(self, ','.join(group_set)) return origin_user_has_groups(self, groups) with patch('odoo.models.BaseModel.user_has_groups', user_has_groups): yield @contextmanager def _assertRaises(self, exception, *, msg=None): """ Context manager that clears the environment upon failure. """ with ExitStack() as init: if hasattr(self, 'env'): init.enter_context(self.env.cr.savepoint()) if issubclass(exception, AccessError): # The savepoint() above calls flush(), which leaves the # record cache with lots of data. This can prevent # access errors to be detected. In order to avoid this # issue, we clear the cache before proceeding. self.env.cr.clear() with ExitStack() as inner: cm = inner.enter_context(super().assertRaises(exception, msg=msg)) # *moves* the cleanups from init to inner, this ensures the # savepoint gets rolled back when `yield` raises `exception`, # but still allows the initialisation to be protected *and* not # interfered with by `assertRaises`. inner.push(init.pop_all()) yield cm def assertRaises(self, exception, func=None, *args, **kwargs): if func: with self._assertRaises(exception): func(*args, **kwargs) else: return self._assertRaises(exception, **kwargs) if sys.version_info < (3, 10): # simplified backport of assertNoLogs() @contextmanager def assertNoLogs(self, logger: str, level: str): # assertLogs ensures there is at least one log record when # exiting the context manager. We insert one dummy record just # so we pass that silly test while still capturing the logs. with self.assertLogs(logger, level) as capture: logging.getLogger(logger).log(getattr(logging, level), "Dummy log record") yield if len(capture.output) > 1: raise self.failureException(f"Unexpected logs found: {capture.output[1:]}") @contextmanager def assertQueries(self, expected, flush=True): """ Check the queries made by the current cursor. ``expected`` is a list of strings representing the expected queries being made. Query strings are matched against each other, ignoring case and whitespaces. """ Cursor_execute = Cursor.execute actual_queries = [] def execute(self, query, params=None, log_exceptions=None): actual_queries.append(query.code if isinstance(query, SQL) else query) return Cursor_execute(self, query, params, log_exceptions) def get_unaccent_wrapper(cr): return lambda x: x if flush: self.env.flush_all() self.env.cr.flush() with patch('odoo.sql_db.Cursor.execute', execute): with patch('odoo.osv.expression.get_unaccent_wrapper', get_unaccent_wrapper): yield actual_queries if flush: self.env.flush_all() self.env.cr.flush() if not self.warm: return self.assertEqual( len(actual_queries), len(expected), "\n---- actual queries:\n%s\n---- expected queries:\n%s" % ( "\n".join(actual_queries), "\n".join(expected), ) ) for actual_query, expect_query in zip(actual_queries, expected): self.assertEqual( "".join(actual_query.lower().split()), "".join(expect_query.lower().split()), "\n---- actual query:\n%s\n---- not like:\n%s" % (actual_query, expect_query), ) @contextmanager def assertQueryCount(self, default=0, flush=True, **counters): """ Context manager that counts queries. It may be invoked either with one value, or with a set of named arguments like ``login=value``:: with self.assertQueryCount(42): ... with self.assertQueryCount(admin=3, demo=5): ... The second form is convenient when used with :func:`users`. """ if self.warm: # mock random in order to avoid random bus gc with patch('random.random', lambda: 1): login = self.env.user.login expected = counters.get(login, default) if flush: self.env.flush_all() self.env.cr.flush() count0 = self.cr.sql_log_count yield if flush: self.env.flush_all() self.env.cr.flush() count = self.cr.sql_log_count - count0 if count != expected: # add some info on caller to allow semi-automatic update of query count frame, filename, linenum, funcname, lines, index = inspect.stack()[2] filename = filename.replace('\\', '/') if "/odoo/addons/" in filename: filename = filename.rsplit("/odoo/addons/", 1)[1] if count > expected: msg = "Query count more than expected for user %s: %d > %d in %s at %s:%s" # add a subtest in order to continue the test_method in case of failures with self.subTest(): self.fail(msg % (login, count, expected, funcname, filename, linenum)) else: logger = logging.getLogger(type(self).__module__) msg = "Query count less than expected for user %s: %d < %d in %s at %s:%s" logger.info(msg, login, count, expected, funcname, filename, linenum) else: # flush before and after during warmup, in order to reproduce the # same operations, otherwise the caches might not be ready! if flush: self.env.flush_all() self.env.cr.flush() yield if flush: self.env.flush_all() self.env.cr.flush() def assertRecordValues(self, records, expected_values): ''' Compare a recordset with a list of dictionaries representing the expected results. This method performs a comparison element by element based on their index. Then, the order of the expected values is extremely important. Note that: - Comparison between falsy values is supported: False match with None. - Comparison between monetary field is also treated according the currency's rounding. - Comparison between x2many field is done by ids. Then, empty expected ids must be []. - Comparison between many2one field id done by id. Empty comparison can be done using any falsy value. :param records: The records to compare. :param expected_values: List of dicts expected to be exactly matched in records ''' def _compare_candidate(record, candidate, field_names): ''' Compare all the values in `candidate` with a record. :param record: record being compared :param candidate: dict of values to compare :return: A dictionary will encountered difference in values. ''' diff = {} for field_name in field_names: record_value = record[field_name] field = record._fields[field_name] field_type = field.type if field_type == 'monetary': # Compare monetary field. currency_field_name = record._fields[field_name].get_currency_field(record) record_currency = record[currency_field_name] if field_name not in candidate: diff[field_name] = (record_value, None) elif record_currency: if record_currency.compare_amounts(candidate[field_name], record_value): diff[field_name] = (record_value, record_currency.round(candidate[field_name])) elif candidate[field_name] != record_value: diff[field_name] = (record_value, candidate[field_name]) elif field_type == 'float' and field.get_digits(record.env): prec = field.get_digits(record.env)[1] if float_compare(candidate[field_name], record_value, precision_digits=prec) != 0: diff[field_name] = (record_value, candidate[field_name]) elif field_type in ('one2many', 'many2many'): # Compare x2many relational fields. # Empty comparison must be an empty list to be True. if field_name not in candidate: diff[field_name] = (sorted(record_value.ids), None) elif set(record_value.ids) != set(candidate[field_name]): diff[field_name] = (sorted(record_value.ids), sorted(candidate[field_name])) elif field_type == 'many2one': # Compare many2one relational fields. # Every falsy value is allowed to compare with an empty record. if field_name not in candidate: diff[field_name] = (record_value.id, None) elif (record_value or candidate[field_name]) and record_value.id != candidate[field_name]: diff[field_name] = (record_value.id, candidate[field_name]) else: # Compare others fields if not both interpreted as falsy values. if field_name not in candidate: diff[field_name] = (record_value, None) elif (candidate[field_name] or record_value) and record_value != candidate[field_name]: diff[field_name] = (record_value, candidate[field_name]) return diff # Compare records with candidates. different_values = [] field_names = list(expected_values[0].keys()) for index, record in enumerate(records): is_additional_record = index >= len(expected_values) candidate = {} if is_additional_record else expected_values[index] diff = _compare_candidate(record, candidate, field_names) if diff: different_values.append((index, 'additional_record' if is_additional_record else 'regular_diff', diff)) for index in range(len(records), len(expected_values)): diff = {} for field_name in field_names: diff[field_name] = (None, expected_values[index][field_name]) different_values.append((index, 'missing_record', diff)) # Build error message. if not different_values: return errors = ['The records and expected_values do not match.'] if len(records) != len(expected_values): errors.append('Wrong number of records to compare: %d records versus %d expected values.' % (len(records), len(expected_values))) for index, diff_type, diff in different_values: if diff_type == 'regular_diff': errors.append('\n==== Differences at index %s ====' % index) record_diff = ['%s:%s' % (k, v[0]) for k, v in diff.items()] candidate_diff = ['%s:%s' % (k, v[1]) for k, v in diff.items()] errors.append('\n'.join(difflib.unified_diff(record_diff, candidate_diff))) elif diff_type == 'additional_record': errors += [ '\n==== Additional record ====', pprint.pformat(dict((k, v[0]) for k, v in diff.items())), ] elif diff_type == 'missing_record': errors += [ '\n==== Missing record ====', pprint.pformat(dict((k, v[1]) for k, v in diff.items())), ] self.fail('\n'.join(errors)) # turns out this thing may not be quite as useful as we thought... def assertItemsEqual(self, a, b, msg=None): self.assertCountEqual(a, b, msg=None) def assertTreesEqual(self, n1, n2, msg=None): self.assertIsNotNone(n1, msg) self.assertIsNotNone(n2, msg) self.assertEqual(n1.tag, n2.tag, msg) # Because lxml.attrib is an ordereddict for which order is important # to equality, even though *we* don't care self.assertEqual(dict(n1.attrib), dict(n2.attrib), msg) self.assertEqual((n1.text or u'').strip(), (n2.text or u'').strip(), msg) self.assertEqual((n1.tail or u'').strip(), (n2.tail or u'').strip(), msg) for c1, c2 in izip_longest(n1, n2): self.assertTreesEqual(c1, c2, msg) def _assertXMLEqual(self, original, expected, parser="xml"): """Asserts that two xmls archs are equal :param original: the xml arch to test :type original: str :param expected: the xml arch of reference :type expected: str :param parser: an string representing which lxml.Parser class to use when normalizing both archs. Takes either "xml" or "html" :type parser: str """ self.maxDiff = 10000 if original: original = _normalize_arch_for_assert(original, parser) if expected: expected = _normalize_arch_for_assert(expected, parser) self.assertEqual(original, expected) def assertXMLEqual(self, original, expected): return self._assertXMLEqual(original, expected) def assertHTMLEqual(self, original, expected): return self._assertXMLEqual(original, expected, 'html') def profile(self, description='', **kwargs): test_method = getattr(self, '_testMethodName', 'Unknown test method') if not hasattr(self, 'profile_session'): self.profile_session = profiler.make_session(test_method) return profiler.Profiler( description='%s uid:%s %s %s' % (test_method, self.env.user.id, 'warm' if self.warm else 'cold', description), db=self.env.cr.dbname, profile_session=self.profile_session, **kwargs) savepoint_seq = itertools.count() class TransactionCase(BaseCase): """ Test class in which all test methods are run in a single transaction, but each test method is run in a sub-transaction managed by a savepoint. The transaction's cursor is always closed without committing. The data setup common to all methods should be done in the class method `setUpClass`, so that it is done once for all test methods. This is useful for test cases containing fast tests but with significant database setup common to all cases (complex in-db test data). After being run, each test method cleans up the record cache and the registry cache. However, there is no cleanup of the registry models and fields. If a test modifies the registry (custom models and/or fields), it should prepare the necessary cleanup (`self.registry.reset_changes()`). """ registry: Registry = None env: api.Environment = None cr: Cursor = None muted_registry_logger = mute_logger(odoo.modules.registry._logger.name) @classmethod def _gc_filestore(cls): # attachment can be created or unlink during the tests. # they can addup during test and take some disc space. # since cron are not running during tests, we need to gc manually # We need to check the status of the file system outside of the test cursor with odoo.registry(get_db_name()).cursor() as cr: gc_env = api.Environment(cr, odoo.SUPERUSER_ID, {}) gc_env['ir.attachment']._gc_file_store_unsafe() @classmethod def setUpClass(cls): super().setUpClass() cls.addClassCleanup(cls._gc_filestore) cls.registry = odoo.registry(get_db_name()) cls.registry_start_sequence = cls.registry.registry_sequence def reset_changes(): if (cls.registry_start_sequence != cls.registry.registry_sequence) or cls.registry.registry_invalidated: with cls.registry.cursor() as cr: cls.registry.setup_models(cr) cls.registry.registry_invalidated = False cls.registry.registry_sequence = cls.registry_start_sequence with cls.muted_registry_logger: cls.registry.clear_all_caches() cls.registry.cache_invalidated.clear() cls.addClassCleanup(reset_changes) cls.cr = cls.registry.cursor() cls.addClassCleanup(cls.cr.close) cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {}) def setUp(self): super().setUp() # restore environments after the test to avoid invoking flush() with an # invalid environment (inexistent user id) from another test envs = self.env.all.envs for env in list(envs): self.addCleanup(env.clear) # restore the set of known environments as it was at setUp self.addCleanup(envs.update, list(envs)) self.addCleanup(envs.clear) self.addCleanup(self.muted_registry_logger(self.registry.clear_all_caches)) # This prevents precommit functions and data from piling up # until cr.flush is called in 'assertRaises' clauses # (these are not cleared in self.env.clear or envs.clear) cr = self.env.cr def _reset(cb, funcs, data): cb._funcs = funcs cb.data = data for callback in [cr.precommit, cr.postcommit, cr.prerollback, cr.postrollback]: self.addCleanup(_reset, callback, deque(callback._funcs), dict(callback.data)) # flush everything in setUpClass before introducing a savepoint self.env.flush_all() self._savepoint_id = next(savepoint_seq) self.cr.execute('SAVEPOINT test_%d' % self._savepoint_id) self.addCleanup(self.cr.execute, 'ROLLBACK TO SAVEPOINT test_%d' % self._savepoint_id) class SingleTransactionCase(BaseCase): """ TestCase in which all test methods are run in the same transaction, the transaction is started with the first test method and rolled back at the end of the last. """ @classmethod def __init_subclass__(cls): super().__init_subclass__() if issubclass(cls, TransactionCase): _logger.warning("%s inherits from both TransactionCase and SingleTransactionCase") @classmethod def setUpClass(cls): super().setUpClass() cls.registry = odoo.registry(get_db_name()) cls.addClassCleanup(cls.registry.reset_changes) cls.addClassCleanup(cls.registry.clear_all_caches) cls.cr = cls.registry.cursor() cls.addClassCleanup(cls.cr.close) cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {}) def setUp(self): super(SingleTransactionCase, self).setUp() self.env.flush_all() class ChromeBrowserException(Exception): pass def fmap(future, map_fun): """Maps a future's result through a callback. Resolves to the application of ``map_fun`` to the result of ``future``. .. warning:: this does *not* recursively resolve futures, if that's what you need see :func:`fchain` """ fmap_future = Future() @future.add_done_callback def _(f): try: fmap_future.set_result(map_fun(f.result())) except Exception as e: fmap_future.set_exception(e) return fmap_future def fchain(future, next_callback): """Chains a future's result to a new future through a callback. Corresponds to the ``bind`` monadic operation (aka flatmap aka then... kinda). """ new_future = Future() @future.add_done_callback def _(f): try: n = next_callback(f.result()) @n.add_done_callback def _(f): try: new_future.set_result(f.result()) except Exception as e: new_future.set_exception(e) except Exception as e: new_future.set_exception(e) return new_future def save_test_file(test_name, content, prefix, extension='png', logger=_logger, document_type='Screenshot', date_format="%Y%m%d_%H%M%S_%f"): assert re.fullmatch(r'\w*_', prefix) assert re.fullmatch(r'[a-z]+', extension) assert re.fullmatch(r'\w+', test_name) now = datetime.now().strftime(date_format) screenshots_dir = pathlib.Path(odoo.tools.config['screenshots']) / get_db_name() / 'screenshots' screenshots_dir.mkdir(parents=True, exist_ok=True) fname = f'{prefix}{now}_{test_name}.{extension}' full_path = screenshots_dir / fname with full_path.open('wb') as f: f.write(content) logger.runbot(f'{document_type} in: {full_path}') class ChromeBrowser: """ Helper object to control a Chrome headless process. """ remote_debugging_port = 0 # 9222, change it in a non-git-tracked file def __init__(self, test_class, headless=True): self._logger = test_class._logger self.test_class = test_class if websocket is None: self._logger.warning("websocket-client module is not installed") raise unittest.SkipTest("websocket-client module is not installed") self.user_data_dir = tempfile.mkdtemp(suffix='_chrome_odoo') otc = odoo.tools.config self.screencasts_dir = None self.screencast_frames = [] if otc['screencasts']: self.screencasts_dir = os.path.join(otc['screencasts'], get_db_name(), 'screencasts') os.makedirs(self.screencasts_frames_dir, exist_ok=True) if os.name == 'posix': self.sigxcpu_handler = signal.getsignal(signal.SIGXCPU) signal.signal(signal.SIGXCPU, self.signal_handler) else: self.sigxcpu_handler = None self.chrome, self.devtools_port = self._chrome_start( user_data_dir=self.user_data_dir, window_size=test_class.browser_size, touch_enabled=test_class.touch_enabled, headless=headless, ) self.ws = self._open_websocket() self._request_id = itertools.count() self._result = Future() self.error_checker = None self.had_failure = False # maps request_id to Futures self._responses = {} # maps frame ids to callbacks self._frames = {} self._handlers = { 'Runtime.consoleAPICalled': self._handle_console, 'Runtime.exceptionThrown': self._handle_exception, 'Page.frameStoppedLoading': self._handle_frame_stopped_loading, 'Page.screencastFrame': self._handle_screencast_frame, } self._receiver = threading.Thread( target=self._receive, name="WebSocket events consumer", args=(get_db_name(),) ) self._receiver.start() self._logger.info('Enable chrome headless console log notification') self._websocket_send('Runtime.enable') self._logger.info('Chrome headless enable page notifications') self._websocket_send('Page.enable') @property def screencasts_frames_dir(self): return os.path.join(self.screencasts_dir, 'frames') def signal_handler(self, sig, frame): if sig == signal.SIGXCPU: _logger.info('CPU time limit reached, stopping Chrome and shutting down') self.stop() os._exit(0) def stop(self): if hasattr(self, 'ws'): self._websocket_send('Page.stopScreencast') if self.screencasts_dir: screencasts_frames_dir = self.screencasts_frames_dir self.screencasts_dir = None if os.path.isdir(screencasts_frames_dir): shutil.rmtree(screencasts_frames_dir, ignore_errors=True) self._websocket_request('Page.stopLoading') self._websocket_request('Runtime.evaluate', params={'expression': """ ('serviceWorker' in navigator) && navigator.serviceWorker.getRegistrations().then( registrations => Promise.all(registrations.map(r => r.unregister())) ) """, 'awaitPromise': True}) # wait for the screenshot or whatever wait(self._responses.values(), 10) self._result.cancel() self._logger.info("Closing chrome headless with pid %s", self.chrome.pid) self._websocket_send('Browser.close') self._logger.info("Closing websocket connection") self.ws.close() if self.chrome: self._logger.info("Terminating chrome headless with pid %s", self.chrome.pid) self.chrome.terminate() if self.user_data_dir and os.path.isdir(self.user_data_dir) and self.user_data_dir != '/': self._logger.info('Removing chrome user profile "%s"', self.user_data_dir) shutil.rmtree(self.user_data_dir, ignore_errors=True) # Restore previous signal handler if self.sigxcpu_handler and os.name == 'posix': signal.signal(signal.SIGXCPU, self.sigxcpu_handler) @property def executable(self): return _find_executable() def _chrome_without_limit(self, cmd): if os.name == 'posix' and platform.system() != 'Darwin': # since the introduction of pointer compression in Chrome 80 (v8 v8.0), # the memory reservation algorithm requires more than 8GiB of # virtual mem for alignment this exceeds our default memory limits. def preexec(): import resource resource.setrlimit(resource.RLIMIT_AS, (resource.RLIM_INFINITY, resource.RLIM_INFINITY)) else: preexec = None # pylint: disable=subprocess-popen-preexec-fn return subprocess.Popen(cmd, stderr=subprocess.DEVNULL, preexec_fn=preexec) def _spawn_chrome(self, cmd): proc = self._chrome_without_limit(cmd) port_file = pathlib.Path(self.user_data_dir, 'DevToolsActivePort') for _ in range(CHECK_BROWSER_ITERATIONS): time.sleep(CHECK_BROWSER_SLEEP) if port_file.is_file() and port_file.stat().st_size > 5: with port_file.open('r', encoding='utf-8') as f: return proc, int(f.readline()) raise unittest.SkipTest(f'Failed to detect chrome devtools port after {BROWSER_WAIT :.1f}s.') def _chrome_start( self, user_data_dir: str, window_size: str, touch_enabled: bool, headless=True ): headless_switches = { '--headless': '', '--disable-extensions': '', '--disable-background-networking' : '', '--disable-background-timer-throttling' : '', '--disable-backgrounding-occluded-windows': '', '--disable-renderer-backgrounding' : '', '--disable-breakpad': '', '--disable-client-side-phishing-detection': '', '--disable-crash-reporter': '', '--disable-dev-shm-usage': '', '--disable-namespace-sandbox': '', '--disable-translate': '', '--no-sandbox': '', '--disable-gpu': '', } switches = { # required for tours that use Youtube autoplay conditions (namely website_slides' "course_tour") '--autoplay-policy': 'no-user-gesture-required', '--disable-default-apps': '', '--disable-device-discovery-notifications': '', '--no-default-browser-check': '', '--remote-debugging-address': HOST, '--remote-debugging-port': str(self.remote_debugging_port), '--user-data-dir': user_data_dir, '--window-size': window_size, '--no-first-run': '', # '--enable-precise-memory-info': '', # uncomment to debug memory leaks in qunit suite # '--js-flags': '--expose-gc', # uncomment to debug memory leaks in qunit suite } if headless: switches.update(headless_switches) if touch_enabled: # enable Chrome's Touch mode, useful to detect touch capabilities using # "'ontouchstart' in window" switches['--touch-events'] = '' cmd = [self.executable] cmd += ['%s=%s' % (k, v) if v else k for k, v in switches.items()] url = 'about:blank' cmd.append(url) try: proc, devtools_port = self._spawn_chrome(cmd) except OSError: raise unittest.SkipTest("%s not found" % cmd[0]) self._logger.info('Chrome pid: %s', proc.pid) self._logger.info('Chrome headless temporary user profile dir: %s', self.user_data_dir) return proc, devtools_port def _json_command(self, command, timeout=3): """Queries browser state using JSON Available commands: ``''`` return list of tabs with their id ``list`` (or ``json/``) list tabs ``new`` open a new tab :samp:`activate/{id}` activate a tab :samp:`close/{id}` close a tab ``version`` get chrome and dev tools version ``protocol`` get the full protocol """ command = '/'.join(['json', command]).strip('/') url = werkzeug.urls.url_join('http://%s:%s/' % (HOST, self.devtools_port), command) self._logger.info("Issuing json command %s", url) delay = 0.1 tries = 0 failure_info = None message = None while timeout > 0: try: self.chrome.send_signal(0) except ProcessLookupError: message = 'Chrome crashed at startup' break try: r = requests.get(url, timeout=3) if r.ok: return r.json() except requests.ConnectionError as e: failure_info = str(e) message = 'Connection Error while trying to connect to Chrome debugger' except requests.exceptions.ReadTimeout as e: failure_info = str(e) message = 'Connection Timeout while trying to connect to Chrome debugger' break time.sleep(delay) timeout -= delay delay = delay * 1.5 tries += 1 self._logger.error("%s after %s tries" % (message, tries)) if failure_info: self._logger.info(failure_info) self.stop() raise unittest.SkipTest("Error during Chrome headless connection") def _open_websocket(self): version = self._json_command('version') self._logger.info('Browser version: %s', version['Browser']) start = time.time() while (time.time() - start) < 5.0: ws_url = next(( target['webSocketDebuggerUrl'] for target in self._json_command('') if target['type'] == 'page' if target['url'] == 'about:blank' ), None) if ws_url: break time.sleep(0.1) else: self.stop() raise unittest.SkipTest("Error during Chrome connection: never found 'page' target") self._logger.info('Websocket url found: %s', ws_url) ws = websocket.create_connection(ws_url, enable_multithread=True, suppress_origin=True) if ws.getstatus() != 101: raise unittest.SkipTest("Cannot connect to chrome dev tools") ws.settimeout(0.01) return ws def _receive(self, dbname): threading.current_thread().dbname = dbname # So CDT uses a streamed JSON-RPC structure, meaning a request is # {id, method, params} and eventually a {id, result | error} should # arrive the other way, however for events it uses "notifications" # meaning request objects without an ``id``, but *coming from the server while True: # or maybe until `self._result` is `done()`? try: msg = self.ws.recv() if not msg: continue self._logger.debug('\n<- %s', msg) except websocket.WebSocketTimeoutException: continue except Exception as e: # if the socket is still connected something bad happened, # otherwise the client was just shut down if self.ws.connected: self._result.set_exception(e) raise self._result.cancel() return res = json.loads(msg) request_id = res.get('id') try: if request_id is None: handler = self._handlers.get(res['method']) if handler: handler(**res['params']) else: f = self._responses.pop(request_id, None) if f: if 'result' in res: f.set_result(res['result']) else: f.set_exception(ChromeBrowserException(res['error']['message'])) except Exception: msg = str(msg) if msg and len(msg) > 500: msg = msg[:500] + '...' _logger.exception("While processing message %s", msg) def _websocket_request(self, method, *, params=None, timeout=10.0): assert threading.get_ident() != self._receiver.ident,\ "_websocket_request must not be called from the consumer thread" if self.ws is None: return f = self._websocket_send(method, params=params, with_future=True) try: return f.result(timeout=timeout) except concurrent.futures.TimeoutError: raise TimeoutError(f'{method}({params or ""})') def _websocket_send(self, method, *, params=None, with_future=False): """send chrome devtools protocol commands through websocket If ``with_future`` is set, returns a ``Future`` for the operation. """ if self.ws is None: return result = None request_id = next(self._request_id) if with_future: result = self._responses[request_id] = Future() payload = {'method': method, 'id': request_id} if params: payload['params'] = params self._logger.debug('\n-> %s', payload) self.ws.send(json.dumps(payload)) return result def _handle_console(self, type, args=None, stackTrace=None, **kw): # pylint: disable=redefined-builtin # console formatting differs somewhat from Python's, if args[0] has # format modifiers that many of args[1:] get formatted in, missing # args are replaced by empty strings and extra args are concatenated # (space-separated) # # current version modifies the args in place which could and should # probably be improved if args: arg0, args = str(self._from_remoteobject(args[0])), args[1:] else: arg0, args = '', [] formatted = [re.sub(r'%[%sdfoOc]', self.console_formatter(args), arg0)] # formatter consumes args it uses, leaves unformatted args untouched formatted.extend(str(self._from_remoteobject(arg)) for arg in args) message = ' '.join(formatted) stack = ''.join(self._format_stack({'type': type, 'stackTrace': stackTrace})) if stack: message += '\n' + stack log_type = type _logger = self._logger.getChild('browser') _logger.log( self._TO_LEVEL.get(log_type, logging.INFO), "%s%s", "Error received after termination: " if self._result.done() else "", message # might still have % characters ) if log_type == 'error': self.had_failure = True if self._result.done(): return if not self.error_checker or self.error_checker(message): self.take_screenshot() self._save_screencast() try: self._result.set_exception(ChromeBrowserException(message)) except CancelledError: ... except InvalidStateError: self._logger.warning( "Trying to set result to failed (%s) but found the future settled (%s)", message, self._result ) elif 'test successful' in message: if self.test_class.allow_end_on_form: self._result.set_result(True) return qs = fchain( self._websocket_send('DOM.getDocument', params={'depth': 0}, with_future=True), lambda d: self._websocket_send("DOM.querySelector", params={ 'nodeId': d['root']['nodeId'], 'selector': '.o_form_dirty', }, with_future=True) ) @qs.add_done_callback def _qs_result(fut): node_id = 0 with contextlib.suppress(Exception): node_id = fut.result()['nodeId'] if node_id: self.take_screenshot("unsaved_form_") msg = """\ Tour finished with an open form view in edition mode. Form views in edition mode are automatically saved when the page is closed, \ which leads to stray network requests and inconsistencies.""" if self._result.done(): _logger.error("%s", msg) else: self._result.set_exception(ChromeBrowserException(msg)) return if not self._result.done(): self._result.set_result(True) elif self._result.exception() is None: # if the future was already failed, we're happy, # otherwise swap for a new failed _logger.error("Tried to make the tour successful twice.") def _handle_exception(self, exceptionDetails, timestamp): message = exceptionDetails['text'] exception = exceptionDetails.get('exception') if exception: message += str(self._from_remoteobject(exception)) exceptionDetails['type'] = 'trace' # fake this so _format_stack works stack = ''.join(self._format_stack(exceptionDetails)) if stack: message += '\n' + stack if self._result.done(): self._logger.getChild('browser').error( "Exception received after termination: %s", message) return self.take_screenshot() self._save_screencast() try: self._result.set_exception(ChromeBrowserException(message)) except CancelledError: ... except InvalidStateError: self._logger.warning( "Trying to set result to failed (%s) but found the future settled (%s)", message, self._result ) def _handle_frame_stopped_loading(self, frameId): wait = self._frames.pop(frameId, None) if wait: wait() def _handle_screencast_frame(self, sessionId, data, metadata): if not self.screencasts_frames_dir: return self._websocket_send('Page.screencastFrameAck', params={'sessionId': sessionId}) if not self.screencasts_dir: return outfile = os.path.join(self.screencasts_frames_dir, 'frame_%05d.b64' % len(self.screencast_frames)) try: with open(outfile, 'w') as f: f.write(data) self.screencast_frames.append({ 'file_path': outfile, 'timestamp': metadata.get('timestamp') }) except FileNotFoundError: self._logger.debug('Useless screencast frame skipped: %s', outfile) _TO_LEVEL = { 'debug': logging.DEBUG, 'log': logging.INFO, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, # TODO: what do with # dir, dirxml, table, trace, clear, startGroup, startGroupCollapsed, # endGroup, assert, profile, profileEnd, count, timeEnd } def take_screenshot(self, prefix='sc_'): def handler(f): base_png = f.result(timeout=0)['data'] if not base_png: self._logger.warning("Couldn't capture screenshot: expected image data, got ?? error ??") return decoded = base64.b64decode(base_png, validate=True) save_test_file(self.test_class.__name__, decoded, prefix, logger=self._logger) self._logger.info('Asking for screenshot') f = self._websocket_send('Page.captureScreenshot', with_future=True) f.add_done_callback(handler) return f def _save_screencast(self, prefix='failed'): # could be encododed with something like that # ffmpeg -framerate 3 -i frame_%05d.png output.mp4 if not self.screencast_frames: self._logger.debug('No screencast frames to encode') return None self.stop_screencast() for f in self.screencast_frames: with open(f['file_path'], 'rb') as b64_file: frame = base64.decodebytes(b64_file.read()) os.unlink(f['file_path']) f['file_path'] = f['file_path'].replace('.b64', '.png') with open(f['file_path'], 'wb') as png_file: png_file.write(frame) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') fname = '%s_screencast_%s.mp4' % (prefix, timestamp) outfile = os.path.join(self.screencasts_dir, fname) try: ffmpeg_path = find_in_path('ffmpeg') except IOError: ffmpeg_path = None if ffmpeg_path: nb_frames = len(self.screencast_frames) concat_script_path = os.path.join(self.screencasts_dir, fname.replace('.mp4', '.txt')) with open(concat_script_path, 'w') as concat_file: for i in range(nb_frames): frame_file_path = os.path.join(self.screencasts_frames_dir, self.screencast_frames[i]['file_path']) end_time = time.time() if i == nb_frames - 1 else self.screencast_frames[i+1]['timestamp'] duration = end_time - self.screencast_frames[i]['timestamp'] concat_file.write("file '%s'\nduration %s\n" % (frame_file_path, duration)) concat_file.write("file '%s'" % frame_file_path) # needed by the concat plugin try: subprocess.run([ffmpeg_path, '-f', 'concat', '-safe', '0', '-i', concat_script_path, '-pix_fmt', 'yuv420p', '-g', '0', outfile], check=True) except subprocess.CalledProcessError: self._logger.error('Failed to encode screencast.') return self._logger.log(25, 'Screencast in: %s', outfile) else: outfile = outfile.strip('.mp4') shutil.move(self.screencasts_frames_dir, outfile) self._logger.runbot('Screencast frames in: %s', outfile) def start_screencast(self): assert self.screencasts_dir self._websocket_send('Page.startScreencast') def stop_screencast(self): self._websocket_send('Page.stopScreencast') def set_cookie(self, name, value, path, domain): params = {'name': name, 'value': value, 'path': path, 'domain': domain} self._websocket_request('Network.setCookie', params=params) return def delete_cookie(self, name, **kwargs): params = {k: v for k, v in kwargs.items() if k in ['url', 'domain', 'path']} params['name'] = name self._websocket_request('Network.deleteCookies', params=params) return def _wait_ready(self, ready_code=None, timeout=60): ready_code = ready_code or "document.readyState === 'complete'" self._logger.info('Evaluate ready code "%s"', ready_code) start_time = time.time() result = None while True: taken = time.time() - start_time if taken > timeout: break result = self._websocket_request('Runtime.evaluate', params={ 'expression': "try { %s } catch {}" % ready_code, 'awaitPromise': True, }, timeout=timeout-taken)['result'] if result == {'type': 'boolean', 'value': True}: time_to_ready = time.time() - start_time if taken > 2: self._logger.info('The ready code tooks too much time : %s', time_to_ready) return True self.take_screenshot(prefix='sc_failed_ready_') self._logger.info('Ready code last try result: %s', result) return False def _wait_code_ok(self, code, timeout, error_checker=None): self.error_checker = error_checker self._logger.info('Evaluate test code "%s"', code) start = time.time() res = self._websocket_request('Runtime.evaluate', params={ 'expression': code, 'awaitPromise': True, }, timeout=timeout)['result'] if res.get('subtype') == 'error': raise ChromeBrowserException("Running code returned an error: %s" % res) err = ChromeBrowserException("failed") try: # if the runcode was a promise which took some time to execute, # discount that from the timeout if self._result.result(time.time() - start + timeout) and not self.had_failure: return except CancelledError: # regular-ish shutdown return except Exception as e: err = e self.take_screenshot() self._save_screencast() if isinstance(err, ChromeBrowserException): raise err if isinstance(err, concurrent.futures.TimeoutError): raise ChromeBrowserException('Script timeout exceeded') from err raise ChromeBrowserException("Unknown error") from err def navigate_to(self, url, wait_stop=False): self._logger.info('Navigating to: "%s"', url) nav_result = self._websocket_request('Page.navigate', params={'url': url}, timeout=20.0) self._logger.info("Navigation result: %s", nav_result) if wait_stop: frame_id = nav_result['frameId'] e = threading.Event() self._frames[frame_id] = e.set self._logger.info('Waiting for frame %r to stop loading', frame_id) e.wait(10) def _from_remoteobject(self, arg): """ attempts to make a CDT RemoteObject comprehensible """ objtype = arg['type'] subtype = arg.get('subtype') if objtype == 'undefined': # the undefined remoteobject is literally just {type: undefined}... return 'undefined' elif objtype != 'object' or subtype not in (None, 'array'): # value is the json representation for json object # otherwise fallback on the description which is "a string # representation of the object" e.g. the traceback for errors, the # source for functions, ... finally fallback on the entire arg mess return arg.get('value', arg.get('description', arg)) elif subtype == 'array': # apparently value is *not* the JSON representation for arrays # instead it's just Array(3) which is useless, however the preview # properties are the same as object which is useful (just ignore the # name which is the index) return '[%s]' % ', '.join( repr(p['value']) if p['type'] == 'string' else str(p['value']) for p in arg.get('preview', {}).get('properties', []) if re.match(r'\d+', p['name']) ) # all that's left is type=object, subtype=None aka custom or # non-standard objects, print as TypeName(param=val, ...), sadly because # of the way Odoo widgets are created they all appear as Class(...) # nb: preview properties are *not* recursive, the value is *all* we get return '%s(%s)' % ( arg.get('className') or 'object', ', '.join( '%s=%s' % (p['name'], repr(p['value']) if p['type'] == 'string' else p['value']) for p in arg.get('preview', {}).get('properties', []) if p.get('value') is not None ) ) LINE_PATTERN = '\tat %(functionName)s (%(url)s:%(lineNumber)d:%(columnNumber)d)\n' def _format_stack(self, logrecord): if logrecord['type'] not in ['trace']: return trace = logrecord.get('stackTrace') while trace: for f in trace['callFrames']: yield self.LINE_PATTERN % f trace = trace.get('parent') def console_formatter(self, args): """ Formats similarly to the console API: * if there are no args, don't format (return string as-is) * %% -> % * %c -> replace by styling directives (ignore for us) * other known formatters -> replace by corresponding argument * leftover known formatters (args exhausted) -> replace by empty string * unknown formatters -> return as-is """ if not args: return lambda m: m[0] def replacer(m): fmt = m[0][1] if fmt == '%': return '%' if fmt in 'sdfoOc': if not args: return '' repl = args.pop(0) if fmt == 'c': return '' return str(self._from_remoteobject(repl)) return m[0] return replacer @lru_cache(1) def _find_executable(): system = platform.system() if system == 'Linux': for bin_ in ['google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable']: try: return find_in_path(bin_) except IOError: continue elif system == 'Darwin': bins = [ '/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', '/Applications/Chromium.app/Contents/MacOS/Chromium', ] for bin_ in bins: if os.path.exists(bin_): return bin_ elif system == 'Windows': bins = [ '%ProgramFiles%\\Google\\Chrome\\Application\\chrome.exe', '%ProgramFiles(x86)%\\Google\\Chrome\\Application\\chrome.exe', '%LocalAppData%\\Google\\Chrome\\Application\\chrome.exe', ] for bin_ in bins: bin_ = os.path.expandvars(bin_) if os.path.exists(bin_): return bin_ raise unittest.SkipTest("Chrome executable not found") class Opener(requests.Session): """ Flushes and clears the current transaction when starting a request. This is likely necessary when we make a request to the server, as the request is made with a test cursor, which uses a different cache than this transaction. """ def __init__(self, cr: BaseCursor): super().__init__() self.cr = cr def request(self, *args, **kwargs): self.cr.flush() self.cr.clear() return super().request(*args, **kwargs) class Transport(xmlrpclib.Transport): """ see :class:`Opener` """ def __init__(self, cr: BaseCursor): self.cr = cr super().__init__() def request(self, *args, **kwargs): self.cr.flush() self.cr.clear() return super().request(*args, **kwargs) class JsonRpcException(Exception): def __init__(self, code, message): super().__init__(message) self.code = code class HttpCase(TransactionCase): """ Transactional HTTP TestCase with url_open and Chrome headless helpers. """ registry_test_mode = True browser = None browser_size = '1366x768' touch_enabled = False allow_end_on_form = False _logger: logging.Logger = None @classmethod def setUpClass(cls): super().setUpClass() if cls.registry_test_mode: cls.registry.enter_test_mode(cls.cr) cls.addClassCleanup(cls.registry.leave_test_mode) ICP = cls.env['ir.config_parameter'] ICP.set_param('web.base.url', cls.base_url()) ICP.env.flush_all() # v8 api with correct xmlrpc exception handling. cls.xmlrpc_url = f'http://{HOST}:{odoo.tools.config["http_port"]:d}/xmlrpc/2/' cls._logger = logging.getLogger('%s.%s' % (cls.__module__, cls.__name__)) def setUp(self): super().setUp() self.xmlrpc_common = xmlrpclib.ServerProxy(self.xmlrpc_url + 'common', transport=Transport(self.cr)) self.xmlrpc_db = xmlrpclib.ServerProxy(self.xmlrpc_url + 'db', transport=Transport(self.cr)) self.xmlrpc_object = xmlrpclib.ServerProxy(self.xmlrpc_url + 'object', transport=Transport(self.cr)) # setup an url opener helper self.opener = Opener(self.cr) def url_open(self, url, data=None, files=None, timeout=12, headers=None, allow_redirects=True, head=False): if url.startswith('/'): url = self.base_url() + url if head: return self.opener.head(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=False) if data or files: return self.opener.post(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=allow_redirects) return self.opener.get(url, timeout=timeout, headers=headers, allow_redirects=allow_redirects) def _wait_remaining_requests(self, timeout=10): def get_http_request_threads(): return [t for t in threading.enumerate() if t.name.startswith('odoo.service.http.request.')] start_time = time.time() request_threads = get_http_request_threads() self._logger.info('waiting for threads: %s', request_threads) for thread in request_threads: thread.join(timeout - (time.time() - start_time)) request_threads = get_http_request_threads() for thread in request_threads: self._logger.info("Stop waiting for thread %s handling request for url %s", thread.name, getattr(thread, 'url', '')) if request_threads: self._logger.info('remaining requests') odoo.tools.misc.dumpstacks() def logout(self, keep_db=True): self.session.logout(keep_db=keep_db) odoo.http.root.session_store.save(self.session) def authenticate(self, user, password, browser: ChromeBrowser = None): if getattr(self, 'session', None): odoo.http.root.session_store.delete(self.session) self.session = session = odoo.http.root.session_store.new() session.update(odoo.http.get_default_session(), db=get_db_name()) session.context['lang'] = odoo.http.DEFAULT_LANG if user: # if authenticated # Flush and clear the current transaction. This is useful, because # the call below opens a test cursor, which uses a different cache # than this transaction. self.cr.flush() self.cr.clear() uid = self.registry['res.users'].authenticate(session.db, user, password, {'interactive': False}) env = api.Environment(self.cr, uid, {}) session.uid = uid session.login = user session.session_token = uid and security.compute_session_token(session, env) session.context = dict(env['res.users'].context_get()) odoo.http.root.session_store.save(session) # Reset the opener: turns out when we set cookies['foo'] we're really # setting a cookie on domain='' path='/'. # # But then our friendly neighborhood server might set a cookie for # domain='localhost' path='/' (with the same value) which is considered # a *different* cookie following ours rather than the same. # # When we update our cookie, it's done in-place, so the server-set # cookie is still present and (as it follows ours and is more precise) # very likely to still be used, therefore our session change is ignored. # # An alternative would be to set the cookie to None (unsetting it # completely) or clear-ing session.cookies. self.opener = Opener(self.cr) self.opener.cookies['session_id'] = session.sid if browser: self._logger.info('Setting session cookie in browser') browser.set_cookie('session_id', session.sid, '/', HOST) return session def browser_js(self, url_path, code, ready='', login=None, timeout=60, cookies=None, error_checker=None, watch=False, **kw): """ Test js code running in the browser - optionnally log as 'login' - load page given by url_path - wait for ready object to be available - eval(code) inside the page To signal success test do: console.log('test successful') To signal test failure raise an exception or call console.error with a message. Test will stop when a failure occurs if error_checker is not defined or returns True for this message """ if not self.env.registry.loaded: self._logger.warning('HttpCase test should be in post_install only') # increase timeout if coverage is running if any(f.filename.endswith('/coverage/execfile.py') for f in inspect.stack() if f.filename): timeout = timeout * 1.5 if watch: _logger.warning('watch mode is only suitable for local testing') browser = ChromeBrowser(type(self), headless=not watch) try: self.authenticate(login, login, browser=browser) # Flush and clear the current transaction. This is useful in case # we make requests to the server, as these requests are made with # test cursors, which uses different caches than this transaction. self.cr.flush() self.cr.clear() url = werkzeug.urls.url_join(self.base_url(), url_path) if watch: parsed = werkzeug.urls.url_parse(url) qs = parsed.decode_query() qs['watch'] = '1' url = parsed.replace(query=werkzeug.urls.url_encode(qs)).to_url() self._logger.info('Open "%s" in browser', url) if browser.screencasts_dir: self._logger.info('Starting screencast') browser.start_screencast() if cookies: for name, value in cookies.items(): browser.set_cookie(name, value, '/', HOST) browser.navigate_to(url, wait_stop=not bool(ready)) # Needed because tests like test01.js (qunit tests) are passing a ready # code = "" self.assertTrue(browser._wait_ready(ready), 'The ready "%s" code was always falsy' % ready) error = False try: browser._wait_code_ok(code, timeout, error_checker=error_checker) except ChromeBrowserException as chrome_browser_exception: error = chrome_browser_exception if error: # dont keep initial traceback, keep that outside of except if code: message = 'The test code "%s" failed' % code else: message = "Some js test failed" self.fail('%s\n\n%s' % (message, error)) finally: browser.stop() self._wait_remaining_requests() @classmethod def base_url(cls): return f"http://{HOST}:{odoo.tools.config['http_port']}" def start_tour(self, url_path, tour_name, step_delay=None, **kwargs): """Wrapper for `browser_js` to start the given `tour_name` with the optional delay between steps `step_delay`. Other arguments from `browser_js` can be passed as keyword arguments.""" options = { 'stepDelay': step_delay if step_delay else 0, 'keepWatchBrowser': kwargs.get('watch', False), 'startUrl': url_path, } code = kwargs.pop('code', "odoo.startTour('%s', %s)" % (tour_name, json.dumps(options))) ready = kwargs.pop('ready', "odoo.isTourReady('%s')" % tour_name) return self.browser_js(url_path=url_path, code=code, ready=ready, **kwargs) def profile(self, **kwargs): """ for http_case, also patch _get_profiler_context_manager in order to profile all requests """ sup = super() _profiler = sup.profile(**kwargs) def route_profiler(request): return sup.profile(description=request.httprequest.full_path) return profiler.Nested(_profiler, patch('odoo.http.Request._get_profiler_context_manager', route_profiler)) def make_jsonrpc_request(self, route, params=None, headers=None): """Make a JSON-RPC request to the server. :param str route: the route to request :param dict params: the parameters to send :raises requests.HTTPError: if one occurred :raises JsonRpcException: if the response contains an error :return: The 'result' key from the response if any. """ data = json.dumps({ 'id': 0, 'jsonrpc': '2.0', 'method': 'call', 'params': params, }).encode() headers = headers or {} headers['Content-Type'] = 'application/json' response = self.url_open(route, data, headers=headers) response.raise_for_status() decoded_response = response.json() if 'result' in decoded_response: return decoded_response['result'] if 'error' in decoded_response: raise JsonRpcException( code=decoded_response['error']['code'], message=decoded_response['error']['data']['name'] ) def no_retry(arg): """Disable auto retry on decorated test method or test class""" arg._retry = False return arg def users(*logins): """ Decorate a method to execute it once for each given user. """ @decorator def _users(func, *args, **kwargs): self = args[0] old_uid = self.uid try: # retrieve users Users = self.env['res.users'].with_context(active_test=False) user_id = { user.login: user.id for user in Users.search([('login', 'in', list(logins))]) } for login in logins: with self.subTest(login=login): # switch user and execute func self.uid = user_id[login] func(*args, **kwargs) # Invalidate the cache between subtests, in order to not reuse # the former user's cache (`test_read_mail`, `test_write_mail`) self.env.invalidate_all() finally: self.uid = old_uid return _users @decorator def warmup(func, *args, **kwargs): """ Decorate a test method to run it twice: once for a warming up phase, and a second time for real. The test attribute ``warm`` is set to ``False`` during warm up, and ``True`` once the test is warmed up. Note that the effects of the warmup phase are rolled back thanks to a savepoint. """ self = args[0] self.env.flush_all() self.env.invalidate_all() # run once to warm up the caches self.warm = False self.cr.execute('SAVEPOINT test_warmup') func(*args, **kwargs) self.env.flush_all() # run once for real self.cr.execute('ROLLBACK TO SAVEPOINT test_warmup') self.env.invalidate_all() self.warm = True func(*args, **kwargs) def can_import(module): """ Checks if can be imported, returns ``True`` if it can be, ``False`` otherwise. To use with ``unittest.skipUnless`` for tests conditional on *optional* dependencies, which may or may be present but must still be tested if possible. """ try: importlib.import_module(module) except ImportError: return False else: return True def tagged(*tags): """A decorator to tag BaseCase objects. Tags are stored in a set that can be accessed from a 'test_tags' attribute. A tag prefixed by '-' will remove the tag e.g. to remove the 'standard' tag. By default, all Test classes from odoo.tests.common have a test_tags attribute that defaults to 'standard' and 'at_install'. When using class inheritance, the tags ARE inherited. """ include = {t for t in tags if not t.startswith('-')} exclude = {t[1:] for t in tags if t.startswith('-')} def tags_decorator(obj): obj.test_tags = (getattr(obj, 'test_tags', set()) | include) - exclude at_install = 'at_install' in obj.test_tags post_install = 'post_install' in obj.test_tags if not (at_install ^ post_install): _logger.warning('A tests should be either at_install or post_install, which is not the case of %r', obj) return obj return tags_decorator