odoo_17.0.1/core/web/models/models.py

1262 lines
54 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
from typing import Dict, List
import babel.dates
import base64
import copy
import itertools
import json
import pytz
from odoo import _, _lt, api, fields, models
from odoo.fields import Command
from odoo.models import BaseModel, NewId
from odoo.osv.expression import AND, TRUE_DOMAIN, normalize_domain
from odoo.tools import date_utils, unique
from odoo.tools.misc import OrderedSet, get_lang
from odoo.exceptions import UserError
from collections import defaultdict
SEARCH_PANEL_ERROR_MESSAGE = _lt("Too many items to display.")
def is_true_domain(domain):
return normalize_domain(domain) == TRUE_DOMAIN
class lazymapping(defaultdict):
def __missing__(self, key):
value = self.default_factory(key)
self[key] = value
return value
DISPLAY_DATE_FORMATS = {
'day': 'dd MMM yyyy',
'week': "'W'w YYYY",
'month': 'MMMM yyyy',
'quarter': 'QQQ yyyy',
'year': 'yyyy',
}
class Base(models.AbstractModel):
_inherit = 'base'
@api.model
def web_search_read(self, domain, specification, offset=0, limit=None, order=None, count_limit=None):
records = self.search_fetch(domain, specification.keys(), offset=offset, limit=limit, order=order)
values_records = records.web_read(specification)
return self._format_web_search_read_results(domain, values_records, offset, limit, count_limit)
def _format_web_search_read_results(self, domain, records, offset=0, limit=None, count_limit=None):
if not records:
return {
'length': 0,
'records': [],
}
current_length = len(records) + offset
limit_reached = len(records) == limit
force_search_count = self._context.get('force_search_count')
count_limit_reached = count_limit and count_limit <= current_length
if limit and ((limit_reached and not count_limit_reached) or force_search_count):
length = self.search_count(domain, limit=count_limit)
else:
length = current_length
return {
'length': length,
'records': records,
}
def web_save(self, vals, specification: Dict[str, Dict], next_id=None) -> List[Dict]:
if self:
self.write(vals)
else:
self = self.create(vals)
if next_id:
self = self.browse(next_id)
return self.with_context(bin_size=True).web_read(specification)
def web_read(self, specification: Dict[str, Dict]) -> List[Dict]:
fields_to_read = list(specification) or ['id']
if fields_to_read == ['id']:
# if we request to read only the ids, we have them already so we can build the return dictionaries immediately
# this also avoid a call to read on the co-model that might have different access rules
values_list = [{'id': id_} for id_ in self._ids]
else:
values_list: List[Dict] = self.read(fields_to_read, load=None)
if not values_list:
return values_list
def cleanup(vals: Dict) -> Dict:
""" Fixup vals['id'] of a new record. """
if not vals['id']:
vals['id'] = vals['id'].origin or False
return vals
for field_name, field_spec in specification.items():
field = self._fields.get(field_name)
if field is None:
continue
if field.type == 'many2one':
if 'fields' not in field_spec:
for values in values_list:
if isinstance(values[field_name], NewId):
values[field_name] = values[field_name].origin
continue
co_records = self[field_name]
if 'context' in field_spec:
co_records = co_records.with_context(**field_spec['context'])
extra_fields = dict(field_spec['fields'])
extra_fields.pop('display_name', None)
many2one_data = {
vals['id']: cleanup(vals)
for vals in co_records.web_read(extra_fields)
}
if 'display_name' in field_spec['fields']:
for rec in co_records.sudo():
many2one_data[rec.id]['display_name'] = rec.display_name
for values in values_list:
if values[field_name] is False:
continue
vals = many2one_data[values[field_name]]
values[field_name] = vals['id'] and vals
elif field.type in ('one2many', 'many2many'):
if not field_spec:
continue
co_records = self[field_name]
if 'order' in field_spec and field_spec['order']:
co_records = co_records.search([('id', 'in', co_records.ids)], order=field_spec['order'])
order_key = {
co_record.id: index
for index, co_record in enumerate(co_records)
}
for values in values_list:
# filter out inaccessible corecords in case of "cache pollution"
values[field_name] = [id_ for id_ in values[field_name] if id_ in order_key]
values[field_name] = sorted(values[field_name], key=order_key.__getitem__)
if 'context' in field_spec:
co_records = co_records.with_context(**field_spec['context'])
if 'fields' in field_spec:
if field_spec.get('limit') is not None:
limit = field_spec['limit']
ids_to_read = OrderedSet(
id_
for values in values_list
for id_ in values[field_name][:limit]
)
co_records = co_records.browse(ids_to_read)
x2many_data = {
vals['id']: vals
for vals in co_records.web_read(field_spec['fields'])
}
for values in values_list:
values[field_name] = [x2many_data.get(id_) or {'id': id_} for id_ in values[field_name]]
elif field.type in ('reference', 'many2one_reference'):
if not field_spec:
continue
values_by_id = {
vals['id']: vals
for vals in values_list
}
for record in self:
if not record[field_name]:
continue
if field.type == 'reference':
co_record = record[field_name]
else: # field.type == 'many2one_reference'
co_record = self.env[record[field.model_field]].browse(record[field_name])
if 'context' in field_spec:
co_record = co_record.with_context(**field_spec['context'])
if 'fields' in field_spec:
reference_read = co_record.web_read(field_spec['fields'])
if any(fname != 'id' for fname in field_spec['fields']):
# we can infer that if we can read fields for the co-record, it exists
co_record_exists = bool(reference_read)
else:
co_record_exists = co_record.exists()
else:
# If there are no fields to read (field_spec.get('fields') --> None) and we web_read ids, it will
# not actually read the records so we do not know if they exist.
# This ensures the record actually exists
co_record_exists = co_record.exists()
record_values = values_by_id[record.id]
if not co_record_exists:
record_values[field_name] = False
if field.type == 'many2one_reference':
record_values[field.model_field] = False
continue
if 'fields' in field_spec:
record_values[field_name] = reference_read[0]
if field.type == 'reference':
record_values[field_name]['id'] = {
'id': co_record.id,
'model': co_record._name
}
return values_list
@api.model
def web_read_group(self, domain, fields, groupby, limit=None, offset=0, orderby=False, lazy=True):
"""
Returns the result of a read_group and the total number of groups matching the search domain.
:param domain: search domain
:param fields: list of fields to read (see ``fields``` param of ``read_group``)
:param groupby: list of fields to group on (see ``groupby``` param of ``read_group``)
:param limit: see ``limit`` param of ``read_group``
:param offset: see ``offset`` param of ``read_group``
:param orderby: see ``orderby`` param of ``read_group``
:param lazy: see ``lazy`` param of ``read_group``
:return: {
'groups': array of read groups
'length': total number of groups
}
"""
groups = self._web_read_group(domain, fields, groupby, limit, offset, orderby, lazy)
if not groups:
length = 0
elif limit and len(groups) == limit:
length = limit + len(self._read_group(
domain,
groupby=groupby if not lazy else [groupby[0]],
offset=limit,
))
else:
length = len(groups) + offset
return {
'groups': groups,
'length': length
}
@api.model
def _web_read_group(self, domain, fields, groupby, limit=None, offset=0, orderby=False, lazy=True):
"""
See ``web_read_group`` for params description.
:returns: array of groups
"""
groups = self.read_group(domain, fields, groupby, offset=offset, limit=limit,
orderby=orderby, lazy=lazy)
return groups
@api.model
def read_progress_bar(self, domain, group_by, progress_bar):
"""
Gets the data needed for all the kanban column progressbars.
These are fetched alongside read_group operation.
:param domain - the domain used in the kanban view to filter records
:param group_by - the name of the field used to group records into
kanban columns
:param progress_bar - the <progressbar/> declaration attributes
(field, colors, sum)
:return a dictionnary mapping group_by values to dictionnaries mapping
progress bar field values to the related number of records
"""
group_by_fullname = group_by.partition(':')[0]
group_by_fieldname = group_by_fullname.split(".")[0] # split on "." in case we group on a property
field_type = self._fields[group_by_fieldname].type
if field_type == 'selection':
selection_labels = dict(self.fields_get()[group_by]['selection'])
def adapt(value):
if field_type == 'selection':
value = selection_labels.get(value, False)
if isinstance(value, tuple):
value = value[1] # FIXME should use technical value (0)
return value
result = {}
for group in self._read_progress_bar(domain, group_by, progress_bar):
group_by_value = str(adapt(group[group_by]))
field_value = group[progress_bar['field']]
if group_by_value not in result:
result[group_by_value] = dict.fromkeys(progress_bar['colors'], 0)
if field_value in result[group_by_value]:
result[group_by_value][field_value] += group['__count']
return result
def _read_progress_bar(self, domain, group_by, progress_bar):
""" Implementation of read_progress_bar() that returns results in the
format of read_group().
"""
try:
fname = progress_bar['field']
return self.read_group(domain, [fname], [group_by, fname], lazy=False)
except ValueError:
# possibly failed because of grouping on or aggregating non-stored
# field; fallback on alternative implementation
pass
# Workaround to match read_group's infrastructure
# TO DO in master: harmonize this function and readgroup to allow factorization
group_by_fullname = group_by.partition(':')[0]
group_by_fieldname = group_by_fullname.split(".")[0] # split on "." in case we group on a property
group_by_modifier = group_by.partition(':')[2] or 'month'
records_values = self.search_read(domain or [], [progress_bar['field'], group_by_fieldname])
field_type = self._fields[group_by_fieldname].type
for record_values in records_values:
group_by_value = record_values.pop(group_by_fieldname)
property_name = group_by_fullname.partition('.')[2]
if field_type == "properties" and group_by_value:
group_by_value = next(
(definition['value'] for definition in group_by_value
if definition['name'] == property_name),
False,
)
# Again, imitating what _read_group_format_result and _read_group_prepare_data do
if group_by_value and field_type in ['date', 'datetime']:
locale = get_lang(self.env).code
group_by_value = fields.Datetime.to_datetime(group_by_value)
if group_by_modifier != 'week':
# start_of(v, 'week') does not take into account the locale
# to determine the first day of the week; this part is not
# necessary, since the formatting below handles the locale
# as expected, and outputs correct results
group_by_value = date_utils.start_of(group_by_value, group_by_modifier)
group_by_value = pytz.timezone('UTC').localize(group_by_value)
tz_info = None
if field_type == 'datetime' and self._context.get('tz') in pytz.all_timezones:
tz_info = self._context.get('tz')
group_by_value = babel.dates.format_datetime(
group_by_value, format=DISPLAY_DATE_FORMATS[group_by_modifier],
tzinfo=tz_info, locale=locale)
else:
group_by_value = babel.dates.format_date(
group_by_value, format=DISPLAY_DATE_FORMATS[group_by_modifier],
locale=locale)
if field_type == 'many2many' and isinstance(group_by_value, list):
group_by_value = str(tuple(group_by_value)) or False
record_values[group_by] = group_by_value
record_values['__count'] = 1
return records_values
@api.model
def _search_panel_field_image(self, field_name, **kwargs):
"""
Return the values in the image of the provided domain by field_name.
:param model_domain: domain whose image is returned
:param extra_domain: extra domain to use when counting records associated with field values
:param field_name: the name of a field (type many2one or selection)
:param enable_counters: whether to set the key '__count' in image values
:param only_counters: whether to retrieve information on the model_domain image or only
counts based on model_domain and extra_domain. In the later case,
the counts are set whatever is enable_counters.
:param limit: integer, maximal number of values to fetch
:param set_limit: boolean, whether to use the provided limit (if any)
:return: a dict of the form
{
id: { 'id': id, 'display_name': display_name, ('__count': c,) },
...
}
"""
enable_counters = kwargs.get('enable_counters')
only_counters = kwargs.get('only_counters')
extra_domain = kwargs.get('extra_domain', [])
no_extra = is_true_domain(extra_domain)
model_domain = kwargs.get('model_domain', [])
count_domain = AND([model_domain, extra_domain])
limit = kwargs.get('limit')
set_limit = kwargs.get('set_limit')
if only_counters:
return self._search_panel_domain_image(field_name, count_domain, True)
model_domain_image = self._search_panel_domain_image(field_name, model_domain,
enable_counters and no_extra,
set_limit and limit,
)
if enable_counters and not no_extra:
count_domain_image = self._search_panel_domain_image(field_name, count_domain, True)
for id, values in model_domain_image.items():
element = count_domain_image.get(id)
values['__count'] = element['__count'] if element else 0
return model_domain_image
@api.model
def _search_panel_domain_image(self, field_name, domain, set_count=False, limit=False):
"""
Return the values in the image of the provided domain by field_name.
:param domain: domain whose image is returned
:param field_name: the name of a field (type many2one or selection)
:param set_count: whether to set the key '__count' in image values. Default is False.
:param limit: integer, maximal number of values to fetch. Default is False.
:return: a dict of the form
{
id: { 'id': id, 'display_name': display_name, ('__count': c,) },
...
}
"""
field = self._fields[field_name]
if field.type == 'many2one':
def group_id_name(value):
return value
else:
# field type is selection: see doc above
desc = self.fields_get([field_name])[field_name]
field_name_selection = dict(desc['selection'])
def group_id_name(value):
return value, field_name_selection[value]
domain = AND([
domain,
[(field_name, '!=', False)],
])
groups = self.read_group(domain, [field_name], [field_name], limit=limit)
domain_image = {}
for group in groups:
id, display_name = group_id_name(group[field_name])
values = {
'id': id,
'display_name': display_name,
}
if set_count:
values['__count'] = group[field_name + '_count']
domain_image[id] = values
return domain_image
@api.model
def _search_panel_global_counters(self, values_range, parent_name):
"""
Modify in place values_range to transform the (local) counts
into global counts (local count + children local counts)
in case a parent field parent_name has been set on the range values.
Note that we save the initial (local) counts into an auxiliary dict
before they could be changed in the for loop below.
:param values_range: dict of the form
{
id: { 'id': id, '__count': c, parent_name: parent_id, ... }
...
}
:param parent_name: string, indicates which key determines the parent
"""
local_counters = lazymapping(lambda id: values_range[id]['__count'])
for id in values_range:
values = values_range[id]
# here count is the initial value = local count set on values
count = local_counters[id]
if count:
parent_id = values[parent_name]
while parent_id:
values = values_range[parent_id]
local_counters[parent_id]
values['__count'] += count
parent_id = values[parent_name]
@api.model
def _search_panel_sanitized_parent_hierarchy(self, records, parent_name, ids):
"""
Filter the provided list of records to ensure the following properties of
the resulting sublist:
1) it is closed for the parent relation
2) every record in it is an ancestor of a record with id in ids
(if ids = records.ids, that condition is automatically satisfied)
3) it is maximal among other sublists with properties 1 and 2.
:param records, the list of records to filter, the records must have the form
{ 'id': id, parent_name: False or (id, display_name),... }
:param parent_name, string, indicates which key determines the parent
:param ids: list of record ids
:return: the sublist of records with the above properties
}
"""
def get_parent_id(record):
value = record[parent_name]
return value and value[0]
allowed_records = { record['id']: record for record in records }
records_to_keep = {}
for id in ids:
record_id = id
ancestor_chain = {}
chain_is_fully_included = True
while chain_is_fully_included and record_id:
known_status = records_to_keep.get(record_id)
if known_status != None:
# the record and its known ancestors have already been considered
chain_is_fully_included = known_status
break
record = allowed_records.get(record_id)
if record:
ancestor_chain[record_id] = record
record_id = get_parent_id(record)
else:
chain_is_fully_included = False
for id, record in ancestor_chain.items():
records_to_keep[id] = chain_is_fully_included
# we keep initial order
return [rec for rec in records if records_to_keep.get(rec['id'])]
@api.model
def _search_panel_selection_range(self, field_name, **kwargs):
"""
Return the values of a field of type selection possibly enriched
with counts of associated records in domain.
:param enable_counters: whether to set the key '__count' on values returned.
Default is False.
:param expand: whether to return the full range of values for the selection
field or only the field image values. Default is False.
:param field_name: the name of a field of type selection
:param model_domain: domain used to determine the field image values and counts.
Default is [].
:return: a list of dicts of the form
{ 'id': id, 'display_name': display_name, ('__count': c,) }
with key '__count' set if enable_counters is True
"""
enable_counters = kwargs.get('enable_counters')
expand = kwargs.get('expand')
if enable_counters or not expand:
domain_image = self._search_panel_field_image(field_name, only_counters=expand, **kwargs)
if not expand:
return list(domain_image.values())
selection = self.fields_get([field_name])[field_name]['selection']
selection_range = []
for value, label in selection:
values = {
'id': value,
'display_name': label,
}
if enable_counters:
image_element = domain_image.get(value)
values['__count'] = image_element['__count'] if image_element else 0
selection_range.append(values)
return selection_range
@api.model
def search_panel_select_range(self, field_name, **kwargs):
"""
Return possible values of the field field_name (case select="one"),
possibly with counters, and the parent field (if any and required)
used to hierarchize them.
:param field_name: the name of a field;
of type many2one or selection.
:param category_domain: domain generated by categories. Default is [].
:param comodel_domain: domain of field values (if relational). Default is [].
:param enable_counters: whether to count records by value. Default is False.
:param expand: whether to return the full range of field values in comodel_domain
or only the field image values (possibly filtered and/or completed
with parents if hierarchize is set). Default is False.
:param filter_domain: domain generated by filters. Default is [].
:param hierarchize: determines if the categories must be displayed hierarchically
(if possible). If set to true and _parent_name is set on the
comodel field, the information necessary for the hierarchization will
be returned. Default is True.
:param limit: integer, maximal number of values to fetch. Default is None.
:param search_domain: base domain of search. Default is [].
with parents if hierarchize is set)
:return: {
'parent_field': parent field on the comodel of field, or False
'values': array of dictionaries containing some info on the records
available on the comodel of the field 'field_name'.
The display name, the __count (how many records with that value)
and possibly parent_field are fetched.
}
or an object with an error message when limit is defined and is reached.
"""
field = self._fields[field_name]
supported_types = ['many2one', 'selection']
if field.type not in supported_types:
types = dict(self.env["ir.model.fields"]._fields["ttype"]._description_selection(self.env))
raise UserError(_(
'Only types %(supported_types)s are supported for category (found type %(field_type)s)',
supported_types=", ".join(types[t] for t in supported_types),
field_type=types[field.type],
))
model_domain = kwargs.get('search_domain', [])
extra_domain = AND([
kwargs.get('category_domain', []),
kwargs.get('filter_domain', []),
])
if field.type == 'selection':
return {
'parent_field': False,
'values': self._search_panel_selection_range(field_name, model_domain=model_domain,
extra_domain=extra_domain, **kwargs
),
}
Comodel = self.env[field.comodel_name].with_context(hierarchical_naming=False)
field_names = ['display_name']
hierarchize = kwargs.get('hierarchize', True)
parent_name = False
if hierarchize and Comodel._parent_name in Comodel._fields:
parent_name = Comodel._parent_name
field_names.append(parent_name)
def get_parent_id(record):
value = record[parent_name]
return value and value[0]
else:
hierarchize = False
comodel_domain = kwargs.get('comodel_domain', [])
enable_counters = kwargs.get('enable_counters')
expand = kwargs.get('expand')
limit = kwargs.get('limit')
if enable_counters or not expand:
domain_image = self._search_panel_field_image(field_name,
model_domain=model_domain, extra_domain=extra_domain,
only_counters=expand,
set_limit= limit and not (expand or hierarchize or comodel_domain), **kwargs
)
if not (expand or hierarchize or comodel_domain):
values = list(domain_image.values())
if limit and len(values) == limit:
return {'error_msg': str(SEARCH_PANEL_ERROR_MESSAGE)}
return {
'parent_field': parent_name,
'values': values,
}
if not expand:
image_element_ids = list(domain_image.keys())
if hierarchize:
condition = [('id', 'parent_of', image_element_ids)]
else:
condition = [('id', 'in', image_element_ids)]
comodel_domain = AND([comodel_domain, condition])
comodel_records = Comodel.search_read(comodel_domain, field_names, limit=limit)
if hierarchize:
ids = [rec['id'] for rec in comodel_records] if expand else image_element_ids
comodel_records = self._search_panel_sanitized_parent_hierarchy(comodel_records, parent_name, ids)
if limit and len(comodel_records) == limit:
return {'error_msg': str(SEARCH_PANEL_ERROR_MESSAGE)}
field_range = {}
for record in comodel_records:
record_id = record['id']
values = {
'id': record_id,
'display_name': record['display_name'],
}
if hierarchize:
values[parent_name] = get_parent_id(record)
if enable_counters:
image_element = domain_image.get(record_id)
values['__count'] = image_element['__count'] if image_element else 0
field_range[record_id] = values
if hierarchize and enable_counters:
self._search_panel_global_counters(field_range, parent_name)
return {
'parent_field': parent_name,
'values': list(field_range.values()),
}
@api.model
def search_panel_select_multi_range(self, field_name, **kwargs):
"""
Return possible values of the field field_name (case select="multi"),
possibly with counters and groups.
:param field_name: the name of a filter field;
possible types are many2one, many2many, selection.
:param category_domain: domain generated by categories. Default is [].
:param comodel_domain: domain of field values (if relational)
(this parameter is used in _search_panel_range). Default is [].
:param enable_counters: whether to count records by value. Default is False.
:param expand: whether to return the full range of field values in comodel_domain
or only the field image values. Default is False.
:param filter_domain: domain generated by filters. Default is [].
:param group_by: extra field to read on comodel, to group comodel records
:param group_domain: dict, one domain for each activated group
for the group_by (if any). Those domains are
used to fech accurate counters for values in each group.
Default is [] (many2one case) or None.
:param limit: integer, maximal number of values to fetch. Default is None.
:param search_domain: base domain of search. Default is [].
:return: {
'values': a list of possible values, each being a dict with keys
'id' (value),
'name' (value label),
'__count' (how many records with that value),
'group_id' (value of group), set if a group_by has been provided,
'group_name' (label of group), set if a group_by has been provided
}
or an object with an error message when limit is defined and reached.
"""
field = self._fields[field_name]
supported_types = ['many2one', 'many2many', 'selection']
if field.type not in supported_types:
raise UserError(_('Only types %(supported_types)s are supported for filter (found type %(field_type)s)',
supported_types=supported_types, field_type=field.type))
model_domain = kwargs.get('search_domain', [])
extra_domain = AND([
kwargs.get('category_domain', []),
kwargs.get('filter_domain', []),
])
if field.type == 'selection':
return {
'values': self._search_panel_selection_range(field_name, model_domain=model_domain,
extra_domain=extra_domain, **kwargs
)
}
Comodel = self.env.get(field.comodel_name).with_context(hierarchical_naming=False)
field_names = ['display_name']
group_by = kwargs.get('group_by')
limit = kwargs.get('limit')
if group_by:
group_by_field = Comodel._fields[group_by]
field_names.append(group_by)
if group_by_field.type == 'many2one':
def group_id_name(value):
return value or (False, _("Not Set"))
elif group_by_field.type == 'selection':
desc = Comodel.fields_get([group_by])[group_by]
group_by_selection = dict(desc['selection'])
group_by_selection[False] = _("Not Set")
def group_id_name(value):
return value, group_by_selection[value]
else:
def group_id_name(value):
return (value, value) if value else (False, _("Not Set"))
comodel_domain = kwargs.get('comodel_domain', [])
enable_counters = kwargs.get('enable_counters')
expand = kwargs.get('expand')
if field.type == 'many2many':
comodel_records = Comodel.search_read(comodel_domain, field_names, limit=limit)
if expand and limit and len(comodel_records) == limit:
return {'error_msg': str(SEARCH_PANEL_ERROR_MESSAGE)}
group_domain = kwargs.get('group_domain')
field_range = []
for record in comodel_records:
record_id = record['id']
values= {
'id': record_id,
'display_name': record['display_name'],
}
if group_by:
group_id, group_name = group_id_name(record[group_by])
values['group_id'] = group_id
values['group_name'] = group_name
if enable_counters or not expand:
search_domain = AND([
model_domain,
[(field_name, 'in', record_id)],
])
local_extra_domain = extra_domain
if group_by and group_domain:
local_extra_domain = AND([
local_extra_domain,
group_domain.get(json.dumps(group_id), []),
])
search_count_domain = AND([
search_domain,
local_extra_domain
])
if enable_counters:
count = self.search_count(search_count_domain)
if not expand:
if enable_counters and is_true_domain(local_extra_domain):
inImage = count
else:
inImage = self.search(search_domain, limit=1)
if expand or inImage:
if enable_counters:
values['__count'] = count
field_range.append(values)
if not expand and limit and len(field_range) == limit:
return {'error_msg': str(SEARCH_PANEL_ERROR_MESSAGE)}
return { 'values': field_range, }
if field.type == 'many2one':
if enable_counters or not expand:
extra_domain = AND([
extra_domain,
kwargs.get('group_domain', []),
])
domain_image = self._search_panel_field_image(field_name,
model_domain=model_domain, extra_domain=extra_domain,
only_counters=expand,
set_limit=limit and not (expand or group_by or comodel_domain), **kwargs
)
if not (expand or group_by or comodel_domain):
values = list(domain_image.values())
if limit and len(values) == limit:
return {'error_msg': str(SEARCH_PANEL_ERROR_MESSAGE)}
return {'values': values, }
if not expand:
image_element_ids = list(domain_image.keys())
comodel_domain = AND([
comodel_domain,
[('id', 'in', image_element_ids)],
])
comodel_records = Comodel.search_read(comodel_domain, field_names, limit=limit)
if limit and len(comodel_records) == limit:
return {'error_msg': str(SEARCH_PANEL_ERROR_MESSAGE)}
field_range = []
for record in comodel_records:
record_id = record['id']
values= {
'id': record_id,
'display_name': record['display_name'],
}
if group_by:
group_id, group_name = group_id_name(record[group_by])
values['group_id'] = group_id
values['group_name'] = group_name
if enable_counters:
image_element = domain_image.get(record_id)
values['__count'] = image_element['__count'] if image_element else 0
field_range.append(values)
return { 'values': field_range, }
def onchange(self, values: Dict, field_names: List[str], fields_spec: Dict):
"""
Perform an onchange on the given fields, and return the result.
:param values: dictionary mapping field names to values on the form view,
giving the current state of modification
:param field_names: names of the modified fields
:param fields_spec: dictionary specifying the fields in the view,
just like the one used by :meth:`web_read`; it is used to format
the resulting values
When creating a record from scratch, the client should call this with an
empty list as ``field_names``. In that case, the method first adds
default values to ``values``, computes the remaining fields, applies
onchange methods to them, and return all the fields in ``fields_spec``.
The result is a dictionary with two optional keys. The key ``"value"``
is used to return field values that should be modified on the caller.
The corresponding value is a dict mapping field names to their value,
in the format of :meth:`web_read`, except for x2many fields, where the
value is a list of commands to be applied on the caller's field value.
The key ``"warning"`` provides a warning message to the caller. The
corresponding value is a dictionary like::
{
"title": "Be careful!", # subject of message
"message": "Blah blah blah.", # full warning message
"type": "dialog", # how to display the warning
}
"""
# this is for tests using `Form`
self.env.flush_all()
env = self.env
cache = env.cache
first_call = not field_names
if any(fname not in self._fields for fname in field_names):
return {}
if first_call:
field_names = [fname for fname in values if fname != 'id']
missing_names = [fname for fname in fields_spec if fname not in values]
defaults = self.default_get(missing_names)
for field_name in missing_names:
values[field_name] = defaults.get(field_name, False)
if field_name in defaults:
field_names.append(field_name)
# prefetch x2many lines: this speeds up the initial snapshot by avoiding
# computing fields on new records as much as possible, as that can be
# costly and is not necessary at all
self.fetch(fields_spec.keys())
for field_name, field_spec in fields_spec.items():
field = self._fields[field_name]
if field.type not in ('one2many', 'many2many'):
continue
sub_fields_spec = field_spec.get('fields') or {}
if sub_fields_spec and values.get(field_name):
# retrieve all line ids in commands
line_ids = OrderedSet(self[field_name].ids)
for cmd in values[field_name]:
if cmd[0] in (Command.UPDATE, Command.LINK):
line_ids.add(cmd[1])
elif cmd[0] == Command.SET:
line_ids.update(cmd[2])
# prefetch stored fields on lines
lines = self[field_name].browse(line_ids)
lines.fetch(sub_fields_spec.keys())
# copy the cache of lines to their corresponding new records;
# this avoids computing computed stored fields on new_lines
new_lines = lines.browse(map(NewId, line_ids))
for field_name in sub_fields_spec:
field = lines._fields[field_name]
line_values = [
field.convert_to_cache(line[field_name], new_line, validate=False)
for new_line, line in zip(new_lines, lines)
]
cache.update(new_lines, field, line_values)
# Isolate changed values, to handle inconsistent data sent from the
# client side: when a form view contains two one2many fields that
# overlap, the lines that appear in both fields may be sent with
# different data. Consider, for instance:
#
# foo_ids: [line with value=1, ...]
# bar_ids: [line with value=1, ...]
#
# If value=2 is set on 'line' in 'bar_ids', the client sends
#
# foo_ids: [line with value=1, ...]
# bar_ids: [line with value=2, ...]
#
# The idea is to put 'foo_ids' in cache first, so that the snapshot
# contains value=1 for line in 'foo_ids'. The snapshot is then updated
# with the value of `bar_ids`, which will contain value=2 on line.
#
# The issue also occurs with other fields. For instance, an onchange on
# a move line has a value for the field 'move_id' that contains the
# values of the move, among which the one2many that contains the line
# itself, with old values!
#
initial_values = dict(values)
changed_values = {fname: initial_values.pop(fname) for fname in field_names}
# do not force delegate fields to False
for parent_name in self._inherits.values():
if not initial_values.get(parent_name, True):
initial_values.pop(parent_name)
# create a new record with initial values
if self:
# fill in the cache of record with the values of self
cache_values = {fname: self[fname] for fname in fields_spec}
record = self.new(cache_values, origin=self)
# apply initial values on top of the values of self
record._update_cache(initial_values)
else:
# set changed values to null in initial_values; not setting them
# triggers default_get() on the new record when creating snapshot0
initial_values.update(dict.fromkeys(field_names, False))
record = self.new(initial_values, origin=self)
# make parent records match with the form values; this ensures that
# computed fields on parent records have all their dependencies at
# their expected value
for field_name in initial_values:
field = self._fields.get(field_name)
if field and field.inherited:
parent_name, field_name = field.related.split('.', 1)
if parent := record[parent_name]:
parent._update_cache({field_name: record[field_name]})
# make a snapshot based on the initial values of record
snapshot0 = RecordSnapshot(record, fields_spec, fetch=(not first_call))
# store changed values in cache; also trigger recomputations based on
# subfields (e.g., line.a has been modified, line.b is computed stored
# and depends on line.a, but line.b is not in the form view)
record._update_cache(changed_values)
# update snapshot0 with changed values
for field_name in field_names:
snapshot0.fetch(field_name)
# Determine which field(s) should be triggered an onchange. On the first
# call, 'names' only contains fields with a default. If 'self' is a new
# line in a one2many field, 'names' also contains the one2many's inverse
# field, and that field may not be in nametree.
todo = list(unique(itertools.chain(field_names, fields_spec))) if first_call else list(field_names)
done = set()
# mark fields to do as modified to trigger recomputations
protected = [self._fields[fname] for fname in field_names]
with self.env.protecting(protected, record):
record.modified(todo)
for field_name in todo:
field = self._fields[field_name]
if field.inherited:
# modifying an inherited field should modify the parent
# record accordingly; because we don't actually assign the
# modified field on the record, the modification on the
# parent record has to be done explicitly
parent = record[field.related.split('.')[0]]
parent[field_name] = record[field_name]
result = {'warnings': OrderedSet()}
# process names in order
while todo:
# apply field-specific onchange methods
for field_name in todo:
record._apply_onchange_methods(field_name, result)
done.add(field_name)
if not env.context.get('recursive_onchanges', True):
break
# determine which fields to process for the next pass
todo = [
field_name
for field_name in fields_spec
if field_name not in done and snapshot0.has_changed(field_name)
]
# make the snapshot with the final values of record
snapshot1 = RecordSnapshot(record, fields_spec)
# determine values that have changed by comparing snapshots
result['value'] = snapshot1.diff(snapshot0, force=first_call)
# format warnings
warnings = result.pop('warnings')
if len(warnings) == 1:
title, message, type_ = warnings.pop()
if not type_:
type_ = 'dialog'
result['warning'] = dict(title=title, message=message, type=type_)
elif len(warnings) > 1:
# concatenate warning titles and messages
title = _("Warnings")
message = '\n\n'.join([warn_title + '\n\n' + warn_message for warn_title, warn_message, warn_type in warnings])
result['warning'] = dict(title=title, message=message, type='dialog')
return result
def web_override_translations(self, values):
"""
This method is used to override all the modal translations of the given fields
with the provided value for each field.
:param values: dictionary of the translations to apply for each field name
ex: { "field_name": "new_value" }
"""
self.ensure_one()
for field_name in values:
field = self._fields[field_name]
if field.translate is True:
translations = {lang: False for lang, _ in self.env['res.lang'].get_installed()}
translations['en_US'] = values[field_name]
translations[self.env.lang or 'en_US'] = values[field_name]
self.update_field_translations(field_name, translations)
class ResCompany(models.Model):
_inherit = 'res.company'
@api.model_create_multi
def create(self, vals_list):
companies = super().create(vals_list)
style_fields = {'external_report_layout_id', 'font', 'primary_color', 'secondary_color'}
if any(not style_fields.isdisjoint(values) for values in vals_list):
self._update_asset_style()
return companies
def write(self, values):
res = super().write(values)
style_fields = {'external_report_layout_id', 'font', 'primary_color', 'secondary_color'}
if not style_fields.isdisjoint(values):
self._update_asset_style()
return res
def _get_asset_style_b64(self):
# One bundle for everyone, so this method
# necessarily updates the style for every company at once
company_ids = self.sudo().search([])
company_styles = self.env['ir.qweb']._render('web.styles_company_report', {
'company_ids': company_ids,
}, raise_if_not_found=False)
return base64.b64encode(company_styles.encode())
def _update_asset_style(self):
asset_attachment = self.env.ref('web.asset_styles_company_report', raise_if_not_found=False)
if not asset_attachment:
return
asset_attachment = asset_attachment.sudo()
b64_val = self._get_asset_style_b64()
if b64_val != asset_attachment.datas:
asset_attachment.write({'datas': b64_val})
class RecordSnapshot(dict):
""" A dict with the values of a record, following a prefix tree. """
__slots__ = ['record', 'fields_spec']
def __init__(self, record: BaseModel, fields_spec: Dict, fetch=True):
# put record in dict to include it when comparing snapshots
super().__init__()
self.record = record
self.fields_spec = fields_spec
if fetch:
for name in fields_spec:
self.fetch(name)
def __eq__(self, other: 'RecordSnapshot'):
return self.record == other.record and super().__eq__(other)
def fetch(self, field_name):
""" Set the value of field ``name`` from the record's value. """
if self.record._fields[field_name].type in ('one2many', 'many2many'):
# x2many fields are serialized as a dict of line snapshots
lines = self.record[field_name]
if 'context' in self.fields_spec[field_name]:
lines = lines.with_context(**self.fields_spec[field_name]['context'])
sub_fields_spec = self.fields_spec[field_name].get('fields') or {}
self[field_name] = {line.id: RecordSnapshot(line, sub_fields_spec) for line in lines}
else:
self[field_name] = self.record[field_name]
def has_changed(self, field_name) -> bool:
""" Return whether a field on the record has changed. """
if field_name not in self:
return True
if self.record._fields[field_name].type not in ('one2many', 'many2many'):
return self[field_name] != self.record[field_name]
return self[field_name].keys() != set(self.record[field_name]._ids) or any(
line_snapshot.has_changed(subname)
for line_snapshot in self[field_name].values()
for subname in self.fields_spec[field_name].get('fields') or {}
)
def diff(self, other: 'RecordSnapshot', force=False):
""" Return the values in ``self`` that differ from ``other``. """
# determine fields to return
simple_fields_spec = {}
x2many_fields_spec = {}
for field_name, field_spec in self.fields_spec.items():
if field_name == 'id':
continue
if not force and other.get(field_name) == self[field_name]:
continue
field = self.record._fields[field_name]
if field.type in ('one2many', 'many2many'):
x2many_fields_spec[field_name] = field_spec
else:
simple_fields_spec[field_name] = field_spec
# use web_read() for simple fields
[result] = self.record.web_read(simple_fields_spec)
# discard the NewId from the dict
result.pop('id')
# for x2many fields: serialize value as commands
for field_name, field_spec in x2many_fields_spec.items():
commands = []
self_value = self[field_name]
other_value = {} if force else other.get(field_name) or {}
if any(other_value):
# other may be a snapshot for a real record, adapt its x2many ids
other_value = {NewId(id_): snap for id_, snap in other_value.items()}
# commands for removed lines
field = self.record._fields[field_name]
remove = Command.delete if field.type == 'one2many' else Command.unlink
for id_ in other_value:
if id_ not in self_value:
commands.append(remove(id_.origin or id_.ref or 0))
# commands for modified or extra lines
for id_, line_snapshot in self_value.items():
if not force and id_ in other_value:
# existing line: check diff and send update
line_diff = line_snapshot.diff(other_value[id_])
if line_diff:
commands.append(Command.update(id_.origin or id_.ref or 0, line_diff))
elif not id_.origin:
# new line: send diff from scratch
line_diff = line_snapshot.diff({})
commands.append((Command.CREATE, id_.origin or id_.ref or 0, line_diff))
else:
# link line: send data to client
base_line = line_snapshot.record._origin
[base_data] = base_line.web_read(field_spec.get('fields') or {})
commands.append((Command.LINK, base_line.id, base_data))
# check diff and send update
base_snapshot = RecordSnapshot(base_line, field_spec.get('fields') or {})
line_diff = line_snapshot.diff(base_snapshot)
if line_diff:
commands.append(Command.update(id_.origin, line_diff))
if commands:
result[field_name] = commands
return result