odoo_17.0.1/odoo/tests/common.py

1993 lines
80 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
"""
The module :mod:`odoo.tests.common` provides unittest test cases and a few
helpers and classes to write tests.
"""
import base64
import concurrent.futures
import contextlib
import difflib
import importlib
import inspect
import itertools
import json
import logging
import os
import pathlib
import platform
import pprint
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import unittest
import warnings
from collections import defaultdict, deque
from concurrent.futures import Future, CancelledError, wait
try:
from concurrent.futures import InvalidStateError
except ImportError:
InvalidStateError = NotImplementedError
from contextlib import contextmanager, ExitStack
from datetime import datetime
from functools import lru_cache
from itertools import zip_longest as izip_longest
from unittest.mock import patch, _patch
from xmlrpc import client as xmlrpclib
import requests
import werkzeug.urls
from lxml import etree, html
from requests import PreparedRequest, Session
import odoo
from odoo import api
from odoo.models import BaseModel
from odoo.exceptions import AccessError
from odoo.modules.registry import Registry
from odoo.service import security
from odoo.sql_db import BaseCursor, Cursor
from odoo.tools import float_compare, single_email_re, profiler, lower_logging, SQL
from odoo.tools.misc import find_in_path, mute_logger
from . import case
try:
# the behaviour of decorator changed in 5.0.5 changing the structure of the traceback when
# an error is raised inside a method using a decorator.
# this is not a hudge problem for test execution but this makes error message
# more difficult to read and breaks test_with_decorators
# This also changes the error format making runbot error matching fail
# This also breaks the first frame meaning that the module detection will also fail on runbot
# In 5.1 decoratorx was introduced and it looks like it has the same behaviour of old decorator
from decorator import decoratorx as decorator
except ImportError:
from decorator import decorator
try:
import websocket
except ImportError:
# chrome headless tests will be skipped
websocket = None
_logger = logging.getLogger(__name__)
# backward compatibility: Form was defined in this file
def __getattr__(name):
# pylint: disable=import-outside-toplevel
if name != 'Form':
raise AttributeError(name)
from .form import Form
warnings.warn(
"Since 17.0: odoo.tests.common.Form is deprecated, use odoo.tests.Form",
category=PendingDeprecationWarning,
stacklevel=2,
)
return Form
# The odoo library is supposed already configured.
ADDONS_PATH = odoo.tools.config['addons_path']
HOST = '127.0.0.1'
# Useless constant, tests are aware of the content of demo data
ADMIN_USER_ID = odoo.SUPERUSER_ID
CHECK_BROWSER_SLEEP = 0.1 # seconds
CHECK_BROWSER_ITERATIONS = 100
BROWSER_WAIT = CHECK_BROWSER_SLEEP * CHECK_BROWSER_ITERATIONS # seconds
def get_db_name():
db = odoo.tools.config['db_name']
# If the database name is not provided on the command-line,
# use the one on the thread (which means if it is provided on
# the command-line, this will break when installing another
# database from XML-RPC).
if not db and hasattr(threading.current_thread(), 'dbname'):
return threading.current_thread().dbname
return db
standalone_tests = defaultdict(list)
def standalone(*tags):
""" Decorator for standalone test functions. This is somewhat dedicated to
tests that install, upgrade or uninstall some modules, which is currently
forbidden in regular test cases. The function is registered under the given
``tags`` and the corresponding Odoo module name.
"""
def register(func):
# register func by odoo module name
if func.__module__.startswith('odoo.addons.'):
module = func.__module__.split('.')[2]
standalone_tests[module].append(func)
# register func with aribitrary name, if any
for tag in tags:
standalone_tests[tag].append(func)
standalone_tests['all'].append(func)
return func
return register
# For backwards-compatibility - get_db_name() should be used instead
DB = get_db_name()
def new_test_user(env, login='', groups='base.group_user', context=None, **kwargs):
""" Helper function to create a new test user. It allows to quickly create
users given its login and groups (being a comma separated list of xml ids).
Kwargs are directly propagated to the create to further customize the
created user.
User creation uses a potentially customized environment using the context
parameter allowing to specify a custom context. It can be used to force a
specific behavior and/or simplify record creation. An example is to use
mail-related context keys in mail tests to speedup record creation.
Some specific fields are automatically filled to avoid issues
* groups_id: it is filled using groups function parameter;
* name: "login (groups)" by default as it is required;
* email: it is either the login (if it is a valid email) or a generated
string 'x.x@example.com' (x being the first login letter). This is due
to email being required for most odoo operations;
"""
if not login:
raise ValueError('New users require at least a login')
if not groups:
raise ValueError('New users require at least user groups')
if context is None:
context = {}
groups_id = [(6, 0, [env.ref(g.strip()).id for g in groups.split(',')])]
create_values = dict(kwargs, login=login, groups_id=groups_id)
# automatically generate a name as "Login (groups)" to ease user comprehension
if not create_values.get('name'):
create_values['name'] = '%s (%s)' % (login, groups)
# automatically give a password equal to login
if not create_values.get('password'):
create_values['password'] = login + 'x' * (8 - len(login))
# generate email if not given as most test require an email
if 'email' not in create_values:
if single_email_re.match(login):
create_values['email'] = login
else:
create_values['email'] = '%s.%s@example.com' % (login[0], login[0])
# ensure company_id + allowed company constraint works if not given at create
if 'company_id' in create_values and 'company_ids' not in create_values:
create_values['company_ids'] = [(4, create_values['company_id'])]
return env['res.users'].with_context(**context).create(create_values)
def loaded_demo_data(env):
return bool(env.ref('base.user_demo', raise_if_not_found=False))
class RecordCapturer:
def __init__(self, model, domain):
self._model = model
self._domain = domain
def __enter__(self):
self._before = self._model.search(self._domain, order='id')
self._after = None
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if exc_type is None:
self._after = self._model.search(self._domain, order='id') - self._before
@property
def records(self):
if self._after is None:
return self._model.search(self._domain, order='id') - self._before
return self._after
class MetaCase(type):
""" Metaclass of test case classes to assign default 'test_tags':
'standard', 'at_install' and the name of the module.
"""
def __init__(cls, name, bases, attrs):
super(MetaCase, cls).__init__(name, bases, attrs)
# assign default test tags
if cls.__module__.startswith('odoo.addons.'):
if getattr(cls, 'test_tags', None) is None:
cls.test_tags = {'standard', 'at_install'}
cls.test_module = cls.__module__.split('.')[2]
cls.test_class = cls.__name__
cls.test_sequence = 0
def _normalize_arch_for_assert(arch_string, parser_method="xml"):
"""Takes some xml and normalize it to make it comparable to other xml
in particular, blank text is removed, and the output is pretty-printed
:param str arch_string: the string representing an XML arch
:param str parser_method: an string representing which lxml.Parser class to use
when normalizing both archs. Takes either "xml" or "html"
:return: the normalized arch
:rtype str:
"""
Parser = None
if parser_method == 'xml':
Parser = etree.XMLParser
elif parser_method == 'html':
Parser = etree.HTMLParser
parser = Parser(remove_blank_text=True)
arch_string = etree.fromstring(arch_string, parser=parser)
return etree.tostring(arch_string, pretty_print=True, encoding='unicode')
class BlockedRequest(requests.exceptions.ConnectionError):
pass
_super_send = requests.Session.send
class BaseCase(case.TestCase, metaclass=MetaCase):
""" Subclass of TestCase for Odoo-specific code. This class is abstract and
expects self.registry, self.cr and self.uid to be initialized by subclasses.
"""
longMessage = True # more verbose error message by default: https://www.odoo.com/r/Vmh
warm = True # False during warm-up phase (see :func:`warmup`)
_python_version = sys.version_info
def __init__(self, methodName='runTest'):
super().__init__(methodName)
self.addTypeEqualityFunc(etree._Element, self.assertTreesEqual)
self.addTypeEqualityFunc(html.HtmlElement, self.assertTreesEqual)
@classmethod
def _request_handler(cls, s: Session, r: PreparedRequest, /, **kw):
# allow localhost requests
# TODO: also check port?
url = werkzeug.urls.url_parse(r.url)
if url.host in (HOST, 'localhost'):
return _super_send(s, r, **kw)
if url.scheme == 'file':
return _super_send(s, r, **kw)
_logger.getChild('requests').info(
"Blocking un-mocked external HTTP request %s %s", r.method, r.url)
raise BlockedRequest(f"External requests verboten (was {r.method} {r.url})")
def run(self, result):
testMethod = getattr(self, self._testMethodName)
if getattr(testMethod, '_retry', True) and getattr(self, '_retry', True):
tests_run_count = int(os.environ.get('ODOO_TEST_FAILURE_RETRIES', 0)) + 1
else:
tests_run_count = 1
_logger.info('Auto retry disabled for %s', self)
failure = False
for retry in range(tests_run_count):
if retry:
_logger.runbot(f'Retrying a failed test: {self}')
if retry < tests_run_count-1:
with warnings.catch_warnings(), \
result.soft_fail(), \
lower_logging(25, logging.INFO) as quiet_log:
super().run(result)
failure = result.had_failure or quiet_log.had_error_log
else: # last try
super().run(result)
if not failure:
break
@classmethod
def setUpClass(cls):
def check_remaining_patchers():
for patcher in _patch._active_patches:
_logger.warning("A patcher (targeting %s.%s) was remaining active at the end of %s, disabling it...", patcher.target, patcher.attribute, cls.__name__)
patcher.stop()
cls.addClassCleanup(check_remaining_patchers)
super().setUpClass()
if 'standard' in cls.test_tags:
# if the method is passed directly `patch` discards the session
# object which we need
# pylint: disable=unnecessary-lambda
patcher = patch.object(
requests.sessions.Session,
'send',
lambda s, r, **kwargs: cls._request_handler(s, r, **kwargs),
)
patcher.start()
cls.addClassCleanup(patcher.stop)
def cursor(self):
return self.registry.cursor()
@property
def uid(self):
""" Get the current uid. """
return self.env.uid
@uid.setter
def uid(self, user):
""" Set the uid by changing the test's environment. """
self.env = self.env(user=user)
def ref(self, xid):
""" Returns database ID for the provided :term:`external identifier`,
shortcut for ``_xmlid_lookup``
:param xid: fully-qualified :term:`external identifier`, in the form
:samp:`{module}.{identifier}`
:raise: ValueError if not found
:returns: registered id
"""
return self.browse_ref(xid).id
def browse_ref(self, xid):
""" Returns a record object for the provided
:term:`external identifier`
:param xid: fully-qualified :term:`external identifier`, in the form
:samp:`{module}.{identifier}`
:raise: ValueError if not found
:returns: :class:`~odoo.models.BaseModel`
"""
assert "." in xid, "this method requires a fully qualified parameter, in the following form: 'module.identifier'"
return self.env.ref(xid)
def patch(self, obj, key, val):
""" Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """
patcher = patch.object(obj, key, val) # this is unittest.mock.patch
patcher.start()
self.addCleanup(patcher.stop)
@classmethod
def classPatch(cls, obj, key, val):
""" Do the patch ``setattr(obj, key, val)``, and prepare cleanup. """
patcher = patch.object(obj, key, val) # this is unittest.mock.patch
patcher.start()
cls.addClassCleanup(patcher.stop)
def startPatcher(self, patcher):
mock = patcher.start()
self.addCleanup(patcher.stop)
return mock
@classmethod
def startClassPatcher(cls, patcher):
mock = patcher.start()
cls.addClassCleanup(patcher.stop)
return mock
@contextmanager
def with_user(self, login):
""" Change user for a given test, like with self.with_user() ... """
old_uid = self.uid
try:
user = self.env['res.users'].sudo().search([('login', '=', login)])
assert user, "Login %s not found" % login
# switch user
self.uid = user.id
self.env = self.env(user=self.uid)
yield
finally:
# back
self.uid = old_uid
self.env = self.env(user=self.uid)
@contextmanager
def debug_mode(self):
""" Enable the effects of group 'base.group_no_one'; mainly useful with :class:`Form`. """
origin_user_has_groups = BaseModel.user_has_groups
def user_has_groups(self, groups):
group_set = set(groups.split(','))
if '!base.group_no_one' in group_set:
return False
elif 'base.group_no_one' in group_set:
group_set.remove('base.group_no_one')
return not group_set or origin_user_has_groups(self, ','.join(group_set))
return origin_user_has_groups(self, groups)
with patch('odoo.models.BaseModel.user_has_groups', user_has_groups):
yield
@contextmanager
def _assertRaises(self, exception, *, msg=None):
""" Context manager that clears the environment upon failure. """
with ExitStack() as init:
if hasattr(self, 'env'):
init.enter_context(self.env.cr.savepoint())
if issubclass(exception, AccessError):
# The savepoint() above calls flush(), which leaves the
# record cache with lots of data. This can prevent
# access errors to be detected. In order to avoid this
# issue, we clear the cache before proceeding.
self.env.cr.clear()
with ExitStack() as inner:
cm = inner.enter_context(super().assertRaises(exception, msg=msg))
# *moves* the cleanups from init to inner, this ensures the
# savepoint gets rolled back when `yield` raises `exception`,
# but still allows the initialisation to be protected *and* not
# interfered with by `assertRaises`.
inner.push(init.pop_all())
yield cm
def assertRaises(self, exception, func=None, *args, **kwargs):
if func:
with self._assertRaises(exception):
func(*args, **kwargs)
else:
return self._assertRaises(exception, **kwargs)
if sys.version_info < (3, 10):
# simplified backport of assertNoLogs()
@contextmanager
def assertNoLogs(self, logger: str, level: str):
# assertLogs ensures there is at least one log record when
# exiting the context manager. We insert one dummy record just
# so we pass that silly test while still capturing the logs.
with self.assertLogs(logger, level) as capture:
logging.getLogger(logger).log(getattr(logging, level), "Dummy log record")
yield
if len(capture.output) > 1:
raise self.failureException(f"Unexpected logs found: {capture.output[1:]}")
@contextmanager
def assertQueries(self, expected, flush=True):
""" Check the queries made by the current cursor. ``expected`` is a list
of strings representing the expected queries being made. Query strings
are matched against each other, ignoring case and whitespaces.
"""
Cursor_execute = Cursor.execute
actual_queries = []
def execute(self, query, params=None, log_exceptions=None):
actual_queries.append(query.code if isinstance(query, SQL) else query)
return Cursor_execute(self, query, params, log_exceptions)
def get_unaccent_wrapper(cr):
return lambda x: x
if flush:
self.env.flush_all()
self.env.cr.flush()
with patch('odoo.sql_db.Cursor.execute', execute):
with patch('odoo.osv.expression.get_unaccent_wrapper', get_unaccent_wrapper):
yield actual_queries
if flush:
self.env.flush_all()
self.env.cr.flush()
if not self.warm:
return
self.assertEqual(
len(actual_queries), len(expected),
"\n---- actual queries:\n%s\n---- expected queries:\n%s" % (
"\n".join(actual_queries), "\n".join(expected),
)
)
for actual_query, expect_query in zip(actual_queries, expected):
self.assertEqual(
"".join(actual_query.lower().split()),
"".join(expect_query.lower().split()),
"\n---- actual query:\n%s\n---- not like:\n%s" % (actual_query, expect_query),
)
@contextmanager
def assertQueryCount(self, default=0, flush=True, **counters):
""" Context manager that counts queries. It may be invoked either with
one value, or with a set of named arguments like ``login=value``::
with self.assertQueryCount(42):
...
with self.assertQueryCount(admin=3, demo=5):
...
The second form is convenient when used with :func:`users`.
"""
if self.warm:
# mock random in order to avoid random bus gc
with patch('random.random', lambda: 1):
login = self.env.user.login
expected = counters.get(login, default)
if flush:
self.env.flush_all()
self.env.cr.flush()
count0 = self.cr.sql_log_count
yield
if flush:
self.env.flush_all()
self.env.cr.flush()
count = self.cr.sql_log_count - count0
if count != expected:
# add some info on caller to allow semi-automatic update of query count
frame, filename, linenum, funcname, lines, index = inspect.stack()[2]
filename = filename.replace('\\', '/')
if "/odoo/addons/" in filename:
filename = filename.rsplit("/odoo/addons/", 1)[1]
if count > expected:
msg = "Query count more than expected for user %s: %d > %d in %s at %s:%s"
# add a subtest in order to continue the test_method in case of failures
with self.subTest():
self.fail(msg % (login, count, expected, funcname, filename, linenum))
else:
logger = logging.getLogger(type(self).__module__)
msg = "Query count less than expected for user %s: %d < %d in %s at %s:%s"
logger.info(msg, login, count, expected, funcname, filename, linenum)
else:
# flush before and after during warmup, in order to reproduce the
# same operations, otherwise the caches might not be ready!
if flush:
self.env.flush_all()
self.env.cr.flush()
yield
if flush:
self.env.flush_all()
self.env.cr.flush()
def assertRecordValues(self, records, expected_values):
''' Compare a recordset with a list of dictionaries representing the expected results.
This method performs a comparison element by element based on their index.
Then, the order of the expected values is extremely important.
Note that:
- Comparison between falsy values is supported: False match with None.
- Comparison between monetary field is also treated according the currency's rounding.
- Comparison between x2many field is done by ids. Then, empty expected ids must be [].
- Comparison between many2one field id done by id. Empty comparison can be done using any falsy value.
:param records: The records to compare.
:param expected_values: List of dicts expected to be exactly matched in records
'''
def _compare_candidate(record, candidate, field_names):
''' Compare all the values in `candidate` with a record.
:param record: record being compared
:param candidate: dict of values to compare
:return: A dictionary will encountered difference in values.
'''
diff = {}
for field_name in field_names:
record_value = record[field_name]
field = record._fields[field_name]
field_type = field.type
if field_type == 'monetary':
# Compare monetary field.
currency_field_name = record._fields[field_name].get_currency_field(record)
record_currency = record[currency_field_name]
if field_name not in candidate:
diff[field_name] = (record_value, None)
elif record_currency:
if record_currency.compare_amounts(candidate[field_name], record_value):
diff[field_name] = (record_value, record_currency.round(candidate[field_name]))
elif candidate[field_name] != record_value:
diff[field_name] = (record_value, candidate[field_name])
elif field_type == 'float' and field.get_digits(record.env):
prec = field.get_digits(record.env)[1]
if float_compare(candidate[field_name], record_value, precision_digits=prec) != 0:
diff[field_name] = (record_value, candidate[field_name])
elif field_type in ('one2many', 'many2many'):
# Compare x2many relational fields.
# Empty comparison must be an empty list to be True.
if field_name not in candidate:
diff[field_name] = (sorted(record_value.ids), None)
elif set(record_value.ids) != set(candidate[field_name]):
diff[field_name] = (sorted(record_value.ids), sorted(candidate[field_name]))
elif field_type == 'many2one':
# Compare many2one relational fields.
# Every falsy value is allowed to compare with an empty record.
if field_name not in candidate:
diff[field_name] = (record_value.id, None)
elif (record_value or candidate[field_name]) and record_value.id != candidate[field_name]:
diff[field_name] = (record_value.id, candidate[field_name])
else:
# Compare others fields if not both interpreted as falsy values.
if field_name not in candidate:
diff[field_name] = (record_value, None)
elif (candidate[field_name] or record_value) and record_value != candidate[field_name]:
diff[field_name] = (record_value, candidate[field_name])
return diff
# Compare records with candidates.
different_values = []
field_names = list(expected_values[0].keys())
for index, record in enumerate(records):
is_additional_record = index >= len(expected_values)
candidate = {} if is_additional_record else expected_values[index]
diff = _compare_candidate(record, candidate, field_names)
if diff:
different_values.append((index, 'additional_record' if is_additional_record else 'regular_diff', diff))
for index in range(len(records), len(expected_values)):
diff = {}
for field_name in field_names:
diff[field_name] = (None, expected_values[index][field_name])
different_values.append((index, 'missing_record', diff))
# Build error message.
if not different_values:
return
errors = ['The records and expected_values do not match.']
if len(records) != len(expected_values):
errors.append('Wrong number of records to compare: %d records versus %d expected values.' % (len(records), len(expected_values)))
for index, diff_type, diff in different_values:
if diff_type == 'regular_diff':
errors.append('\n==== Differences at index %s ====' % index)
record_diff = ['%s:%s' % (k, v[0]) for k, v in diff.items()]
candidate_diff = ['%s:%s' % (k, v[1]) for k, v in diff.items()]
errors.append('\n'.join(difflib.unified_diff(record_diff, candidate_diff)))
elif diff_type == 'additional_record':
errors += [
'\n==== Additional record ====',
pprint.pformat(dict((k, v[0]) for k, v in diff.items())),
]
elif diff_type == 'missing_record':
errors += [
'\n==== Missing record ====',
pprint.pformat(dict((k, v[1]) for k, v in diff.items())),
]
self.fail('\n'.join(errors))
# turns out this thing may not be quite as useful as we thought...
def assertItemsEqual(self, a, b, msg=None):
self.assertCountEqual(a, b, msg=None)
def assertTreesEqual(self, n1, n2, msg=None):
self.assertIsNotNone(n1, msg)
self.assertIsNotNone(n2, msg)
self.assertEqual(n1.tag, n2.tag, msg)
# Because lxml.attrib is an ordereddict for which order is important
# to equality, even though *we* don't care
self.assertEqual(dict(n1.attrib), dict(n2.attrib), msg)
self.assertEqual((n1.text or u'').strip(), (n2.text or u'').strip(), msg)
self.assertEqual((n1.tail or u'').strip(), (n2.tail or u'').strip(), msg)
for c1, c2 in izip_longest(n1, n2):
self.assertTreesEqual(c1, c2, msg)
def _assertXMLEqual(self, original, expected, parser="xml"):
"""Asserts that two xmls archs are equal
:param original: the xml arch to test
:type original: str
:param expected: the xml arch of reference
:type expected: str
:param parser: an string representing which lxml.Parser class to use
when normalizing both archs. Takes either "xml" or "html"
:type parser: str
"""
self.maxDiff = 10000
if original:
original = _normalize_arch_for_assert(original, parser)
if expected:
expected = _normalize_arch_for_assert(expected, parser)
self.assertEqual(original, expected)
def assertXMLEqual(self, original, expected):
return self._assertXMLEqual(original, expected)
def assertHTMLEqual(self, original, expected):
return self._assertXMLEqual(original, expected, 'html')
def profile(self, description='', **kwargs):
test_method = getattr(self, '_testMethodName', 'Unknown test method')
if not hasattr(self, 'profile_session'):
self.profile_session = profiler.make_session(test_method)
return profiler.Profiler(
description='%s uid:%s %s %s' % (test_method, self.env.user.id, 'warm' if self.warm else 'cold', description),
db=self.env.cr.dbname,
profile_session=self.profile_session,
**kwargs)
savepoint_seq = itertools.count()
class TransactionCase(BaseCase):
""" Test class in which all test methods are run in a single transaction,
but each test method is run in a sub-transaction managed by a savepoint.
The transaction's cursor is always closed without committing.
The data setup common to all methods should be done in the class method
`setUpClass`, so that it is done once for all test methods. This is useful
for test cases containing fast tests but with significant database setup
common to all cases (complex in-db test data).
After being run, each test method cleans up the record cache and the
registry cache. However, there is no cleanup of the registry models and
fields. If a test modifies the registry (custom models and/or fields), it
should prepare the necessary cleanup (`self.registry.reset_changes()`).
"""
registry: Registry = None
env: api.Environment = None
cr: Cursor = None
muted_registry_logger = mute_logger(odoo.modules.registry._logger.name)
@classmethod
def _gc_filestore(cls):
# attachment can be created or unlink during the tests.
# they can addup during test and take some disc space.
# since cron are not running during tests, we need to gc manually
# We need to check the status of the file system outside of the test cursor
with odoo.registry(get_db_name()).cursor() as cr:
gc_env = api.Environment(cr, odoo.SUPERUSER_ID, {})
gc_env['ir.attachment']._gc_file_store_unsafe()
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.addClassCleanup(cls._gc_filestore)
cls.registry = odoo.registry(get_db_name())
cls.registry_start_sequence = cls.registry.registry_sequence
def reset_changes():
if (cls.registry_start_sequence != cls.registry.registry_sequence) or cls.registry.registry_invalidated:
with cls.registry.cursor() as cr:
cls.registry.setup_models(cr)
cls.registry.registry_invalidated = False
cls.registry.registry_sequence = cls.registry_start_sequence
with cls.muted_registry_logger:
cls.registry.clear_all_caches()
cls.registry.cache_invalidated.clear()
cls.addClassCleanup(reset_changes)
cls.cr = cls.registry.cursor()
cls.addClassCleanup(cls.cr.close)
cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {})
def setUp(self):
super().setUp()
# restore environments after the test to avoid invoking flush() with an
# invalid environment (inexistent user id) from another test
envs = self.env.all.envs
for env in list(envs):
self.addCleanup(env.clear)
# restore the set of known environments as it was at setUp
self.addCleanup(envs.update, list(envs))
self.addCleanup(envs.clear)
self.addCleanup(self.muted_registry_logger(self.registry.clear_all_caches))
# This prevents precommit functions and data from piling up
# until cr.flush is called in 'assertRaises' clauses
# (these are not cleared in self.env.clear or envs.clear)
cr = self.env.cr
def _reset(cb, funcs, data):
cb._funcs = funcs
cb.data = data
for callback in [cr.precommit, cr.postcommit, cr.prerollback, cr.postrollback]:
self.addCleanup(_reset, callback, deque(callback._funcs), dict(callback.data))
# flush everything in setUpClass before introducing a savepoint
self.env.flush_all()
self._savepoint_id = next(savepoint_seq)
self.cr.execute('SAVEPOINT test_%d' % self._savepoint_id)
self.addCleanup(self.cr.execute, 'ROLLBACK TO SAVEPOINT test_%d' % self._savepoint_id)
class SingleTransactionCase(BaseCase):
""" TestCase in which all test methods are run in the same transaction,
the transaction is started with the first test method and rolled back at
the end of the last.
"""
@classmethod
def __init_subclass__(cls):
super().__init_subclass__()
if issubclass(cls, TransactionCase):
_logger.warning("%s inherits from both TransactionCase and SingleTransactionCase")
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.registry = odoo.registry(get_db_name())
cls.addClassCleanup(cls.registry.reset_changes)
cls.addClassCleanup(cls.registry.clear_all_caches)
cls.cr = cls.registry.cursor()
cls.addClassCleanup(cls.cr.close)
cls.env = api.Environment(cls.cr, odoo.SUPERUSER_ID, {})
def setUp(self):
super(SingleTransactionCase, self).setUp()
self.env.flush_all()
class ChromeBrowserException(Exception):
pass
def fmap(future, map_fun):
"""Maps a future's result through a callback.
Resolves to the application of ``map_fun`` to the result of ``future``.
.. warning:: this does *not* recursively resolve futures, if that's what
you need see :func:`fchain`
"""
fmap_future = Future()
@future.add_done_callback
def _(f):
try:
fmap_future.set_result(map_fun(f.result()))
except Exception as e:
fmap_future.set_exception(e)
return fmap_future
def fchain(future, next_callback):
"""Chains a future's result to a new future through a callback.
Corresponds to the ``bind`` monadic operation (aka flatmap aka then...
kinda).
"""
new_future = Future()
@future.add_done_callback
def _(f):
try:
n = next_callback(f.result())
@n.add_done_callback
def _(f):
try:
new_future.set_result(f.result())
except Exception as e:
new_future.set_exception(e)
except Exception as e:
new_future.set_exception(e)
return new_future
def save_test_file(test_name, content, prefix, extension='png', logger=_logger, document_type='Screenshot', date_format="%Y%m%d_%H%M%S_%f"):
assert re.fullmatch(r'\w*_', prefix)
assert re.fullmatch(r'[a-z]+', extension)
assert re.fullmatch(r'\w+', test_name)
now = datetime.now().strftime(date_format)
screenshots_dir = pathlib.Path(odoo.tools.config['screenshots']) / get_db_name() / 'screenshots'
screenshots_dir.mkdir(parents=True, exist_ok=True)
fname = f'{prefix}{now}_{test_name}.{extension}'
full_path = screenshots_dir / fname
with full_path.open('wb') as f:
f.write(content)
logger.runbot(f'{document_type} in: {full_path}')
class ChromeBrowser:
""" Helper object to control a Chrome headless process. """
remote_debugging_port = 0 # 9222, change it in a non-git-tracked file
def __init__(self, test_class, headless=True):
self._logger = test_class._logger
self.test_class = test_class
if websocket is None:
self._logger.warning("websocket-client module is not installed")
raise unittest.SkipTest("websocket-client module is not installed")
self.user_data_dir = tempfile.mkdtemp(suffix='_chrome_odoo')
otc = odoo.tools.config
self.screencasts_dir = None
self.screencast_frames = []
if otc['screencasts']:
self.screencasts_dir = os.path.join(otc['screencasts'], get_db_name(), 'screencasts')
os.makedirs(self.screencasts_frames_dir, exist_ok=True)
if os.name == 'posix':
self.sigxcpu_handler = signal.getsignal(signal.SIGXCPU)
signal.signal(signal.SIGXCPU, self.signal_handler)
else:
self.sigxcpu_handler = None
self.chrome, self.devtools_port = self._chrome_start(
user_data_dir=self.user_data_dir,
window_size=test_class.browser_size,
touch_enabled=test_class.touch_enabled,
headless=headless,
)
self.ws = self._open_websocket()
self._request_id = itertools.count()
self._result = Future()
self.error_checker = None
self.had_failure = False
# maps request_id to Futures
self._responses = {}
# maps frame ids to callbacks
self._frames = {}
self._handlers = {
'Runtime.consoleAPICalled': self._handle_console,
'Runtime.exceptionThrown': self._handle_exception,
'Page.frameStoppedLoading': self._handle_frame_stopped_loading,
'Page.screencastFrame': self._handle_screencast_frame,
}
self._receiver = threading.Thread(
target=self._receive,
name="WebSocket events consumer",
args=(get_db_name(),)
)
self._receiver.start()
self._logger.info('Enable chrome headless console log notification')
self._websocket_send('Runtime.enable')
self._logger.info('Chrome headless enable page notifications')
self._websocket_send('Page.enable')
@property
def screencasts_frames_dir(self):
return os.path.join(self.screencasts_dir, 'frames')
def signal_handler(self, sig, frame):
if sig == signal.SIGXCPU:
_logger.info('CPU time limit reached, stopping Chrome and shutting down')
self.stop()
os._exit(0)
def stop(self):
if hasattr(self, 'ws'):
self._websocket_send('Page.stopScreencast')
if self.screencasts_dir:
screencasts_frames_dir = self.screencasts_frames_dir
self.screencasts_dir = None
if os.path.isdir(screencasts_frames_dir):
shutil.rmtree(screencasts_frames_dir, ignore_errors=True)
self._websocket_request('Page.stopLoading')
self._websocket_request('Runtime.evaluate', params={'expression': """
('serviceWorker' in navigator) &&
navigator.serviceWorker.getRegistrations().then(
registrations => Promise.all(registrations.map(r => r.unregister()))
)
""", 'awaitPromise': True})
# wait for the screenshot or whatever
wait(self._responses.values(), 10)
self._result.cancel()
self._logger.info("Closing chrome headless with pid %s", self.chrome.pid)
self._websocket_send('Browser.close')
self._logger.info("Closing websocket connection")
self.ws.close()
if self.chrome:
self._logger.info("Terminating chrome headless with pid %s", self.chrome.pid)
self.chrome.terminate()
if self.user_data_dir and os.path.isdir(self.user_data_dir) and self.user_data_dir != '/':
self._logger.info('Removing chrome user profile "%s"', self.user_data_dir)
shutil.rmtree(self.user_data_dir, ignore_errors=True)
# Restore previous signal handler
if self.sigxcpu_handler and os.name == 'posix':
signal.signal(signal.SIGXCPU, self.sigxcpu_handler)
@property
def executable(self):
return _find_executable()
def _chrome_without_limit(self, cmd):
if os.name == 'posix' and platform.system() != 'Darwin':
# since the introduction of pointer compression in Chrome 80 (v8 v8.0),
# the memory reservation algorithm requires more than 8GiB of
# virtual mem for alignment this exceeds our default memory limits.
def preexec():
import resource
resource.setrlimit(resource.RLIMIT_AS, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
else:
preexec = None
# pylint: disable=subprocess-popen-preexec-fn
return subprocess.Popen(cmd, stderr=subprocess.DEVNULL, preexec_fn=preexec)
def _spawn_chrome(self, cmd):
proc = self._chrome_without_limit(cmd)
port_file = pathlib.Path(self.user_data_dir, 'DevToolsActivePort')
for _ in range(CHECK_BROWSER_ITERATIONS):
time.sleep(CHECK_BROWSER_SLEEP)
if port_file.is_file() and port_file.stat().st_size > 5:
with port_file.open('r', encoding='utf-8') as f:
return proc, int(f.readline())
raise unittest.SkipTest(f'Failed to detect chrome devtools port after {BROWSER_WAIT :.1f}s.')
def _chrome_start(
self,
user_data_dir: str,
window_size: str, touch_enabled: bool,
headless=True
):
headless_switches = {
'--headless': '',
'--disable-extensions': '',
'--disable-background-networking' : '',
'--disable-background-timer-throttling' : '',
'--disable-backgrounding-occluded-windows': '',
'--disable-renderer-backgrounding' : '',
'--disable-breakpad': '',
'--disable-client-side-phishing-detection': '',
'--disable-crash-reporter': '',
'--disable-dev-shm-usage': '',
'--disable-namespace-sandbox': '',
'--disable-translate': '',
'--no-sandbox': '',
'--disable-gpu': '',
}
switches = {
# required for tours that use Youtube autoplay conditions (namely website_slides' "course_tour")
'--autoplay-policy': 'no-user-gesture-required',
'--disable-default-apps': '',
'--disable-device-discovery-notifications': '',
'--no-default-browser-check': '',
'--remote-debugging-address': HOST,
'--remote-debugging-port': str(self.remote_debugging_port),
'--user-data-dir': user_data_dir,
'--window-size': window_size,
'--no-first-run': '',
# '--enable-precise-memory-info': '', # uncomment to debug memory leaks in qunit suite
# '--js-flags': '--expose-gc', # uncomment to debug memory leaks in qunit suite
}
if headless:
switches.update(headless_switches)
if touch_enabled:
# enable Chrome's Touch mode, useful to detect touch capabilities using
# "'ontouchstart' in window"
switches['--touch-events'] = ''
cmd = [self.executable]
cmd += ['%s=%s' % (k, v) if v else k for k, v in switches.items()]
url = 'about:blank'
cmd.append(url)
try:
proc, devtools_port = self._spawn_chrome(cmd)
except OSError:
raise unittest.SkipTest("%s not found" % cmd[0])
self._logger.info('Chrome pid: %s', proc.pid)
self._logger.info('Chrome headless temporary user profile dir: %s', self.user_data_dir)
return proc, devtools_port
def _json_command(self, command, timeout=3):
"""Queries browser state using JSON
Available commands:
``''``
return list of tabs with their id
``list`` (or ``json/``)
list tabs
``new``
open a new tab
:samp:`activate/{id}`
activate a tab
:samp:`close/{id}`
close a tab
``version``
get chrome and dev tools version
``protocol``
get the full protocol
"""
command = '/'.join(['json', command]).strip('/')
url = werkzeug.urls.url_join('http://%s:%s/' % (HOST, self.devtools_port), command)
self._logger.info("Issuing json command %s", url)
delay = 0.1
tries = 0
failure_info = None
message = None
while timeout > 0:
try:
self.chrome.send_signal(0)
except ProcessLookupError:
message = 'Chrome crashed at startup'
break
try:
r = requests.get(url, timeout=3)
if r.ok:
return r.json()
except requests.ConnectionError as e:
failure_info = str(e)
message = 'Connection Error while trying to connect to Chrome debugger'
except requests.exceptions.ReadTimeout as e:
failure_info = str(e)
message = 'Connection Timeout while trying to connect to Chrome debugger'
break
time.sleep(delay)
timeout -= delay
delay = delay * 1.5
tries += 1
self._logger.error("%s after %s tries" % (message, tries))
if failure_info:
self._logger.info(failure_info)
self.stop()
raise unittest.SkipTest("Error during Chrome headless connection")
def _open_websocket(self):
version = self._json_command('version')
self._logger.info('Browser version: %s', version['Browser'])
start = time.time()
while (time.time() - start) < 5.0:
ws_url = next((
target['webSocketDebuggerUrl']
for target in self._json_command('')
if target['type'] == 'page'
if target['url'] == 'about:blank'
), None)
if ws_url:
break
time.sleep(0.1)
else:
self.stop()
raise unittest.SkipTest("Error during Chrome connection: never found 'page' target")
self._logger.info('Websocket url found: %s', ws_url)
ws = websocket.create_connection(ws_url, enable_multithread=True, suppress_origin=True)
if ws.getstatus() != 101:
raise unittest.SkipTest("Cannot connect to chrome dev tools")
ws.settimeout(0.01)
return ws
def _receive(self, dbname):
threading.current_thread().dbname = dbname
# So CDT uses a streamed JSON-RPC structure, meaning a request is
# {id, method, params} and eventually a {id, result | error} should
# arrive the other way, however for events it uses "notifications"
# meaning request objects without an ``id``, but *coming from the server
while True: # or maybe until `self._result` is `done()`?
try:
msg = self.ws.recv()
if not msg:
continue
self._logger.debug('\n<- %s', msg)
except websocket.WebSocketTimeoutException:
continue
except Exception as e:
# if the socket is still connected something bad happened,
# otherwise the client was just shut down
if self.ws.connected:
self._result.set_exception(e)
raise
self._result.cancel()
return
res = json.loads(msg)
request_id = res.get('id')
try:
if request_id is None:
handler = self._handlers.get(res['method'])
if handler:
handler(**res['params'])
else:
f = self._responses.pop(request_id, None)
if f:
if 'result' in res:
f.set_result(res['result'])
else:
f.set_exception(ChromeBrowserException(res['error']['message']))
except Exception:
msg = str(msg)
if msg and len(msg) > 500:
msg = msg[:500] + '...'
_logger.exception("While processing message %s", msg)
def _websocket_request(self, method, *, params=None, timeout=10.0):
assert threading.get_ident() != self._receiver.ident,\
"_websocket_request must not be called from the consumer thread"
if self.ws is None:
return
f = self._websocket_send(method, params=params, with_future=True)
try:
return f.result(timeout=timeout)
except concurrent.futures.TimeoutError:
raise TimeoutError(f'{method}({params or ""})')
def _websocket_send(self, method, *, params=None, with_future=False):
"""send chrome devtools protocol commands through websocket
If ``with_future`` is set, returns a ``Future`` for the operation.
"""
if self.ws is None:
return
result = None
request_id = next(self._request_id)
if with_future:
result = self._responses[request_id] = Future()
payload = {'method': method, 'id': request_id}
if params:
payload['params'] = params
self._logger.debug('\n-> %s', payload)
self.ws.send(json.dumps(payload))
return result
def _handle_console(self, type, args=None, stackTrace=None, **kw): # pylint: disable=redefined-builtin
# console formatting differs somewhat from Python's, if args[0] has
# format modifiers that many of args[1:] get formatted in, missing
# args are replaced by empty strings and extra args are concatenated
# (space-separated)
#
# current version modifies the args in place which could and should
# probably be improved
if args:
arg0, args = str(self._from_remoteobject(args[0])), args[1:]
else:
arg0, args = '', []
formatted = [re.sub(r'%[%sdfoOc]', self.console_formatter(args), arg0)]
# formatter consumes args it uses, leaves unformatted args untouched
formatted.extend(str(self._from_remoteobject(arg)) for arg in args)
message = ' '.join(formatted)
stack = ''.join(self._format_stack({'type': type, 'stackTrace': stackTrace}))
if stack:
message += '\n' + stack
log_type = type
_logger = self._logger.getChild('browser')
_logger.log(
self._TO_LEVEL.get(log_type, logging.INFO),
"%s%s",
"Error received after termination: " if self._result.done() else "",
message # might still have %<x> characters
)
if log_type == 'error':
self.had_failure = True
if self._result.done():
return
if not self.error_checker or self.error_checker(message):
self.take_screenshot()
self._save_screencast()
try:
self._result.set_exception(ChromeBrowserException(message))
except CancelledError:
...
except InvalidStateError:
self._logger.warning(
"Trying to set result to failed (%s) but found the future settled (%s)",
message, self._result
)
elif 'test successful' in message:
if self.test_class.allow_end_on_form:
self._result.set_result(True)
return
qs = fchain(
self._websocket_send('DOM.getDocument', params={'depth': 0}, with_future=True),
lambda d: self._websocket_send("DOM.querySelector", params={
'nodeId': d['root']['nodeId'],
'selector': '.o_form_dirty',
}, with_future=True)
)
@qs.add_done_callback
def _qs_result(fut):
node_id = 0
with contextlib.suppress(Exception):
node_id = fut.result()['nodeId']
if node_id:
self.take_screenshot("unsaved_form_")
msg = """\
Tour finished with an open form view in edition mode.
Form views in edition mode are automatically saved when the page is closed, \
which leads to stray network requests and inconsistencies."""
if self._result.done():
_logger.error("%s", msg)
else:
self._result.set_exception(ChromeBrowserException(msg))
return
if not self._result.done():
self._result.set_result(True)
elif self._result.exception() is None:
# if the future was already failed, we're happy,
# otherwise swap for a new failed
_logger.error("Tried to make the tour successful twice.")
def _handle_exception(self, exceptionDetails, timestamp):
message = exceptionDetails['text']
exception = exceptionDetails.get('exception')
if exception:
message += str(self._from_remoteobject(exception))
exceptionDetails['type'] = 'trace' # fake this so _format_stack works
stack = ''.join(self._format_stack(exceptionDetails))
if stack:
message += '\n' + stack
if self._result.done():
self._logger.getChild('browser').error(
"Exception received after termination: %s", message)
return
self.take_screenshot()
self._save_screencast()
try:
self._result.set_exception(ChromeBrowserException(message))
except CancelledError:
...
except InvalidStateError:
self._logger.warning(
"Trying to set result to failed (%s) but found the future settled (%s)",
message, self._result
)
def _handle_frame_stopped_loading(self, frameId):
wait = self._frames.pop(frameId, None)
if wait:
wait()
def _handle_screencast_frame(self, sessionId, data, metadata):
if not self.screencasts_frames_dir:
return
self._websocket_send('Page.screencastFrameAck', params={'sessionId': sessionId})
if not self.screencasts_dir:
return
outfile = os.path.join(self.screencasts_frames_dir, 'frame_%05d.b64' % len(self.screencast_frames))
try:
with open(outfile, 'w') as f:
f.write(data)
self.screencast_frames.append({
'file_path': outfile,
'timestamp': metadata.get('timestamp')
})
except FileNotFoundError:
self._logger.debug('Useless screencast frame skipped: %s', outfile)
_TO_LEVEL = {
'debug': logging.DEBUG,
'log': logging.INFO,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
# TODO: what do with
# dir, dirxml, table, trace, clear, startGroup, startGroupCollapsed,
# endGroup, assert, profile, profileEnd, count, timeEnd
}
def take_screenshot(self, prefix='sc_'):
def handler(f):
base_png = f.result(timeout=0)['data']
if not base_png:
self._logger.warning("Couldn't capture screenshot: expected image data, got ?? error ??")
return
decoded = base64.b64decode(base_png, validate=True)
save_test_file(self.test_class.__name__, decoded, prefix, logger=self._logger)
self._logger.info('Asking for screenshot')
f = self._websocket_send('Page.captureScreenshot', with_future=True)
f.add_done_callback(handler)
return f
def _save_screencast(self, prefix='failed'):
# could be encododed with something like that
# ffmpeg -framerate 3 -i frame_%05d.png output.mp4
if not self.screencast_frames:
self._logger.debug('No screencast frames to encode')
return None
self.stop_screencast()
for f in self.screencast_frames:
with open(f['file_path'], 'rb') as b64_file:
frame = base64.decodebytes(b64_file.read())
os.unlink(f['file_path'])
f['file_path'] = f['file_path'].replace('.b64', '.png')
with open(f['file_path'], 'wb') as png_file:
png_file.write(frame)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
fname = '%s_screencast_%s.mp4' % (prefix, timestamp)
outfile = os.path.join(self.screencasts_dir, fname)
try:
ffmpeg_path = find_in_path('ffmpeg')
except IOError:
ffmpeg_path = None
if ffmpeg_path:
nb_frames = len(self.screencast_frames)
concat_script_path = os.path.join(self.screencasts_dir, fname.replace('.mp4', '.txt'))
with open(concat_script_path, 'w') as concat_file:
for i in range(nb_frames):
frame_file_path = os.path.join(self.screencasts_frames_dir, self.screencast_frames[i]['file_path'])
end_time = time.time() if i == nb_frames - 1 else self.screencast_frames[i+1]['timestamp']
duration = end_time - self.screencast_frames[i]['timestamp']
concat_file.write("file '%s'\nduration %s\n" % (frame_file_path, duration))
concat_file.write("file '%s'" % frame_file_path) # needed by the concat plugin
try:
subprocess.run([ffmpeg_path, '-f', 'concat', '-safe', '0', '-i', concat_script_path, '-pix_fmt', 'yuv420p', '-g', '0', outfile], check=True)
except subprocess.CalledProcessError:
self._logger.error('Failed to encode screencast.')
return
self._logger.log(25, 'Screencast in: %s', outfile)
else:
outfile = outfile.strip('.mp4')
shutil.move(self.screencasts_frames_dir, outfile)
self._logger.runbot('Screencast frames in: %s', outfile)
def start_screencast(self):
assert self.screencasts_dir
self._websocket_send('Page.startScreencast')
def stop_screencast(self):
self._websocket_send('Page.stopScreencast')
def set_cookie(self, name, value, path, domain):
params = {'name': name, 'value': value, 'path': path, 'domain': domain}
self._websocket_request('Network.setCookie', params=params)
return
def delete_cookie(self, name, **kwargs):
params = {k: v for k, v in kwargs.items() if k in ['url', 'domain', 'path']}
params['name'] = name
self._websocket_request('Network.deleteCookies', params=params)
return
def _wait_ready(self, ready_code=None, timeout=60):
ready_code = ready_code or "document.readyState === 'complete'"
self._logger.info('Evaluate ready code "%s"', ready_code)
start_time = time.time()
result = None
while True:
taken = time.time() - start_time
if taken > timeout:
break
result = self._websocket_request('Runtime.evaluate', params={
'expression': "try { %s } catch {}" % ready_code,
'awaitPromise': True,
}, timeout=timeout-taken)['result']
if result == {'type': 'boolean', 'value': True}:
time_to_ready = time.time() - start_time
if taken > 2:
self._logger.info('The ready code tooks too much time : %s', time_to_ready)
return True
self.take_screenshot(prefix='sc_failed_ready_')
self._logger.info('Ready code last try result: %s', result)
return False
def _wait_code_ok(self, code, timeout, error_checker=None):
self.error_checker = error_checker
self._logger.info('Evaluate test code "%s"', code)
start = time.time()
res = self._websocket_request('Runtime.evaluate', params={
'expression': code,
'awaitPromise': True,
}, timeout=timeout)['result']
if res.get('subtype') == 'error':
raise ChromeBrowserException("Running code returned an error: %s" % res)
err = ChromeBrowserException("failed")
try:
# if the runcode was a promise which took some time to execute,
# discount that from the timeout
if self._result.result(time.time() - start + timeout) and not self.had_failure:
return
except CancelledError:
# regular-ish shutdown
return
except Exception as e:
err = e
self.take_screenshot()
self._save_screencast()
if isinstance(err, ChromeBrowserException):
raise err
if isinstance(err, concurrent.futures.TimeoutError):
raise ChromeBrowserException('Script timeout exceeded') from err
raise ChromeBrowserException("Unknown error") from err
def navigate_to(self, url, wait_stop=False):
self._logger.info('Navigating to: "%s"', url)
nav_result = self._websocket_request('Page.navigate', params={'url': url}, timeout=20.0)
self._logger.info("Navigation result: %s", nav_result)
if wait_stop:
frame_id = nav_result['frameId']
e = threading.Event()
self._frames[frame_id] = e.set
self._logger.info('Waiting for frame %r to stop loading', frame_id)
e.wait(10)
def _from_remoteobject(self, arg):
""" attempts to make a CDT RemoteObject comprehensible
"""
objtype = arg['type']
subtype = arg.get('subtype')
if objtype == 'undefined':
# the undefined remoteobject is literally just {type: undefined}...
return 'undefined'
elif objtype != 'object' or subtype not in (None, 'array'):
# value is the json representation for json object
# otherwise fallback on the description which is "a string
# representation of the object" e.g. the traceback for errors, the
# source for functions, ... finally fallback on the entire arg mess
return arg.get('value', arg.get('description', arg))
elif subtype == 'array':
# apparently value is *not* the JSON representation for arrays
# instead it's just Array(3) which is useless, however the preview
# properties are the same as object which is useful (just ignore the
# name which is the index)
return '[%s]' % ', '.join(
repr(p['value']) if p['type'] == 'string' else str(p['value'])
for p in arg.get('preview', {}).get('properties', [])
if re.match(r'\d+', p['name'])
)
# all that's left is type=object, subtype=None aka custom or
# non-standard objects, print as TypeName(param=val, ...), sadly because
# of the way Odoo widgets are created they all appear as Class(...)
# nb: preview properties are *not* recursive, the value is *all* we get
return '%s(%s)' % (
arg.get('className') or 'object',
', '.join(
'%s=%s' % (p['name'], repr(p['value']) if p['type'] == 'string' else p['value'])
for p in arg.get('preview', {}).get('properties', [])
if p.get('value') is not None
)
)
LINE_PATTERN = '\tat %(functionName)s (%(url)s:%(lineNumber)d:%(columnNumber)d)\n'
def _format_stack(self, logrecord):
if logrecord['type'] not in ['trace']:
return
trace = logrecord.get('stackTrace')
while trace:
for f in trace['callFrames']:
yield self.LINE_PATTERN % f
trace = trace.get('parent')
def console_formatter(self, args):
""" Formats similarly to the console API:
* if there are no args, don't format (return string as-is)
* %% -> %
* %c -> replace by styling directives (ignore for us)
* other known formatters -> replace by corresponding argument
* leftover known formatters (args exhausted) -> replace by empty string
* unknown formatters -> return as-is
"""
if not args:
return lambda m: m[0]
def replacer(m):
fmt = m[0][1]
if fmt == '%':
return '%'
if fmt in 'sdfoOc':
if not args:
return ''
repl = args.pop(0)
if fmt == 'c':
return ''
return str(self._from_remoteobject(repl))
return m[0]
return replacer
@lru_cache(1)
def _find_executable():
system = platform.system()
if system == 'Linux':
for bin_ in ['google-chrome', 'chromium', 'chromium-browser', 'google-chrome-stable']:
try:
return find_in_path(bin_)
except IOError:
continue
elif system == 'Darwin':
bins = [
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
'/Applications/Chromium.app/Contents/MacOS/Chromium',
]
for bin_ in bins:
if os.path.exists(bin_):
return bin_
elif system == 'Windows':
bins = [
'%ProgramFiles%\\Google\\Chrome\\Application\\chrome.exe',
'%ProgramFiles(x86)%\\Google\\Chrome\\Application\\chrome.exe',
'%LocalAppData%\\Google\\Chrome\\Application\\chrome.exe',
]
for bin_ in bins:
bin_ = os.path.expandvars(bin_)
if os.path.exists(bin_):
return bin_
raise unittest.SkipTest("Chrome executable not found")
class Opener(requests.Session):
"""
Flushes and clears the current transaction when starting a request.
This is likely necessary when we make a request to the server, as the
request is made with a test cursor, which uses a different cache than this
transaction.
"""
def __init__(self, cr: BaseCursor):
super().__init__()
self.cr = cr
def request(self, *args, **kwargs):
self.cr.flush()
self.cr.clear()
return super().request(*args, **kwargs)
class Transport(xmlrpclib.Transport):
""" see :class:`Opener` """
def __init__(self, cr: BaseCursor):
self.cr = cr
super().__init__()
def request(self, *args, **kwargs):
self.cr.flush()
self.cr.clear()
return super().request(*args, **kwargs)
class JsonRpcException(Exception):
def __init__(self, code, message):
super().__init__(message)
self.code = code
class HttpCase(TransactionCase):
""" Transactional HTTP TestCase with url_open and Chrome headless helpers. """
registry_test_mode = True
browser = None
browser_size = '1366x768'
touch_enabled = False
allow_end_on_form = False
_logger: logging.Logger = None
@classmethod
def setUpClass(cls):
super().setUpClass()
if cls.registry_test_mode:
cls.registry.enter_test_mode(cls.cr)
cls.addClassCleanup(cls.registry.leave_test_mode)
ICP = cls.env['ir.config_parameter']
ICP.set_param('web.base.url', cls.base_url())
ICP.env.flush_all()
# v8 api with correct xmlrpc exception handling.
cls.xmlrpc_url = f'http://{HOST}:{odoo.tools.config["http_port"]:d}/xmlrpc/2/'
cls._logger = logging.getLogger('%s.%s' % (cls.__module__, cls.__name__))
def setUp(self):
super().setUp()
self.xmlrpc_common = xmlrpclib.ServerProxy(self.xmlrpc_url + 'common', transport=Transport(self.cr))
self.xmlrpc_db = xmlrpclib.ServerProxy(self.xmlrpc_url + 'db', transport=Transport(self.cr))
self.xmlrpc_object = xmlrpclib.ServerProxy(self.xmlrpc_url + 'object', transport=Transport(self.cr))
# setup an url opener helper
self.opener = Opener(self.cr)
def url_open(self, url, data=None, files=None, timeout=12, headers=None, allow_redirects=True, head=False):
if url.startswith('/'):
url = self.base_url() + url
if head:
return self.opener.head(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=False)
if data or files:
return self.opener.post(url, data=data, files=files, timeout=timeout, headers=headers, allow_redirects=allow_redirects)
return self.opener.get(url, timeout=timeout, headers=headers, allow_redirects=allow_redirects)
def _wait_remaining_requests(self, timeout=10):
def get_http_request_threads():
return [t for t in threading.enumerate() if t.name.startswith('odoo.service.http.request.')]
start_time = time.time()
request_threads = get_http_request_threads()
self._logger.info('waiting for threads: %s', request_threads)
for thread in request_threads:
thread.join(timeout - (time.time() - start_time))
request_threads = get_http_request_threads()
for thread in request_threads:
self._logger.info("Stop waiting for thread %s handling request for url %s",
thread.name, getattr(thread, 'url', '<UNKNOWN>'))
if request_threads:
self._logger.info('remaining requests')
odoo.tools.misc.dumpstacks()
def logout(self, keep_db=True):
self.session.logout(keep_db=keep_db)
odoo.http.root.session_store.save(self.session)
def authenticate(self, user, password, browser: ChromeBrowser = None):
if getattr(self, 'session', None):
odoo.http.root.session_store.delete(self.session)
self.session = session = odoo.http.root.session_store.new()
session.update(odoo.http.get_default_session(), db=get_db_name())
session.context['lang'] = odoo.http.DEFAULT_LANG
if user: # if authenticated
# Flush and clear the current transaction. This is useful, because
# the call below opens a test cursor, which uses a different cache
# than this transaction.
self.cr.flush()
self.cr.clear()
uid = self.registry['res.users'].authenticate(session.db, user, password, {'interactive': False})
env = api.Environment(self.cr, uid, {})
session.uid = uid
session.login = user
session.session_token = uid and security.compute_session_token(session, env)
session.context = dict(env['res.users'].context_get())
odoo.http.root.session_store.save(session)
# Reset the opener: turns out when we set cookies['foo'] we're really
# setting a cookie on domain='' path='/'.
#
# But then our friendly neighborhood server might set a cookie for
# domain='localhost' path='/' (with the same value) which is considered
# a *different* cookie following ours rather than the same.
#
# When we update our cookie, it's done in-place, so the server-set
# cookie is still present and (as it follows ours and is more precise)
# very likely to still be used, therefore our session change is ignored.
#
# An alternative would be to set the cookie to None (unsetting it
# completely) or clear-ing session.cookies.
self.opener = Opener(self.cr)
self.opener.cookies['session_id'] = session.sid
if browser:
self._logger.info('Setting session cookie in browser')
browser.set_cookie('session_id', session.sid, '/', HOST)
return session
def browser_js(self, url_path, code, ready='', login=None, timeout=60, cookies=None, error_checker=None, watch=False, **kw):
""" Test js code running in the browser
- optionnally log as 'login'
- load page given by url_path
- wait for ready object to be available
- eval(code) inside the page
To signal success test do: console.log('test successful')
To signal test failure raise an exception or call console.error with a message.
Test will stop when a failure occurs if error_checker is not defined or returns True for this message
"""
if not self.env.registry.loaded:
self._logger.warning('HttpCase test should be in post_install only')
# increase timeout if coverage is running
if any(f.filename.endswith('/coverage/execfile.py') for f in inspect.stack() if f.filename):
timeout = timeout * 1.5
if watch:
_logger.warning('watch mode is only suitable for local testing')
browser = ChromeBrowser(type(self), headless=not watch)
try:
self.authenticate(login, login, browser=browser)
# Flush and clear the current transaction. This is useful in case
# we make requests to the server, as these requests are made with
# test cursors, which uses different caches than this transaction.
self.cr.flush()
self.cr.clear()
url = werkzeug.urls.url_join(self.base_url(), url_path)
if watch:
parsed = werkzeug.urls.url_parse(url)
qs = parsed.decode_query()
qs['watch'] = '1'
url = parsed.replace(query=werkzeug.urls.url_encode(qs)).to_url()
self._logger.info('Open "%s" in browser', url)
if browser.screencasts_dir:
self._logger.info('Starting screencast')
browser.start_screencast()
if cookies:
for name, value in cookies.items():
browser.set_cookie(name, value, '/', HOST)
browser.navigate_to(url, wait_stop=not bool(ready))
# Needed because tests like test01.js (qunit tests) are passing a ready
# code = ""
self.assertTrue(browser._wait_ready(ready), 'The ready "%s" code was always falsy' % ready)
error = False
try:
browser._wait_code_ok(code, timeout, error_checker=error_checker)
except ChromeBrowserException as chrome_browser_exception:
error = chrome_browser_exception
if error: # dont keep initial traceback, keep that outside of except
if code:
message = 'The test code "%s" failed' % code
else:
message = "Some js test failed"
self.fail('%s\n\n%s' % (message, error))
finally:
browser.stop()
self._wait_remaining_requests()
@classmethod
def base_url(cls):
return f"http://{HOST}:{odoo.tools.config['http_port']}"
def start_tour(self, url_path, tour_name, step_delay=None, **kwargs):
"""Wrapper for `browser_js` to start the given `tour_name` with the
optional delay between steps `step_delay`. Other arguments from
`browser_js` can be passed as keyword arguments."""
options = {
'stepDelay': step_delay if step_delay else 0,
'keepWatchBrowser': kwargs.get('watch', False),
'startUrl': url_path,
}
code = kwargs.pop('code', "odoo.startTour('%s', %s)" % (tour_name, json.dumps(options)))
ready = kwargs.pop('ready', "odoo.isTourReady('%s')" % tour_name)
return self.browser_js(url_path=url_path, code=code, ready=ready, **kwargs)
def profile(self, **kwargs):
"""
for http_case, also patch _get_profiler_context_manager in order to profile all requests
"""
sup = super()
_profiler = sup.profile(**kwargs)
def route_profiler(request):
return sup.profile(description=request.httprequest.full_path)
return profiler.Nested(_profiler, patch('odoo.http.Request._get_profiler_context_manager', route_profiler))
def make_jsonrpc_request(self, route, params=None, headers=None):
"""Make a JSON-RPC request to the server.
:param str route: the route to request
:param dict params: the parameters to send
:raises requests.HTTPError: if one occurred
:raises JsonRpcException: if the response contains an error
:return: The 'result' key from the response if any.
"""
data = json.dumps({
'id': 0,
'jsonrpc': '2.0',
'method': 'call',
'params': params,
}).encode()
headers = headers or {}
headers['Content-Type'] = 'application/json'
response = self.url_open(route, data, headers=headers)
response.raise_for_status()
decoded_response = response.json()
if 'result' in decoded_response:
return decoded_response['result']
if 'error' in decoded_response:
raise JsonRpcException(
code=decoded_response['error']['code'],
message=decoded_response['error']['data']['name']
)
def no_retry(arg):
"""Disable auto retry on decorated test method or test class"""
arg._retry = False
return arg
def users(*logins):
""" Decorate a method to execute it once for each given user. """
@decorator
def _users(func, *args, **kwargs):
self = args[0]
old_uid = self.uid
try:
# retrieve users
Users = self.env['res.users'].with_context(active_test=False)
user_id = {
user.login: user.id
for user in Users.search([('login', 'in', list(logins))])
}
for login in logins:
with self.subTest(login=login):
# switch user and execute func
self.uid = user_id[login]
func(*args, **kwargs)
# Invalidate the cache between subtests, in order to not reuse
# the former user's cache (`test_read_mail`, `test_write_mail`)
self.env.invalidate_all()
finally:
self.uid = old_uid
return _users
@decorator
def warmup(func, *args, **kwargs):
""" Decorate a test method to run it twice: once for a warming up phase, and
a second time for real. The test attribute ``warm`` is set to ``False``
during warm up, and ``True`` once the test is warmed up. Note that the
effects of the warmup phase are rolled back thanks to a savepoint.
"""
self = args[0]
self.env.flush_all()
self.env.invalidate_all()
# run once to warm up the caches
self.warm = False
self.cr.execute('SAVEPOINT test_warmup')
func(*args, **kwargs)
self.env.flush_all()
# run once for real
self.cr.execute('ROLLBACK TO SAVEPOINT test_warmup')
self.env.invalidate_all()
self.warm = True
func(*args, **kwargs)
def can_import(module):
""" Checks if <module> can be imported, returns ``True`` if it can be,
``False`` otherwise.
To use with ``unittest.skipUnless`` for tests conditional on *optional*
dependencies, which may or may be present but must still be tested if
possible.
"""
try:
importlib.import_module(module)
except ImportError:
return False
else:
return True
def tagged(*tags):
"""A decorator to tag BaseCase objects.
Tags are stored in a set that can be accessed from a 'test_tags' attribute.
A tag prefixed by '-' will remove the tag e.g. to remove the 'standard' tag.
By default, all Test classes from odoo.tests.common have a test_tags
attribute that defaults to 'standard' and 'at_install'.
When using class inheritance, the tags ARE inherited.
"""
include = {t for t in tags if not t.startswith('-')}
exclude = {t[1:] for t in tags if t.startswith('-')}
def tags_decorator(obj):
obj.test_tags = (getattr(obj, 'test_tags', set()) | include) - exclude
at_install = 'at_install' in obj.test_tags
post_install = 'post_install' in obj.test_tags
if not (at_install ^ post_install):
_logger.warning('A tests should be either at_install or post_install, which is not the case of %r', obj)
return obj
return tags_decorator