stock/models/stock_lot.py

293 lines
14 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import operator as py_operator
from operator import attrgetter
from re import findall as regex_findall, split as regex_split
from odoo import _, api, fields, models
from odoo.exceptions import UserError, ValidationError
from odoo.osv import expression
OPERATORS = {
'<': py_operator.lt,
'>': py_operator.gt,
'<=': py_operator.le,
'>=': py_operator.ge,
'=': py_operator.eq,
'!=': py_operator.ne
}
class StockLot(models.Model):
_name = 'stock.lot'
_inherit = ['mail.thread', 'mail.activity.mixin']
_description = 'Lot/Serial'
_check_company_auto = True
_order = 'name, id'
def _read_group_location_id(self, locations, domain, order):
partner_locations = locations.search([('usage', 'in', ('customer', 'supplier'))])
return partner_locations + locations.warehouse_id.search([]).lot_stock_id
name = fields.Char(
'Lot/Serial Number', default=lambda self: self.env['ir.sequence'].next_by_code('stock.lot.serial'),
required=True, help="Unique Lot/Serial Number", index='trigram')
ref = fields.Char('Internal Reference', help="Internal reference number in case it differs from the manufacturer's lot/serial number")
product_id = fields.Many2one(
'product.product', 'Product', index=True,
domain=("[('tracking', '!=', 'none'), ('type', '=', 'product')] +"
" ([('product_tmpl_id', '=', context['default_product_tmpl_id'])] if context.get('default_product_tmpl_id') else [])"),
required=True, check_company=True)
product_uom_id = fields.Many2one(
'uom.uom', 'Unit of Measure',
related='product_id.uom_id', store=True)
quant_ids = fields.One2many('stock.quant', 'lot_id', 'Quants', readonly=True)
product_qty = fields.Float('On Hand Quantity', compute='_product_qty', search='_search_product_qty')
note = fields.Html(string='Description')
display_complete = fields.Boolean(compute='_compute_display_complete')
company_id = fields.Many2one('res.company', 'Company', required=True, index=True, default=lambda self: self.env.company.id)
delivery_ids = fields.Many2many('stock.picking', compute='_compute_delivery_ids', string='Transfers')
delivery_count = fields.Integer('Delivery order count', compute='_compute_delivery_ids')
last_delivery_partner_id = fields.Many2one('res.partner', compute='_compute_last_delivery_partner_id')
lot_properties = fields.Properties('Properties', definition='product_id.lot_properties_definition', copy=True)
location_id = fields.Many2one(
'stock.location', 'Location', compute='_compute_single_location', store=True, readonly=False,
inverse='_set_single_location', domain="[('usage', '!=', 'view')]", group_expand='_read_group_location_id')
@api.model
def generate_lot_names(self, first_lot, count):
"""Generate `lot_names` from a string."""
# We look if the first lot contains at least one digit.
caught_initial_number = regex_findall(r"\d+", first_lot)
if not caught_initial_number:
return self.generate_lot_names(first_lot + "0", count)
# We base the series on the last number found in the base lot.
initial_number = caught_initial_number[-1]
padding = len(initial_number)
# We split the lot name to get the prefix and suffix.
splitted = regex_split(initial_number, first_lot)
# initial_number could appear several times, e.g. BAV023B00001S00001
prefix = initial_number.join(splitted[:-1])
suffix = splitted[-1]
initial_number = int(initial_number)
return [{
'lot_name': '%s%s%s' % (prefix, str(initial_number + i).zfill(padding), suffix),
} for i in range(0, count)]
@api.model
def _get_next_serial(self, company, product):
"""Return the next serial number to be attributed to the product."""
if product.tracking != "none":
last_serial = self.env['stock.lot'].search(
[('company_id', '=', company.id), ('product_id', '=', product.id)],
limit=1, order='id DESC')
if last_serial:
return self.env['stock.lot'].generate_lot_names(last_serial.name, 2)[1]['lot_name']
return False
@api.constrains('name', 'product_id', 'company_id')
def _check_unique_lot(self):
domain = [('product_id', 'in', self.product_id.ids),
('company_id', 'in', self.company_id.ids),
('name', 'in', self.mapped('name'))]
groupby = ['company_id', 'product_id', 'name']
records = self._read_group(domain, groupby, having=[('__count', '>', 1)])
error_message_lines = []
for __, product, name in records:
error_message_lines.append(_(" - Product: %s, Serial Number: %s", product.display_name, name))
if error_message_lines:
raise ValidationError(_('The combination of serial number and product must be unique across a company.\nFollowing combination contains duplicates:\n') + '\n'.join(error_message_lines))
def _check_create(self):
active_picking_id = self.env.context.get('active_picking_id', False)
if active_picking_id:
picking_id = self.env['stock.picking'].browse(active_picking_id)
if picking_id and not picking_id.picking_type_id.use_create_lots:
raise UserError(_('You are not allowed to create a lot or serial number with this operation type. To change this, go on the operation type and tick the box "Create New Lots/Serial Numbers".'))
@api.depends('name')
def _compute_display_complete(self):
""" Defines if we want to display all fields in the stock.production.lot form view.
It will if the record exists (`id` set) or if we precised it into the context.
This compute depends on field `name` because as it has always a default value, it'll be
always triggered.
"""
for prod_lot in self:
prod_lot.display_complete = prod_lot.id or self._context.get('display_complete')
def _compute_delivery_ids(self):
delivery_ids_by_lot = self._find_delivery_ids_by_lot()
for lot in self:
lot.delivery_ids = delivery_ids_by_lot[lot.id]
lot.delivery_count = len(lot.delivery_ids)
def _compute_last_delivery_partner_id(self):
serial_products = self.filtered(lambda l: l.product_id.tracking == 'serial')
delivery_ids_by_lot = serial_products._find_delivery_ids_by_lot()
(self - serial_products).last_delivery_partner_id = False
for lot in serial_products:
if lot.product_id.tracking == 'serial' and len(delivery_ids_by_lot[lot.id]) > 0:
lot.last_delivery_partner_id = self.env['stock.picking'].browse(delivery_ids_by_lot[lot.id]).sorted(key='date_done', reverse=True)[0].partner_id
else:
lot.last_delivery_partner_id = False
@api.depends('quant_ids')
def _compute_single_location(self):
for lot in self:
quants = lot.quant_ids.filtered(lambda q: q.quantity > 0)
lot.location_id = quants.location_id if len(quants.location_id) == 1 else False
def _set_single_location(self):
quants = self.quant_ids.filtered(lambda q: q.quantity > 0)
if len(quants.location_id) == 1:
unpack = len(quants.package_id.quant_ids) > 1
quants.move_quants(location_dest_id=self.location_id, message=_("Lot/Serial Number Relocated"), unpack=unpack)
elif len(quants.location_id) > 1:
raise UserError(_('You can only move a lot/serial to a new location if it exists in a single location.'))
@api.model_create_multi
def create(self, vals_list):
self._check_create()
return super(StockLot, self.with_context(mail_create_nosubscribe=True)).create(vals_list)
def write(self, vals):
if 'company_id' in vals:
for lot in self:
if lot.company_id.id != vals['company_id']:
raise UserError(_("Changing the company of this record is forbidden at this point, you should rather archive it and create a new one."))
if 'product_id' in vals and any(vals['product_id'] != lot.product_id.id for lot in self):
move_lines = self.env['stock.move.line'].search([('lot_id', 'in', self.ids), ('product_id', '!=', vals['product_id'])])
if move_lines:
raise UserError(_(
'You are not allowed to change the product linked to a serial or lot number '
'if some stock moves have already been created with that number. '
'This would lead to inconsistencies in your stock.'
))
return super().write(vals)
def copy(self, default=None):
if default is None:
default = {}
if 'name' not in default:
default['name'] = _("(copy of) %s", self.name)
return super().copy(default)
@api.depends('quant_ids', 'quant_ids.quantity')
def _product_qty(self):
for lot in self:
# We only care for the quants in internal or transit locations.
quants = lot.quant_ids.filtered(lambda q: q.location_id.usage == 'internal' or (q.location_id.usage == 'transit' and q.location_id.company_id))
lot.product_qty = sum(quants.mapped('quantity'))
def _search_product_qty(self, operator, value):
if operator not in OPERATORS:
raise UserError(_("Invalid domain operator %s", operator))
if not isinstance(value, (float, int)):
raise UserError(_("Invalid domain right operand '%s'. It must be of type Integer/Float", value))
domain = [
('lot_id', '!=', False),
'|', ('location_id.usage', '=', 'internal'),
'&', ('location_id.usage', '=', 'transit'), ('location_id.company_id', '!=', False)
]
lots_w_qty = self.env['stock.quant']._read_group(domain=domain, groupby=['lot_id'], aggregates=['quantity:sum'], having=[('quantity:sum', '!=', 0)])
ids = []
lot_ids_w_qty = []
for lot, quantity_sum in lots_w_qty:
lot_id = lot.id
lot_ids_w_qty.append(lot_id)
if OPERATORS[operator](quantity_sum, value):
ids.append(lot_id)
if value == 0.0 and operator == '=':
return [('id', 'not in', lot_ids_w_qty)]
if value == 0.0 and operator == '!=':
return [('id', 'in', lot_ids_w_qty)]
# check if we need include zero values in result
include_zero = (
value < 0.0 and operator in ('>', '>=') or
value > 0.0 and operator in ('<', '<=') or
value == 0.0 and operator in ('>=', '<=')
)
if include_zero:
return ['|', ('id', 'in', ids), ('id', 'not in', lot_ids_w_qty)]
return [('id', 'in', ids)]
def action_lot_open_quants(self):
self = self.with_context(search_default_lot_id=self.id, create=False)
if self.user_has_groups('stock.group_stock_manager'):
self = self.with_context(inventory_mode=True)
return self.env['stock.quant'].action_view_quants()
def action_lot_open_transfers(self):
self.ensure_one()
action = {
'res_model': 'stock.picking',
'type': 'ir.actions.act_window'
}
if len(self.delivery_ids) == 1:
action.update({
'view_mode': 'form',
'res_id': self.delivery_ids[0].id
})
else:
action.update({
'name': _("Delivery orders of %s", self.display_name),
'domain': [('id', 'in', self.delivery_ids.ids)],
'view_mode': 'tree,form'
})
return action
@api.model
def _get_outgoing_domain(self):
return [
'|',
('picking_code', '=', 'outgoing'),
('produce_line_ids', '!=', False),
]
def _find_delivery_ids_by_lot(self, lot_path=None, delivery_by_lot=None):
if lot_path is None:
lot_path = set()
domain = [
('lot_id', 'in', self.ids),
('state', '=', 'done'),
]
domain_restriction = self._get_outgoing_domain()
domain = expression.AND([domain, domain_restriction])
move_lines = self.env['stock.move.line'].search(domain)
moves_by_lot = {
lot_id: {'producing_lines': set(), 'barren_lines': set()}
for lot_id in move_lines.lot_id.ids
}
for line in move_lines:
if line.produce_line_ids:
moves_by_lot[line.lot_id.id]['producing_lines'].add(line.id)
else:
moves_by_lot[line.lot_id.id]['barren_lines'].add(line.id)
if delivery_by_lot is None:
delivery_by_lot = dict()
for lot in self:
delivery_ids = set()
if moves_by_lot.get(lot.id):
producing_move_lines = self.env['stock.move.line'].browse(moves_by_lot[lot.id]['producing_lines'])
barren_move_lines = self.env['stock.move.line'].browse(moves_by_lot[lot.id]['barren_lines'])
if producing_move_lines:
lot_path.add(lot.id)
next_lots = producing_move_lines.produce_line_ids.lot_id.filtered(lambda l: l.id not in lot_path)
next_lots_ids = set(next_lots.ids)
# If some producing lots are in lot_path, it means that they have been previously processed.
# Their results are therefore already in delivery_by_lot and we add them to delivery_ids directly.
delivery_ids.update(*(delivery_by_lot.get(lot_id, []) for lot_id in (producing_move_lines.produce_line_ids.lot_id - next_lots).ids))
for lot_id, delivery_ids_set in next_lots._find_delivery_ids_by_lot(lot_path=lot_path, delivery_by_lot=delivery_by_lot).items():
if lot_id in next_lots_ids:
delivery_ids.update(delivery_ids_set)
delivery_ids.update(barren_move_lines.picking_id.ids)
delivery_by_lot[lot.id] = list(delivery_ids)
return delivery_by_lot