odoo_17.0.1/odoo/addons/base/models/ir_asset.py

433 lines
18 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import os
from glob import glob
from logging import getLogger
from werkzeug import urls
import odoo
import odoo.modules.module # get_manifest, don't from-import it
from odoo import api, fields, models, tools
from odoo.tools import misc
from odoo.tools.constants import ASSET_EXTENSIONS, EXTERNAL_ASSET
# Module-level logger; used below to warn about asset paths that resolve to nothing.
_logger = getLogger(__name__)
# Default value of the `sequence` field of 'ir.asset' records. It is also the
# pivot used when a bundle is resolved: records with a sequence strictly lower
# than this value are applied before the addons' manifests, the other records
# (sequence >= DEFAULT_SEQUENCE) are applied after them.
DEFAULT_SEQUENCE = 16
# Directives are stored in variables for ease of use and syntax checks.
# They are the possible values of the `directive` selection field and of the
# first item of a manifest command.
APPEND_DIRECTIVE = 'append'
PREPEND_DIRECTIVE = 'prepend'
AFTER_DIRECTIVE = 'after'
BEFORE_DIRECTIVE = 'before'
REMOVE_DIRECTIVE = 'remove'
REPLACE_DIRECTIVE = 'replace'
INCLUDE_DIRECTIVE = 'include'
# Those are the directives used with a 'target' argument/field: a manifest
# command using one of them is a 3-tuple (directive, target, path) instead of
# a 2-tuple (directive, path).
DIRECTIVES_WITH_TARGET = [AFTER_DIRECTIVE, BEFORE_DIRECTIVE, REPLACE_DIRECTIVE]
def fs2web(path):
    """Return ``path`` with the OS path separator replaced by ``/``.

    On platforms whose separator already is ``/`` the path is returned
    untouched.

    :param path: a file system path (string)
    :returns: the same path, written with forward slashes
    """
    separator = os.path.sep
    return path if separator == '/' else path.replace(separator, '/')
def can_aggregate(url):
    """Tell whether ``url`` designates a local file that may join a bundle.

    A url cannot be aggregated when it carries a scheme or a network location
    (e.g. ``http://...``) or when it starts with ``/web/content``.

    :param url: the path or url to check (string)
    :returns: True if the url can be aggregated, False otherwise
    """
    parts = urls.url_parse(url)
    is_remote = bool(parts.scheme or parts.netloc)
    return not is_remote and not url.startswith('/web/content')
def is_wildcard_glob(path):
    """Tell whether ``path`` contains a glob special character.

    "/web/file[14].*" is a wildcarded glob, whereas "/web/myfile.scss" is a
    genuine single file path.

    :param path: the path definition to check (string)
    :returns: True if any of ``*``, ``[``, ``]`` or ``?`` occurs in the path
    """
    return any(marker in path for marker in '*[]?')
def _glob_static_file(pattern):
    """List the files matching ``pattern`` whose extension is an asset one.

    :param pattern: glob pattern, expanded recursively (``**`` is supported)
    :returns: a sorted list of tuples (file path, modification time), limited
        to the files whose extension belongs to ASSET_EXTENSIONS
    """
    matches = []
    for filename in glob(pattern, recursive=True):
        extension = filename.rsplit('.', 1)[-1]
        if extension in ASSET_EXTENSIONS:
            matches.append((filename, os.path.getmtime(filename)))
    matches.sort()
    return matches
