# Source file: sale_stock/models/sale_order_line.py
# (repository viewer header: 369 lines, 19 KiB, Python)

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from datetime import timedelta
from collections import defaultdict
from odoo import api, fields, models, _
from odoo.tools import float_compare
from odoo.exceptions import UserError
class SaleOrderLine(models.Model):
    # Stock extension of `sale.order.line`: links lines to their stock moves,
    # exposes availability/forecast figures for the inventory widget and
    # launches procurements for confirmed lines.
    # NOTE: kept as a comment (not a class docstring) so the class `__doc__`
    # is left exactly as it was.
    _inherit = 'sale.order.line'

    qty_delivered_method = fields.Selection(selection_add=[('stock_move', 'Stock Moves')])
    route_id = fields.Many2one('stock.route', string='Route', domain=[('sale_selectable', '=', True)], ondelete='restrict', check_company=True)
    move_ids = fields.One2many('stock.move', 'sale_line_id', string='Stock Moves')
    # The five fields below are all assigned by `_compute_qty_at_date`.
    virtual_available_at_date = fields.Float(compute='_compute_qty_at_date', digits='Product Unit of Measure')
    scheduled_date = fields.Datetime(compute='_compute_qty_at_date')
    forecast_expected_date = fields.Datetime(compute='_compute_qty_at_date')
    free_qty_today = fields.Float(compute='_compute_qty_at_date', digits='Product Unit of Measure')
    qty_available_today = fields.Float(compute='_compute_qty_at_date')
    warehouse_id = fields.Many2one(related='order_id.warehouse_id')
    # `qty_to_deliver` and `display_qty_widget` are both assigned by `_compute_qty_to_deliver`.
    qty_to_deliver = fields.Float(compute='_compute_qty_to_deliver', digits='Product Unit of Measure')
    is_mto = fields.Boolean(compute='_compute_is_mto')
    display_qty_widget = fields.Boolean(compute='_compute_qty_to_deliver')
    customer_lead = fields.Float(
        compute='_compute_customer_lead', store=True, readonly=False, precompute=True,
        inverse='_inverse_customer_lead')

    @api.depends('product_type', 'product_uom_qty', 'qty_delivered', 'state', 'move_ids', 'product_uom')
    def _compute_qty_to_deliver(self):
        """Compute the quantity left to deliver and the visibility of the inventory widget.

        The widget is displayed only for lines in state draft/sent/sale whose
        ``product_type`` is ``'product'``, that have a unit of measure and a
        strictly positive quantity to deliver. A line in state ``sale``
        without any stock move never displays it.
        """
        for line in self:
            line.qty_to_deliver = line.product_uom_qty - line.qty_delivered
            if line.state in ('draft', 'sent', 'sale') and line.product_type == 'product' and line.product_uom and line.qty_to_deliver > 0:
                if line.state == 'sale' and not line.move_ids:
                    line.display_qty_widget = False
                else:
                    line.display_qty_widget = True
            else:
                line.display_qty_widget = False

    @api.depends(
        'product_id', 'customer_lead', 'product_uom_qty', 'product_uom', 'order_id.commitment_date',
        'move_ids', 'move_ids.forecast_expected_date', 'move_ids.forecast_availability',
        'warehouse_id')
    def _compute_qty_at_date(self):
        """Compute the forecasted quantity of the product at the delivery date.

        There are two cases:
        1. The quotation has a commitment_date: it is used as the delivery date.
        2. The quotation has no commitment_date: the estimated delivery date
           returned by ``_expected_date()`` is used.

        Lines handled by neither the ``sale`` branch nor the draft/sent branch
        get all five computed fields reset to ``False``.
        """
        treated = self.browse()
        # If the line is already in state 'sale', the picking has been created and a
        # simple forecasted quantity is not enough: use the forecast data of the
        # related stock moves instead.
        for line in self.filtered(lambda sol: sol.state == 'sale'):
            if not line.display_qty_widget:
                continue
            moves = line.move_ids.filtered(lambda m: m.product_id == line.product_id)
            line.forecast_expected_date = max(moves.filtered("forecast_expected_date").mapped("forecast_expected_date"), default=False)
            line.qty_available_today = 0
            line.free_qty_today = 0
            # Accumulate directly on the fields, exactly as before: the assigned
            # values are read back from the record on every iteration.
            for move in moves:
                line.qty_available_today += move.product_uom._compute_quantity(move.quantity, line.product_uom)
                line.free_qty_today += move.product_id.uom_id._compute_quantity(move.forecast_availability, line.product_uom)
            line.scheduled_date = line.order_id.commitment_date or line._expected_date()
            line.virtual_available_at_date = False
            treated |= line

        # Quantity (in the product's UoM) already consumed by previously processed
        # lines, keyed by product id.
        qty_processed_per_product = defaultdict(int)
        grouped_lines = defaultdict(lambda: self.env['sale.order.line'])
        # We first loop over the SO lines to group them by warehouse and scheduled
        # date in order to batch the read of the computed quantity fields.
        for line in self.filtered(lambda sol: sol.state in ('draft', 'sent')):
            if not (line.product_id and line.display_qty_widget):
                continue
            grouped_lines[(line.warehouse_id.id, line.order_id.commitment_date or line._expected_date())] |= line

        for (warehouse, scheduled_date), lines in grouped_lines.items():
            product_qties = lines.mapped('product_id').with_context(to_date=scheduled_date, warehouse=warehouse).read([
                'qty_available',
                'free_qty',
                'virtual_available',
            ])
            qties_per_product = {
                product['id']: (product['qty_available'], product['free_qty'], product['virtual_available'])
                for product in product_qties
            }
            for line in lines:
                line.scheduled_date = scheduled_date
                qty_available_today, free_qty_today, virtual_available_at_date = qties_per_product[line.product_id.id]
                line.qty_available_today = qty_available_today - qty_processed_per_product[line.product_id.id]
                line.free_qty_today = free_qty_today - qty_processed_per_product[line.product_id.id]
                line.virtual_available_at_date = virtual_available_at_date - qty_processed_per_product[line.product_id.id]
                line.forecast_expected_date = False
                product_qty = line.product_uom_qty
                if line.product_uom and line.product_id.uom_id and line.product_uom != line.product_id.uom_id:
                    # Displayed quantities are expressed in the line UoM, while the
                    # running per-product total stays in the product's UoM.
                    line.qty_available_today = line.product_id.uom_id._compute_quantity(line.qty_available_today, line.product_uom)
                    line.free_qty_today = line.product_id.uom_id._compute_quantity(line.free_qty_today, line.product_uom)
                    line.virtual_available_at_date = line.product_id.uom_id._compute_quantity(line.virtual_available_at_date, line.product_uom)
                    product_qty = line.product_uom._compute_quantity(product_qty, line.product_id.uom_id)
                qty_processed_per_product[line.product_id.id] += product_qty
            treated |= lines

        remaining = (self - treated)
        remaining.virtual_available_at_date = False
        remaining.scheduled_date = False
        remaining.forecast_expected_date = False
        remaining.free_qty_today = False
        remaining.qty_available_today = False

    @api.depends('product_id', 'route_id', 'order_id.warehouse_id', 'product_id.route_ids')
    def _compute_is_mto(self):
        """Flag the lines whose applicable routes contain the MTO route.

        ``is_mto`` is set to True when the MTO route (the one of the order
        warehouse's ``mto_pull_id``, else the global
        ``stock.route_warehouse0_mto`` route) is among the line's route or,
        when the line has none, the product's and product category's routes.
        Lines that do not display the quantity widget keep ``is_mto = False``.
        """
        self.is_mto = False
        for line in self:
            if not line.display_qty_widget:
                continue
            product = line.product_id
            product_routes = line.route_id or (product.route_ids + product.categ_id.total_route_ids)

            # Check MTO
            mto_route = line.order_id.warehouse_id.mto_pull_id.route_id
            if not mto_route:
                try:
                    mto_route = self.env['stock.warehouse']._find_global_route('stock.route_warehouse0_mto', _('Replenish on Order (MTO)'))
                except UserError:
                    # if route MTO not found in ir_model_data, we treat the product as in MTS
                    pass

            if mto_route and mto_route in product_routes:
                line.is_mto = True
            else:
                line.is_mto = False

    @api.depends('product_id')
    def _compute_qty_delivered_method(self):
        """Use stock moves to compute the delivered quantity of 'consu' and 'product' lines.

        For SO lines coming from an expense, no picking should be generated: we do
        not manage stock for those lines, even if the product is storable.
        """
        super()._compute_qty_delivered_method()

        for line in self:
            if not line.is_expense and line.product_id.type in ['consu', 'product']:
                line.qty_delivered_method = 'stock_move'

    @api.depends('move_ids.state', 'move_ids.scrapped', 'move_ids.quantity', 'move_ids.product_uom')
    def _compute_qty_delivered(self):
        """Compute the delivered quantity from the done stock moves.

        For lines using the ``stock_move`` method, the result is the sum of the
        done outgoing moves minus the done incoming moves, each converted to the
        line unit of measure with HALF-UP rounding.
        """
        super()._compute_qty_delivered()

        for line in self:  # TODO: maybe one day, this should be done in SQL for performance sake
            if line.qty_delivered_method == 'stock_move':
                qty = 0.0
                outgoing_moves, incoming_moves = line._get_outgoing_incoming_moves()
                for move in outgoing_moves:
                    if move.state != 'done':
                        continue
                    qty += move.product_uom._compute_quantity(move.quantity, line.product_uom, rounding_method='HALF-UP')
                for move in incoming_moves:
                    if move.state != 'done':
                        continue
                    qty -= move.product_uom._compute_quantity(move.quantity, line.product_uom, rounding_method='HALF-UP')
                line.qty_delivered = qty

    @api.model_create_multi
    def create(self, vals_list):
        """Create the lines and launch the stock rules of those already in state ``sale``."""
        lines = super().create(vals_list)
        lines.filtered(lambda line: line.state == 'sale')._action_launch_stock_rule()
        return lines

    def write(self, values):
        """Write ``values`` and keep the related stock moves in sync.

        - A change of ``product_packaging_id`` is propagated to the moves that
          are neither cancelled nor done.
        - A change of ``product_uom_qty`` re-launches the stock rules of the
          non-expense lines in state ``sale``, passing the quantities they had
          before the write.
        """
        lines = self.env['sale.order.line']
        if 'product_uom_qty' in values:
            lines = self.filtered(lambda r: r.state == 'sale' and not r.is_expense)

        if 'product_packaging_id' in values:
            self.move_ids.filtered(
                lambda m: m.state not in ['cancel', 'done']
            ).product_packaging_id = values['product_packaging_id']

        # Snapshot taken before the write so the previous quantities stay available.
        previous_product_uom_qty = {line.id: line.product_uom_qty for line in lines}
        res = super().write(values)
        if lines:
            lines._action_launch_stock_rule(previous_product_uom_qty)
        return res

    @api.depends('move_ids')
    def _compute_product_updatable(self):
        """Forbid changing the product once the line has a non-cancelled stock move."""
        super()._compute_product_updatable()
        for line in self:
            if line.move_ids.filtered(lambda m: m.state != 'cancel'):
                line.product_updatable = False

    @api.depends('product_id')
    def _compute_customer_lead(self):
        """Set the lead time of each line to the ``sale_delay`` of its product."""
        super()._compute_customer_lead()  # Reset customer_lead when the product is modified
        for line in self:
            line.customer_lead = line.product_id.sale_delay

    def _inverse_customer_lead(self):
        """Propagate a lead time change to the deadline of the related stock moves.

        Only applies to lines in state ``sale`` whose order has no commitment date.
        """
        for line in self:
            if line.state == 'sale' and not line.order_id.commitment_date:
                # Propagate deadline on related stock move
                line.move_ids.date_deadline = line.order_id.date_order + timedelta(days=line.customer_lead or 0.0)

    def _prepare_procurement_values(self, group_id=False):
        """Prepare the values for moves or other records created from a stock rule
        coming from a sale order line.

        This method can be overridden to add custom keys that could be used
        in move/PO creation.

        :param group_id: procurement group passed through as ``group_id``
        :return: dict of procurement values
        """
        values = super()._prepare_procurement_values(group_id)
        self.ensure_one()
        # Use the delivery date if there is one, else use date_order and lead time
        date_deadline = self.order_id.commitment_date or (self.order_id.date_order + timedelta(days=self.customer_lead or 0.0))
        # Plan earlier than the deadline by the company's security lead days.
        date_planned = date_deadline - timedelta(days=self.order_id.company_id.security_lead)
        values.update({
            'group_id': group_id,
            'sale_line_id': self.id,
            'date_planned': date_planned,
            'date_deadline': date_deadline,
            'route_ids': self.route_id,
            'warehouse_id': self.order_id.warehouse_id or False,
            'partner_id': self.order_id.partner_shipping_id.id,
            'product_description_variants': self.with_context(lang=self.order_id.partner_id.lang)._get_sale_order_line_multiline_description_variants(),
            'company_id': self.order_id.company_id,
            'product_packaging_id': self.product_packaging_id,
            'sequence': self.sequence,
        })
        return values

    def _get_qty_procurement(self, previous_product_uom_qty=False):
        """Return the quantity already covered by the stock moves of the line.

        Done moves count for their ``quantity``, other moves for their
        ``product_uom_qty``; incoming moves are subtracted. Everything is
        converted to the line unit of measure with HALF-UP rounding.

        :param previous_product_uom_qty: not used by this implementation
        :return: quantity in the line unit of measure
        """
        self.ensure_one()
        qty = 0.0
        outgoing_moves, incoming_moves = self._get_outgoing_incoming_moves()
        for move in outgoing_moves:
            qty_to_compute = move.quantity if move.state == 'done' else move.product_uom_qty
            qty += move.product_uom._compute_quantity(qty_to_compute, self.product_uom, rounding_method='HALF-UP')
        for move in incoming_moves:
            qty_to_compute = move.quantity if move.state == 'done' else move.product_uom_qty
            qty -= move.product_uom._compute_quantity(qty_to_compute, self.product_uom, rounding_method='HALF-UP')
        return qty

    def _get_outgoing_incoming_moves(self):
        """Split the relevant stock moves of the line into outgoing and incoming ones.

        Cancelled moves, scrapped moves and moves of another product are
        ignored. When ``accrual_entry_date`` is in the context, moves dated
        after it are ignored as well.

        :return: tuple ``(outgoing_moves, incoming_moves)`` of ``stock.move`` recordsets
        """
        outgoing_moves = self.env['stock.move']
        incoming_moves = self.env['stock.move']

        moves = self.move_ids.filtered(lambda r: r.state != 'cancel' and not r.scrapped and self.product_id == r.product_id)
        if self._context.get('accrual_entry_date'):
            moves = moves.filtered(lambda r: fields.Date.context_today(r, r.date) <= self._context['accrual_entry_date'])

        for move in moves:
            if move.location_dest_id.usage == "customer":
                # Outgoing: a move to the customer that is not a return of another
                # move, or one that is flagged `to_refund`.
                if not move.origin_returned_move_id or move.to_refund:
                    outgoing_moves |= move
            elif move.to_refund:
                # Incoming: destination is not a customer location and the move
                # is flagged `to_refund`.
                incoming_moves |= move

        return outgoing_moves, incoming_moves

    def _get_procurement_group(self):
        """Return the procurement group of the order."""
        return self.order_id.procurement_group_id

    def _prepare_procurement_group_vals(self):
        """Return the values used to create the procurement group of the order."""
        return {
            'name': self.order_id.name,
            'move_type': self.order_id.picking_policy,
            'sale_id': self.order_id.id,
            'partner_id': self.order_id.partner_shipping_id.id,
        }

    def _create_procurement(self, product_qty, procurement_uom, values):
        """Build the ``Procurement`` for this line, delivering to the customer
        location of the order's shipping partner.

        :param product_qty: quantity to procure, expressed in ``procurement_uom``
        :param procurement_uom: unit of measure of the procurement
        :param values: procurement values (see ``_prepare_procurement_values``)
        """
        self.ensure_one()
        return self.env['procurement.group'].Procurement(
            self.product_id, product_qty, procurement_uom, self.order_id.partner_shipping_id.property_stock_customer,
            self.product_id.display_name, self.order_id.name, self.order_id.company_id, values)

    def _action_launch_stock_rule(self, previous_product_uom_qty=False):
        """
        Launch procurement group run method with required/custom fields generated by a
        sale order line. procurement group will launch '_run_pull', '_run_buy' or '_run_manufacture'
        depending on the sale order line product rule.

        Nothing is done when ``skip_procurement`` is set in the context. Lines
        that are not in state ``sale``, whose order is locked, whose product is
        neither 'consu' nor 'product', or whose moves already cover the ordered
        quantity are skipped.

        :param previous_product_uom_qty: forwarded to ``_get_qty_procurement``
        :return: True
        """
        if self._context.get("skip_procurement"):
            return True
        precision = self.env['decimal.precision'].precision_get('Product Unit of Measure')
        procurements = []
        for line in self:
            line = line.with_company(line.company_id)
            if line.state != 'sale' or line.order_id.locked or line.product_id.type not in ('consu', 'product'):
                continue
            qty = line._get_qty_procurement(previous_product_uom_qty)
            if float_compare(qty, line.product_uom_qty, precision_digits=precision) == 0:
                continue

            group_id = line._get_procurement_group()
            if not group_id:
                group_id = self.env['procurement.group'].create(line._prepare_procurement_group_vals())
                line.order_id.procurement_group_id = group_id
            else:
                # In case the procurement group is already created and the order was
                # cancelled, we need to update certain values of the group.
                updated_vals = {}
                if group_id.partner_id != line.order_id.partner_shipping_id:
                    updated_vals.update({'partner_id': line.order_id.partner_shipping_id.id})
                if group_id.move_type != line.order_id.picking_policy:
                    updated_vals.update({'move_type': line.order_id.picking_policy})
                if updated_vals:
                    group_id.write(updated_vals)

            values = line._prepare_procurement_values(group_id=group_id)
            # Only the difference between the ordered quantity and what the
            # existing moves already cover is procured.
            product_qty = line.product_uom_qty - qty

            line_uom = line.product_uom
            quant_uom = line.product_id.uom_id
            product_qty, procurement_uom = line_uom._adjust_uom_quantities(product_qty, quant_uom)
            procurements.append(line._create_procurement(product_qty, procurement_uom, values))
        if procurements:
            self.env['procurement.group'].run(procurements)

        # This next block is currently needed only because the scheduler trigger is done by picking confirmation rather than stock.move confirmation
        orders = self.mapped('order_id')
        for order in orders:
            pickings_to_confirm = order.picking_ids.filtered(lambda p: p.state not in ['cancel', 'done'])
            if pickings_to_confirm:
                # Trigger the Scheduler for Pickings
                pickings_to_confirm.action_confirm()
        return True

    def _update_line_quantity(self, values):
        """Refuse to lower the ordered quantity below what was already delivered.

        :param values: dict containing the new ``product_uom_qty``
        :raises UserError: if the new quantity is lower than the largest
            delivered quantity among the 'product'/'consu' lines
        """
        precision = self.env['decimal.precision'].precision_get('Product Unit of Measure')
        line_products = self.filtered(lambda line: line.product_id.type in ['product', 'consu'])
        delivered_qties = line_products.mapped('qty_delivered')
        if delivered_qties and float_compare(values['product_uom_qty'], max(delivered_qties), precision_digits=precision) == -1:
            raise UserError(_('The ordered quantity of a sale order line cannot be decreased below the amount already delivered. Instead, create a return in your inventory.'))
        super()._update_line_quantity(values)

    #=== HOOKS ===#

    def _get_action_add_from_catalog_extra_context(self, order):
        """Add the id of the order's warehouse to the catalog action context."""
        extra_context = super()._get_action_add_from_catalog_extra_context(order)
        extra_context.update(warehouse=order.warehouse_id.id)
        return extra_context

    def _get_product_catalog_lines_data(self, **kwargs):
        """ Override of `sale` to add the delivered quantity.

        The delivered quantity is the sum, over the lines, of ``qty_delivered``
        converted from the line unit of measure to the product unit of measure.

        :rtype: dict
        :return: A dict with the following structure:
            {
                'deliveredQty': float,
                'quantity': float,
                'price': float,
                'readOnly': bool,
            }
        """
        res = super()._get_product_catalog_lines_data(**kwargs)
        res['deliveredQty'] = sum(
            self.mapped(
                lambda line: line.product_uom._compute_quantity(
                    qty=line.qty_delivered,
                    to_unit=line.product_id.uom_id,
                )
            )
        )
        return res