analytic/models/analytic_mixin.py
Данил Воробьев 29d7087a26 initial commit
2024-05-03 12:01:29 +00:00

138 lines
6.6 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields, api, _
from odoo.tools import SQL
from odoo.tools.float_utils import float_round, float_compare
from odoo.exceptions import UserError, ValidationError
class AnalyticMixin(models.AbstractModel):
_name = 'analytic.mixin'
_description = 'Analytic Mixin'
analytic_distribution = fields.Json(
'Analytic Distribution',
compute="_compute_analytic_distribution", store=True, copy=True, readonly=False,
)
# Json non stored to be able to search on analytic_distribution.
analytic_distribution_search = fields.Json(
store=False,
search="_search_analytic_distribution"
)
analytic_precision = fields.Integer(
store=False,
default=lambda self: self.env['decimal.precision'].precision_get("Percentage Analytic"),
)
def init(self):
# Add a gin index for json search on the keys, on the models that actually have a table
query = ''' SELECT table_name
FROM information_schema.tables
WHERE table_name=%s '''
self.env.cr.execute(query, [self._table])
if self.env.cr.dictfetchone() and self._fields['analytic_distribution'].store:
query = fr"""
CREATE INDEX IF NOT EXISTS {self._table}_analytic_distribution_accounts_gin_index
ON {self._table} USING gin(regexp_split_to_array(jsonb_path_query_array(analytic_distribution, '$.keyvalue()."key"')::text, '\D+'));
"""
self.env.cr.execute(query)
super().init()
@api.model
def fields_get(self, allfields=None, attributes=None):
""" Hide analytic_distribution_search from filterable/searchable fields"""
res = super().fields_get(allfields, attributes)
if res.get('analytic_distribution_search'):
res['analytic_distribution_search']['searchable'] = False
return res
def _compute_analytic_distribution(self):
pass
def _search_analytic_distribution(self, operator, value):
if operator == 'in' and isinstance(value, (tuple, list)):
account_ids = value
operator_inselect = 'inselect'
elif operator in ('=', '!=', 'ilike', 'not ilike') and isinstance(value, (str, bool)):
operator_name_search = '=' if operator in ('=', '!=') else 'ilike'
account_ids = list(self.env['account.analytic.account']._name_search(name=value, operator=operator_name_search))
operator_inselect = 'inselect' if operator in ('=', 'ilike') else 'not inselect'
else:
raise UserError(_('Operation not supported'))
query = SQL(
fr"""
SELECT id
FROM {self._table}
WHERE %s && %s
""",
[str(account_id) for account_id in account_ids],
self._query_analytic_accounts(),
)
return [('id', operator_inselect, query)]
def _query_analytic_accounts(self, table=False):
return SQL(
r"""regexp_split_to_array(jsonb_path_query_array(%s.analytic_distribution, '$.keyvalue()."key"')::text, '\D+')""",
SQL(table or self._table),
)
@api.model
def _search(self, domain, offset=0, limit=None, order=None, access_rights_uid=None):
domain = self._apply_analytic_distribution_domain(domain)
return super()._search(domain, offset, limit, order, access_rights_uid)
@api.model
def read_group(self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True):
domain = self._apply_analytic_distribution_domain(domain)
return super().read_group(domain, fields, groupby, offset, limit, orderby, lazy)
def write(self, vals):
""" Format the analytic_distribution float value, so equality on analytic_distribution can be done """
decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
vals = self._sanitize_values(vals, decimal_precision)
return super().write(vals)
@api.model_create_multi
def create(self, vals_list):
""" Format the analytic_distribution float value, so equality on analytic_distribution can be done """
decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
vals_list = [self._sanitize_values(vals, decimal_precision) for vals in vals_list]
return super().create(vals_list)
def _validate_distribution(self, **kwargs):
if self.env.context.get('validate_analytic', False):
mandatory_plans_ids = [plan['id'] for plan in self.env['account.analytic.plan'].sudo().with_company(self.company_id).get_relevant_plans(**kwargs) if plan['applicability'] == 'mandatory']
if not mandatory_plans_ids:
return
decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
distribution_by_root_plan = {}
for analytic_account_ids, percentage in (self.analytic_distribution or {}).items():
for analytic_account in self.env['account.analytic.account'].browse(map(int, analytic_account_ids.split(","))).exists():
root_plan = analytic_account.root_plan_id
distribution_by_root_plan[root_plan.id] = distribution_by_root_plan.get(root_plan.id, 0) + percentage
for plan_id in mandatory_plans_ids:
if float_compare(distribution_by_root_plan.get(plan_id, 0), 100, precision_digits=decimal_precision) != 0:
raise ValidationError(_("One or more lines require a 100% analytic distribution."))
def _sanitize_values(self, vals, decimal_precision):
""" Normalize the float of the distribution """
if 'analytic_distribution' in vals:
vals['analytic_distribution'] = vals.get('analytic_distribution') and {
account_id: float_round(distribution, decimal_precision) for account_id, distribution in vals['analytic_distribution'].items()}
return vals
def _apply_analytic_distribution_domain(self, domain):
return [
('analytic_distribution_search', leaf[1], leaf[2])
if len(leaf) == 3 and leaf[0] == 'analytic_distribution' and isinstance(leaf[2], (str, tuple, list))
else leaf
for leaf in domain
]
def _get_analytic_account_ids(self) -> list[int]:
""" Get the analytic account ids from the analytic_distribution dict """
self.ensure_one()
return [int(account_id) for ids in self.analytic_distribution for account_id in ids.split(',')]