class IrAsset(models.Model):
    """Asset directives stored in database and bundle resolution logic.

    This model contributes to two things:

        1. It provides a function returning the list of all the files
        belonging to a given bundle (see `_get_asset_paths`);

        2. It allows to create 'ir.asset' records to add additional directives
        to certain bundles.
    """
    _name = 'ir.asset'
    _description = 'Asset'
    _order = 'sequence, id'
    _allow_sudo_commands = False

    @api.model_create_multi
    def create(self, vals_list):
        """Create the records and invalidate the 'assets' cache."""
        self.env.registry.clear_cache('assets')
        return super().create(vals_list)

    def write(self, values):
        """Update the records; the 'assets' cache is only invalidated when
        the recordset is not empty."""
        if self:
            self.env.registry.clear_cache('assets')
        return super().write(values)

    def unlink(self):
        """Delete the records and invalidate the 'assets' cache."""
        self.env.registry.clear_cache('assets')
        return super().unlink()

    name = fields.Char(string='Name', required=True)
    bundle = fields.Char(string='Bundle name', required=True)
    directive = fields.Selection(string='Directive', selection=[
        (APPEND_DIRECTIVE, 'Append'),
        (PREPEND_DIRECTIVE, 'Prepend'),
        (AFTER_DIRECTIVE, 'After'),
        (BEFORE_DIRECTIVE, 'Before'),
        (REMOVE_DIRECTIVE, 'Remove'),
        (REPLACE_DIRECTIVE, 'Replace'),
        (INCLUDE_DIRECTIVE, 'Include')], default=APPEND_DIRECTIVE)
    path = fields.Char(string='Path (or glob pattern)', required=True)
    target = fields.Char(string='Target')
    active = fields.Boolean(string='active', default=True)
    sequence = fields.Integer(string="Sequence", default=DEFAULT_SEQUENCE, required=True)

    def _get_asset_params(self):
        """
        This method can be overridden to add parameters to the
        `_get_asset_paths` call. Those params will be part of the orm cache
        key, and are forwarded as keyword arguments to the methods involved
        in the resolution of a bundle.

        :returns: a dict of parameters (empty by default)
        """
        return {}

    def _get_asset_bundle_url(self, filename, unique, assets_params, ignore_params=False):
        """
        Returns the url of a generated bundle file.

        :param filename: name of the bundle file
        :param unique: version identifier inserted in the url
        :param assets_params: not used by this implementation
        :param ignore_params: not used by this implementation
        :returns: the url, as a string
        """
        return f'/web/assets/{unique}/{filename}'

    def _parse_bundle_name(self, bundle_name, debug_assets):
        """
        Splits a bundle file name into its components.

        Ex: 'web.assets_backend.min.js' gives ('web.assets_backend', False, 'js')
        and 'web.assets_backend.rtl.min.css' gives ('web.assets_backend', True, 'css').

        :param bundle_name: file name of the bundle, extension(s) included
        :param debug_assets: when falsy, a 'min' part is expected right before
            the extension
        :returns: a tuple (bundle name, rtl, asset type)
        :raises ValueError: if the 'min' part is missing in non debug mode, if
            the asset type is neither 'js' nor 'css', or if the remaining
            bundle name is not made of exactly two dot-separated parts
        """
        bundle_name, asset_type = bundle_name.rsplit('.', 1)
        rtl = False
        if not debug_assets:
            bundle_name, min_ = bundle_name.rsplit('.', 1)
            if min_ != 'min':
                raise ValueError("'min' expected in extension in non debug mode")
        if asset_type == 'css':
            if bundle_name.endswith('.rtl'):
                # strip the '.rtl' suffix
                bundle_name = bundle_name[:-4]
                rtl = True
        elif asset_type != 'js':
            raise ValueError('Only js and css assets bundle are supported for now')
        if len(bundle_name.split('.')) != 2:
            raise ValueError(f'{bundle_name} is not a valid bundle name, should have two parts')
        return bundle_name, rtl, asset_type

    @tools.conditional(
        'xml' not in tools.config['dev_mode'],
        tools.ormcache('bundle', 'tuple(sorted(assets_params.items()))', cache='assets'),
    )
    def _get_asset_paths(self, bundle, assets_params):
        """
        Fetches all asset file paths matching a certain bundle.

        Asset loading is performed as follows:

        1. All 'ir.asset' records matching the given bundle and with a sequence
        strictly less than 16 are applied.

        2. The manifests of the active addons are checked for assets declaration
        for the given bundle. If any, they are read sequentially and their
        operations are applied to the current list.

        3. After all manifests have been parsed, the remaining 'ir.asset'
        records matching the bundle are also applied to the current list.

        :param bundle: name of the bundle from which to fetch the file paths
        :param assets_params: parameters needed by overrides, mainly website_id
            see _get_asset_params
        :returns: the list of tuples (path, full_path, bundle, last_modified)
        """
        installed = self._get_installed_addons_list()
        addons = self._get_active_addons_list(**assets_params)

        asset_paths = AssetPaths()

        addons = self._topological_sort(tuple(addons))

        self._fill_asset_paths(bundle, asset_paths, [], addons, installed, **assets_params)
        return asset_paths.list

    def _fill_asset_paths(self, bundle, asset_paths, seen, addons, installed, **assets_params):
        """
        Fills the given AssetPaths instance by applying the operations found in
        the matching bundle of the given addons manifests and in the matching
        'ir.asset' records.
        See `_get_asset_paths` for more information.

        :param bundle: name of the bundle from which to fetch the file paths
        :param asset_paths: the AssetPaths object to fill
        :param seen: a list of bundles already checked to avoid circularity
        :param addons: list of addon names as strings
        :param installed: the installed addons, see `_get_installed_addons_list`
        :param assets_params: extra parameters, see `_get_asset_params`
        :raises Exception: if the bundle (indirectly) includes itself
        """
        if bundle in seen:
            raise Exception("Circular assets bundle declaration: %s" % " > ".join(seen + [bundle]))

        # this index is used for prepending: files are inserted at the beginning
        # of the CURRENT bundle.
        bundle_start_index = len(asset_paths.list)

        assets = self._get_related_assets([('bundle', '=', bundle)], **assets_params).filtered('active')
        # 1. Process the first sequence of 'ir.asset' records
        for asset in assets.filtered(lambda a: a.sequence < DEFAULT_SEQUENCE):
            self._process_path(bundle, asset.directive, asset.target, asset.path, asset_paths, seen, addons, installed, bundle_start_index, **assets_params)

        # 2. Process all addons' manifests.
        for addon in addons:
            for command in odoo.modules.module._get_manifest_cached(addon)['assets'].get(bundle, ()):
                directive, target, path_def = self._process_command(command)
                self._process_path(bundle, directive, target, path_def, asset_paths, seen, addons, installed, bundle_start_index, **assets_params)

        # 3. Process the rest of 'ir.asset' records
        for asset in assets.filtered(lambda a: a.sequence >= DEFAULT_SEQUENCE):
            self._process_path(bundle, asset.directive, asset.target, asset.path, asset_paths, seen, addons, installed, bundle_start_index, **assets_params)

    def _process_path(self, bundle, directive, target, path_def, asset_paths, seen, addons, installed, bundle_start_index, **assets_params):
        """
        Takes a directive and a set of arguments and applies them to the
        given asset_paths accordingly.

        :param bundle: name of the bundle being filled
        :param directive: string
        :param target: string or None or False
        :param path_def: string; the name of a bundle for the 'include'
            directive, a path, a glob or a url otherwise
        :param asset_paths: the AssetPaths object to modify
        :param seen: the bundles already being processed (circularity check)
        :param addons: list of addon names as strings
        :param installed: the installed addons, see `_get_installed_addons_list`
        :param bundle_start_index: index in asset_paths at which the current
            bundle starts, used by the 'prepend' directive
        :param assets_params: extra parameters, see `_get_asset_params`
        :raises ValueError: if the directive is unknown, or if a target or a
            file to remove is not in the current list (see AssetPaths)
        """
        if directive == INCLUDE_DIRECTIVE:
            # recursively call this function for each INCLUDE_DIRECTIVE directive.
            self._fill_asset_paths(path_def, asset_paths, seen + [bundle], addons, installed, **assets_params)
            return
        if can_aggregate(path_def):
            paths = self._get_paths(path_def, installed)
        else:
            paths = [(path_def, EXTERNAL_ASSET, -1)]  # external urls

        # retrieve target index when it applies
        if directive in DIRECTIVES_WITH_TARGET:
            target_paths = self._get_paths(target, installed)
            if not target_paths and target.rpartition('.')[2] not in ASSET_EXTENSIONS:
                # nothing to do: the extension of the target is wrong
                return
            if target_paths:
                # a glob target resolves to its first matching file
                target = target_paths[0][0]
            target_index = asset_paths.index(target, bundle)

        if directive == APPEND_DIRECTIVE:
            asset_paths.append(paths, bundle)
        elif directive == PREPEND_DIRECTIVE:
            asset_paths.insert(paths, bundle, bundle_start_index)
        elif directive == AFTER_DIRECTIVE:
            asset_paths.insert(paths, bundle, target_index + 1)
        elif directive == BEFORE_DIRECTIVE:
            asset_paths.insert(paths, bundle, target_index)
        elif directive == REMOVE_DIRECTIVE:
            asset_paths.remove(paths, bundle)
        elif directive == REPLACE_DIRECTIVE:
            # insert the new files at the position of the target, then drop it
            asset_paths.insert(paths, bundle, target_index)
            asset_paths.remove(target_paths, bundle)
        else:
            # this should never happen
            raise ValueError("Unexpected directive")

    def _get_related_assets(self, domain, **kwargs):
        """
        Returns a set of assets matching the domain, regardless of their
        active state. This method can be overridden to filter the results.

        :param domain: search domain
        :param kwargs: the asset params (see `_get_asset_params`) forwarded by
            the callers; ignored here, available to overrides
        :returns: ir.asset recordset
        """
        # active_test is needed to disable some assets through filter_duplicate for website
        # they will be filtered on active afterward
        return self.with_context(active_test=False).sudo().search(domain, order='sequence, id')

    def _get_related_bundle(self, target_path_def, root_bundle):
        """
        Returns the first bundle directly defining a glob matching the target
        path. This is useful when generating an 'ir.asset' record to override
        a specific asset and target the right bundle, i.e. the first one
        defining the target path.

        :param target_path_def: string: path to match.
        :param root_bundle: string: bundle from which to initiate the search.
        :returns: the first matching bundle, or root_bundle if none matches
        """
        installed = self._get_installed_addons_list()
        # only the first file matching the definition is considered
        target_path, _full_path, _modified = self._get_paths(target_path_def, installed)[0]
        assets_params = self._get_asset_params()
        asset_paths = self._get_asset_paths(root_bundle, assets_params)

        for path, _full_path, bundle, _modified in asset_paths:
            if path == target_path:
                return bundle

        return root_bundle

    def _get_active_addons_list(self, **kwargs):
        """Can be overridden to filter the returned list of active modules.

        :param kwargs: the asset params (see `_get_asset_params`) forwarded by
            `_get_asset_paths`; ignored here, available to overrides
        """
        return self._get_installed_addons_list()

    @api.model
    @tools.ormcache('addons_tuple')
    def _topological_sort(self, addons_tuple):
        """Returns a list of sorted modules name accord to the spec in ir.module.module
        that is, application desc, sequence, name then topologically sorted

        :param addons_tuple: the names of the addons to sort; a tuple, since
            it is used as cache key
        """
        IrModule = self.env['ir.module.module']

        def mapper(addon):
            manif = odoo.modules.module._get_manifest_cached(addon)
            from_terp = IrModule.get_values_from_terp(manif)
            from_terp['name'] = addon
            from_terp['depends'] = manif.get('depends', ['base'])
            return from_terp

        manifs = map(mapper, addons_tuple)

        def sort_key(manif):
            return (not manif['application'], int(manif['sequence']), manif['name'])

        manifs = sorted(manifs, key=sort_key)

        return misc.topological_sort({manif['name']: tuple(manif['depends']) for manif in manifs})

    @api.model
    @tools.ormcache()
    def _get_installed_addons_list(self):
        """
        Returns the installed addons: the modules of the current registry
        united with the server wide modules.

        :returns: the module names, as returned by the `union` call below
            (despite the name of the method, not necessarily a list)
        """
        # Main source: the current registry list
        # Second source of modules: server wide modules
        return self.env.registry._init_modules.union(odoo.conf.server_wide_modules or [])

    def _get_paths(self, path_def, installed):
        """
        Returns a list of tuple (path, full_path, modified) matching a given glob (path_def).
        The glob can only occur in the static directory of an installed addon.

        If the path_def matches a (list of) file, the result will contain the full_path
        and the modified time.
        Ex: ('/base/static/file.js', '/home/user/source/odoo/odoo/addons/base/static/file.js', 643636800)

        If the path_def looks like a non aggregable path (http://, /web/content), only return the path
        Ex: ('http://example.com/lib.js', EXTERNAL_ASSET, -1)
        The timestamp -1 is given to be truthy while carrying no information.

        If the path_def is not a wildcard, but may still be a valid addons path, return the
        path without full path nor timestamp
        Ex: ('/_custom/web.asset_frontend', None, None)

        If nothing matches, a warning is logged and an empty list is returned.

        :param path_def: the definition (glob) of file paths to match
        :param installed: the list of installed addons
        :returns: a list of tuple: (path, full_path, modified)
        :raises Exception: if the path designates an addon which has a
            manifest but is not installed
        """
        # Always a list, so that callers can iterate on the result even when
        # nothing resolved.
        paths = []
        path_def = fs2web(path_def)  # we expect to have all path definition unix style or url style, this is a safety
        path_parts = [part for part in path_def.split('/') if part]
        addon = path_parts[0]
        addon_manifest = odoo.modules.module._get_manifest_cached(addon)

        safe_path = True
        if addon_manifest:
            if addon not in installed:
                # Assert that the path is in the installed addons
                raise Exception(f"Unallowed to fetch files from addon {addon} for file {path_def}")
            addons_path = addon_manifest['addons_path']
            full_path = os.path.normpath(os.sep.join([addons_path, *path_parts]))
            # forbid escape from the current addon
            # "/mymodule/../myothermodule" is forbidden
            static_prefix = os.sep.join([addons_path, addon, 'static', ''])
            if full_path.startswith(static_prefix):
                paths_with_timestamps = _glob_static_file(full_path)
                paths = [
                    (fs2web(absolute_path[len(addons_path):]), absolute_path, timestamp)
                    for absolute_path, timestamp in paths_with_timestamps
                ]
            else:
                safe_path = False
        else:
            safe_path = False

        if not paths and not can_aggregate(path_def):  # http:// or /web/content
            paths = [(path_def, EXTERNAL_ASSET, -1)]

        if not paths and not is_wildcard_glob(path_def):  # an attachment url most likely
            paths = [(path_def, None, None)]

        if not paths:
            msg = f'IrAsset: the path "{path_def}" did not resolve to anything.'
            if not safe_path:
                msg += " It may be due to security reasons."
            _logger.warning(msg)
        # Files found on the file system were already filtered on their
        # extension by _glob_static_file.
        return paths

    def _process_command(self, command):
        """Parses a given command to return its directive, target and path definition.

        :param command: either a path definition (string, appended), a tuple
            (directive, target, path definition) for the directives with a
            target, or a tuple (directive, path definition)
        :returns: a tuple (directive, target, path definition) where target is
            None for the directives without target
        """
        if isinstance(command, str):
            # Default directive: append
            directive, target, path_def = APPEND_DIRECTIVE, None, command
        elif command[0] in DIRECTIVES_WITH_TARGET:
            directive, target, path_def = command
        else:
            directive, path_def = command
            target = None
        return directive, target, path_def
