# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import operator as py_operator from operator import attrgetter from re import findall as regex_findall, split as regex_split from odoo import _, api, fields, models from odoo.exceptions import UserError, ValidationError from odoo.osv import expression OPERATORS = { '<': py_operator.lt, '>': py_operator.gt, '<=': py_operator.le, '>=': py_operator.ge, '=': py_operator.eq, '!=': py_operator.ne } class StockLot(models.Model): _name = 'stock.lot' _inherit = ['mail.thread', 'mail.activity.mixin'] _description = 'Lot/Serial' _check_company_auto = True _order = 'name, id' def _read_group_location_id(self, locations, domain, order): partner_locations = locations.search([('usage', 'in', ('customer', 'supplier'))]) return partner_locations + locations.warehouse_id.search([]).lot_stock_id name = fields.Char( 'Lot/Serial Number', default=lambda self: self.env['ir.sequence'].next_by_code('stock.lot.serial'), required=True, help="Unique Lot/Serial Number", index='trigram') ref = fields.Char('Internal Reference', help="Internal reference number in case it differs from the manufacturer's lot/serial number") product_id = fields.Many2one( 'product.product', 'Product', index=True, domain=("[('tracking', '!=', 'none'), ('type', '=', 'product')] +" " ([('product_tmpl_id', '=', context['default_product_tmpl_id'])] if context.get('default_product_tmpl_id') else [])"), required=True, check_company=True) product_uom_id = fields.Many2one( 'uom.uom', 'Unit of Measure', related='product_id.uom_id', store=True) quant_ids = fields.One2many('stock.quant', 'lot_id', 'Quants', readonly=True) product_qty = fields.Float('On Hand Quantity', compute='_product_qty', search='_search_product_qty') note = fields.Html(string='Description') display_complete = fields.Boolean(compute='_compute_display_complete') company_id = fields.Many2one('res.company', 'Company', required=True, index=True, default=lambda self: self.env.company.id) delivery_ids = fields.Many2many('stock.picking', compute='_compute_delivery_ids', string='Transfers') delivery_count = fields.Integer('Delivery order count', compute='_compute_delivery_ids') last_delivery_partner_id = fields.Many2one('res.partner', compute='_compute_last_delivery_partner_id') lot_properties = fields.Properties('Properties', definition='product_id.lot_properties_definition', copy=True) location_id = fields.Many2one( 'stock.location', 'Location', compute='_compute_single_location', store=True, readonly=False, inverse='_set_single_location', domain="[('usage', '!=', 'view')]", group_expand='_read_group_location_id') @api.model def generate_lot_names(self, first_lot, count): """Generate `lot_names` from a string.""" # We look if the first lot contains at least one digit. caught_initial_number = regex_findall(r"\d+", first_lot) if not caught_initial_number: return self.generate_lot_names(first_lot + "0", count) # We base the series on the last number found in the base lot. initial_number = caught_initial_number[-1] padding = len(initial_number) # We split the lot name to get the prefix and suffix. splitted = regex_split(initial_number, first_lot) # initial_number could appear several times, e.g. BAV023B00001S00001 prefix = initial_number.join(splitted[:-1]) suffix = splitted[-1] initial_number = int(initial_number) return [{ 'lot_name': '%s%s%s' % (prefix, str(initial_number + i).zfill(padding), suffix), } for i in range(0, count)] @api.model def _get_next_serial(self, company, product): """Return the next serial number to be attributed to the product.""" if product.tracking != "none": last_serial = self.env['stock.lot'].search( [('company_id', '=', company.id), ('product_id', '=', product.id)], limit=1, order='id DESC') if last_serial: return self.env['stock.lot'].generate_lot_names(last_serial.name, 2)[1]['lot_name'] return False @api.constrains('name', 'product_id', 'company_id') def _check_unique_lot(self): domain = [('product_id', 'in', self.product_id.ids), ('company_id', 'in', self.company_id.ids), ('name', 'in', self.mapped('name'))] groupby = ['company_id', 'product_id', 'name'] records = self._read_group(domain, groupby, having=[('__count', '>', 1)]) error_message_lines = [] for __, product, name in records: error_message_lines.append(_(" - Product: %s, Serial Number: %s", product.display_name, name)) if error_message_lines: raise ValidationError(_('The combination of serial number and product must be unique across a company.\nFollowing combination contains duplicates:\n') + '\n'.join(error_message_lines)) def _check_create(self): active_picking_id = self.env.context.get('active_picking_id', False) if active_picking_id: picking_id = self.env['stock.picking'].browse(active_picking_id) if picking_id and not picking_id.picking_type_id.use_create_lots: raise UserError(_('You are not allowed to create a lot or serial number with this operation type. To change this, go on the operation type and tick the box "Create New Lots/Serial Numbers".')) @api.depends('name') def _compute_display_complete(self): """ Defines if we want to display all fields in the stock.production.lot form view. It will if the record exists (`id` set) or if we precised it into the context. This compute depends on field `name` because as it has always a default value, it'll be always triggered. """ for prod_lot in self: prod_lot.display_complete = prod_lot.id or self._context.get('display_complete') def _compute_delivery_ids(self): delivery_ids_by_lot = self._find_delivery_ids_by_lot() for lot in self: lot.delivery_ids = delivery_ids_by_lot[lot.id] lot.delivery_count = len(lot.delivery_ids) def _compute_last_delivery_partner_id(self): serial_products = self.filtered(lambda l: l.product_id.tracking == 'serial') delivery_ids_by_lot = serial_products._find_delivery_ids_by_lot() (self - serial_products).last_delivery_partner_id = False for lot in serial_products: if lot.product_id.tracking == 'serial' and len(delivery_ids_by_lot[lot.id]) > 0: lot.last_delivery_partner_id = self.env['stock.picking'].browse(delivery_ids_by_lot[lot.id]).sorted(key='date_done', reverse=True)[0].partner_id else: lot.last_delivery_partner_id = False @api.depends('quant_ids') def _compute_single_location(self): for lot in self: quants = lot.quant_ids.filtered(lambda q: q.quantity > 0) lot.location_id = quants.location_id if len(quants.location_id) == 1 else False def _set_single_location(self): quants = self.quant_ids.filtered(lambda q: q.quantity > 0) if len(quants.location_id) == 1: unpack = len(quants.package_id.quant_ids) > 1 quants.move_quants(location_dest_id=self.location_id, message=_("Lot/Serial Number Relocated"), unpack=unpack) elif len(quants.location_id) > 1: raise UserError(_('You can only move a lot/serial to a new location if it exists in a single location.')) @api.model_create_multi def create(self, vals_list): self._check_create() return super(StockLot, self.with_context(mail_create_nosubscribe=True)).create(vals_list) def write(self, vals): if 'company_id' in vals: for lot in self: if lot.company_id.id != vals['company_id']: raise UserError(_("Changing the company of this record is forbidden at this point, you should rather archive it and create a new one.")) if 'product_id' in vals and any(vals['product_id'] != lot.product_id.id for lot in self): move_lines = self.env['stock.move.line'].search([('lot_id', 'in', self.ids), ('product_id', '!=', vals['product_id'])]) if move_lines: raise UserError(_( 'You are not allowed to change the product linked to a serial or lot number ' 'if some stock moves have already been created with that number. ' 'This would lead to inconsistencies in your stock.' )) return super().write(vals) def copy(self, default=None): if default is None: default = {} if 'name' not in default: default['name'] = _("(copy of) %s", self.name) return super().copy(default) @api.depends('quant_ids', 'quant_ids.quantity') def _product_qty(self): for lot in self: # We only care for the quants in internal or transit locations. quants = lot.quant_ids.filtered(lambda q: q.location_id.usage == 'internal' or (q.location_id.usage == 'transit' and q.location_id.company_id)) lot.product_qty = sum(quants.mapped('quantity')) def _search_product_qty(self, operator, value): if operator not in OPERATORS: raise UserError(_("Invalid domain operator %s", operator)) if not isinstance(value, (float, int)): raise UserError(_("Invalid domain right operand '%s'. It must be of type Integer/Float", value)) domain = [ ('lot_id', '!=', False), '|', ('location_id.usage', '=', 'internal'), '&', ('location_id.usage', '=', 'transit'), ('location_id.company_id', '!=', False) ] lots_w_qty = self.env['stock.quant']._read_group(domain=domain, groupby=['lot_id'], aggregates=['quantity:sum'], having=[('quantity:sum', '!=', 0)]) ids = [] lot_ids_w_qty = [] for lot, quantity_sum in lots_w_qty: lot_id = lot.id lot_ids_w_qty.append(lot_id) if OPERATORS[operator](quantity_sum, value): ids.append(lot_id) if value == 0.0 and operator == '=': return [('id', 'not in', lot_ids_w_qty)] if value == 0.0 and operator == '!=': return [('id', 'in', lot_ids_w_qty)] # check if we need include zero values in result include_zero = ( value < 0.0 and operator in ('>', '>=') or value > 0.0 and operator in ('<', '<=') or value == 0.0 and operator in ('>=', '<=') ) if include_zero: return ['|', ('id', 'in', ids), ('id', 'not in', lot_ids_w_qty)] return [('id', 'in', ids)] def action_lot_open_quants(self): self = self.with_context(search_default_lot_id=self.id, create=False) if self.user_has_groups('stock.group_stock_manager'): self = self.with_context(inventory_mode=True) return self.env['stock.quant'].action_view_quants() def action_lot_open_transfers(self): self.ensure_one() action = { 'res_model': 'stock.picking', 'type': 'ir.actions.act_window' } if len(self.delivery_ids) == 1: action.update({ 'view_mode': 'form', 'res_id': self.delivery_ids[0].id }) else: action.update({ 'name': _("Delivery orders of %s", self.display_name), 'domain': [('id', 'in', self.delivery_ids.ids)], 'view_mode': 'tree,form' }) return action @api.model def _get_outgoing_domain(self): return [ '|', ('picking_code', '=', 'outgoing'), ('produce_line_ids', '!=', False), ] def _find_delivery_ids_by_lot(self, lot_path=None, delivery_by_lot=None): if lot_path is None: lot_path = set() domain = [ ('lot_id', 'in', self.ids), ('state', '=', 'done'), ] domain_restriction = self._get_outgoing_domain() domain = expression.AND([domain, domain_restriction]) move_lines = self.env['stock.move.line'].search(domain) moves_by_lot = { lot_id: {'producing_lines': set(), 'barren_lines': set()} for lot_id in move_lines.lot_id.ids } for line in move_lines: if line.produce_line_ids: moves_by_lot[line.lot_id.id]['producing_lines'].add(line.id) else: moves_by_lot[line.lot_id.id]['barren_lines'].add(line.id) if delivery_by_lot is None: delivery_by_lot = dict() for lot in self: delivery_ids = set() if moves_by_lot.get(lot.id): producing_move_lines = self.env['stock.move.line'].browse(moves_by_lot[lot.id]['producing_lines']) barren_move_lines = self.env['stock.move.line'].browse(moves_by_lot[lot.id]['barren_lines']) if producing_move_lines: lot_path.add(lot.id) next_lots = producing_move_lines.produce_line_ids.lot_id.filtered(lambda l: l.id not in lot_path) next_lots_ids = set(next_lots.ids) # If some producing lots are in lot_path, it means that they have been previously processed. # Their results are therefore already in delivery_by_lot and we add them to delivery_ids directly. delivery_ids.update(*(delivery_by_lot.get(lot_id, []) for lot_id in (producing_move_lines.produce_line_ids.lot_id - next_lots).ids)) for lot_id, delivery_ids_set in next_lots._find_delivery_ids_by_lot(lot_path=lot_path, delivery_by_lot=delivery_by_lot).items(): if lot_id in next_lots_ids: delivery_ids.update(delivery_ids_set) delivery_ids.update(barren_move_lines.picking_id.ids) delivery_by_lot[lot.id] = list(delivery_ids) return delivery_by_lot