class AssetPaths:
    """Ordered, duplicate-free collection of asset entries.

    ``list`` holds tuples (path, full_path, bundle, last_modified) in loading
    order, and ``memo`` is the set of the paths currently in ``list`` so that
    membership checks do not need to scan the list.
    """

    def __init__(self):
        self.list = []
        self.memo = set()

    def index(self, path, bundle):
        """Returns the position of ``path`` in the current assets list.

        :raises ValueError: if the path is not in the list
        """
        if path not in self.memo:
            self._raise_not_found(path, bundle)
        return next(
            (position for position, entry in enumerate(self.list) if entry[0] == path),
            None,
        )

    def append(self, paths, bundle):
        """Adds the given paths at the end of the list, skipping known ones."""
        for path, full_path, last_modified in paths:
            if path in self.memo:
                continue
            self.memo.add(path)
            self.list.append((path, full_path, bundle, last_modified))

    def insert(self, paths, bundle, index):
        """Adds the given paths at position ``index``, skipping known ones."""
        fresh_entries = []
        for path, full_path, last_modified in paths:
            if path in self.memo:
                continue
            self.memo.add(path)
            fresh_entries.append((path, full_path, bundle, last_modified))
        self.list[index:index] = fresh_entries

    def remove(self, paths_to_remove, bundle):
        """Drops the given paths from the list.

        :raises ValueError: if paths were given but none of them is in the list
        """
        known = {path for path, _full_path, _last_modified in paths_to_remove if path in self.memo}
        if not known:
            if paths_to_remove:
                missing = [path for path, _full_path, _last_modified in paths_to_remove]
                self._raise_not_found(missing, bundle)
            return
        # the list is updated in place: it may be referenced elsewhere
        self.list[:] = [entry for entry in self.list if entry[0] not in known]
        self.memo -= known

    def _raise_not_found(self, path, bundle):
        """Raises the error reporting path(s) missing from ``bundle``."""
        raise ValueError("File(s) %s not found in bundle %s" % (path, bundle))