255 lines
12 KiB
Python
255 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
from odoo import _, api, Command, fields, models
|
|
from odoo.osv import expression
|
|
from odoo.exceptions import ValidationError
|
|
|
|
|
|
class StockPickingType(models.Model):
    """Extend operation types with batch/wave counters and the settings that
    drive automatic batching of transfers."""

    _inherit = "stock.picking.type"

    # Both counters are filled by the same compute method.
    count_picking_batch = fields.Integer(compute='_compute_picking_count')
    count_picking_wave = fields.Integer(compute='_compute_picking_count')

    # Master switch of the automatic batching feature.
    auto_batch = fields.Boolean(
        'Automatic Batches',
        help="Automatically put pickings into batches as they are confirmed when possible.",
    )

    # "Group by" criteria; the names must match `_get_batch_group_by_keys`.
    batch_group_by_partner = fields.Boolean(
        'Contact',
        help="Automatically group batches by contacts.",
    )
    batch_group_by_destination = fields.Boolean(
        'Destination Country',
        help="Automatically group batches by destination country.",
    )
    batch_group_by_src_loc = fields.Boolean(
        'Source Location',
        help="Automatically group batches by their source location.",
    )
    batch_group_by_dest_loc = fields.Boolean(
        'Destination Location',
        help="Automatically group batches by their destination location.",
    )

    # Size limits of automatically filled batches; 0 means "no limit".
    batch_max_lines = fields.Integer(
        "Maximum lines per batch",
        help="A transfer will not be automatically added to batches that will exceed this number of lines if the transfer is added to it.\n"
             "Leave this value as '0' if no line limit.",
    )
    batch_max_pickings = fields.Integer(
        "Maximum transfers per batch",
        help="A transfer will not be automatically added to batches that will exceed this number of transfers.\n"
             "Leave this value as '0' if no transfer limit.",
    )
    batch_auto_confirm = fields.Boolean("Auto-confirm", default=True)

    def _compute_picking_count(self):
        """Run the inherited computation, then count the batches and waves of
        each operation type that are neither done nor cancelled."""
        super()._compute_picking_count()
        open_batches_domain = [
            ('state', 'not in', ('done', 'cancel')),
            ('picking_type_id', 'in', self.ids),
        ]
        grouped = self.env['stock.picking.batch']._read_group(
            open_batches_domain,
            ['picking_type_id', 'is_wave'],
            ['__count'],
        )
        # Key the totals by (operation type id, is_wave) for direct lookup.
        totals = {
            (group_type.id, wave_flag): total
            for group_type, wave_flag, total in grouped
        }
        for picking_type in self:
            type_id = picking_type.id
            picking_type.count_picking_batch = totals.get((type_id, False), 0)
            picking_type.count_picking_wave = totals.get((type_id, True), 0)

    @api.model
    def _get_batch_group_by_keys(self):
        """Return the names of the boolean fields acting as "group by"
        criteria for automatic batching."""
        return [
            'batch_group_by_partner',
            'batch_group_by_destination',
            'batch_group_by_src_loc',
            'batch_group_by_dest_loc',
        ]

    @api.constrains(lambda self: self._get_batch_group_by_keys() + ['auto_batch'])
    def _validate_auto_batch_group_by(self):
        """Require at least one "group by" criterion on every operation type
        that has automatic batching enabled.

        :raises ValidationError: if ``auto_batch`` is set and none of the
            fields listed by `_get_batch_group_by_keys` is set.
        """
        criteria = self._get_batch_group_by_keys()
        for picking_type in self:
            if picking_type.auto_batch and not any(picking_type[name] for name in criteria):
                raise ValidationError(_("If the Automatic Batches feature is enabled, at least one 'Group by' option must be selected."))

    def get_action_picking_tree_batch(self):
        """Return the batch transfers action as built by ``_get_action``."""
        action_xmlid = 'stock_picking_batch.stock_picking_batch_action'
        return self._get_action(action_xmlid)

    def get_action_picking_tree_wave(self):
        """Return the wave transfers action as built by ``_get_action``."""
        action_xmlid = 'stock_picking_batch.action_picking_tree_wave'
        return self._get_action(action_xmlid)
|
|
|
|
|
|
class StockPicking(models.Model):
    """Extend transfers with a link to their batch transfer and with the
    logic that automatically groups transfers into batches."""

    _inherit = "stock.picking"

    batch_id = fields.Many2one(
        'stock.picking.batch', string='Batch Transfer',
        check_company=True,
        help='Batch associated to this transfer', index=True, copy=False)

    @api.model_create_multi
    def create(self, vals_list):
        """Create the pickings, then finalize the batch of every picking
        created with a ``batch_id``.

        For such a picking the batch inherits the picking's operation type
        when it has none yet, and the batch ``_sanity_check`` is run.
        """
        pickings = super().create(vals_list)
        for picking, vals in zip(pickings, vals_list):
            if not vals.get('batch_id'):
                continue
            batch = picking.batch_id
            if not batch.picking_type_id:
                batch.picking_type_id = picking.picking_type_id[0]
            batch._sanity_check()
        return pickings

    def write(self, vals):
        """Write on the pickings and keep batches consistent when a truthy
        ``batch_id`` is part of ``vals``.

        Previous batches left without any picking are cancelled, the new
        batch gets an operation type if it lacks one, its ``_sanity_check``
        is run and its responsible is propagated to its pickings.
        """
        # Captured before the write: afterwards the link to them is gone.
        old_batches = self.batch_id
        res = super().write(vals)
        if vals.get('batch_id'):
            old_batches.filtered(lambda b: not b.picking_ids).state = 'cancel'
            if not self.batch_id.picking_type_id:
                self.batch_id.picking_type_id = self.picking_type_id[0]
            self.batch_id._sanity_check()
            # assign batch users to batch pickings
            self.batch_id.picking_ids.assign_batch_user(self.batch_id.user_id.id)
        return res

    def action_add_operations(self):
        """Return a window action listing the not-done move lines of these
        pickings, opened in a dialog (``target`` is ``new``).

        The action context extends the current one with ``picking_to_wave``
        (ids of these pickings), ``active_wave_id`` and
        ``search_default_by_location``.
        """
        view = self.env.ref('stock_picking_batch.view_move_line_tree_detailed_wave')
        # ``active_wave_id`` may hold a record, a plain database id, or be
        # absent. Calling ``.id`` unconditionally raised AttributeError in the
        # last two cases, so fall back to the raw value (or False).
        active_wave = self.env.context.get('active_wave_id')
        active_wave_id = getattr(active_wave, 'id', active_wave) or False
        return {
            'name': _('Add Operations'),
            'type': 'ir.actions.act_window',
            'view_mode': 'list',
            # NOTE(review): a record (not an id) is stored under 'view'; kept
            # as in the original - confirm whether any consumer relies on it.
            'view': view,
            'views': [(view.id, 'tree')],
            'res_model': 'stock.move.line',
            'target': 'new',
            'domain': [
                ('picking_id', 'in', self.ids),
                ('state', '!=', 'done')
            ],
            'context': dict(
                self.env.context,
                picking_to_wave=self.ids,
                active_wave_id=active_wave_id,
                search_default_by_location=True,
            )}

    def action_confirm(self):
        """Confirm the pickings, then try to auto-batch each of them."""
        res = super().action_confirm()
        for picking in self:
            picking._find_auto_batch()
        return res

    def button_validate(self):
        """Validate the pickings and keep batches consistent afterwards.

        Pickings listed under the ``pickings_to_detach`` context key are
        removed from their batch. A done picking whose batch still contains
        pickings that are not done is removed from that batch. Detached
        pickings and backorders are then submitted to `_find_auto_batch`.
        """
        res = super().button_validate()
        to_assign_ids = set()
        if self and self.env.context.get('pickings_to_detach'):
            self.env['stock.picking'].browse(self.env.context['pickings_to_detach']).batch_id = False
            to_assign_ids.update(self.env.context['pickings_to_detach'])

        for picking in self:
            if picking.state != 'done':
                continue
            # Avoid inconsistencies in states of the same batch when validating a single picking in a batch.
            if picking.batch_id and any(p.state != 'done' for p in picking.batch_id.picking_ids):
                picking.batch_id = None
            # If backorders were made, seek a batch for each of them according to the auto-batch criteria.
            to_assign_ids.update(picking.backorder_ids.ids)

        # To avoid inconsistencies, all incorrect pickings must be removed before assigning backorder pickings
        assignable_pickings = self.env['stock.picking'].browse(to_assign_ids)
        for picking in assignable_pickings:
            picking._find_auto_batch()

        return res

    def action_cancel(self):
        """Cancel the pickings and remove each one from its batch when that
        batch still contains at least one picking that is not cancelled."""
        res = super().action_cancel()
        for picking in self:
            # A distinct name is used inside the generator: the original
            # reused ``picking`` there, which was harmless but misleading.
            if picking.batch_id and any(sibling.state != 'cancel' for sibling in picking.batch_id.picking_ids):
                picking.batch_id = None
        return res

    def _should_show_transfers(self):
        """Return False when these pickings make up a whole single batch
        (not counting the ones listed under the ``pickings_to_detach``
        context key); otherwise defer to the inherited implementation."""
        if len(self.batch_id) == 1 and len(self) == (len(self.batch_id.picking_ids) - len(self.env.context.get('pickings_to_detach', []))):
            return False
        return super()._should_show_transfers()

    def _find_auto_batch(self):
        """Try to put this picking into a batch according to the automatic
        batching settings of its operation type.

        An existing compatible batch is preferred; otherwise a new batch is
        created with the first compatible unbatched picking found.

        :return: the batch the picking was added to, or False when automatic
            batching does not apply or nothing compatible was found.
        """
        self.ensure_one()
        # Check if auto_batch is enabled for this picking.
        if not self.picking_type_id.auto_batch or self.batch_id or not self.move_ids or not self._is_auto_batchable():
            return False

        # Try to find a compatible batch to insert the picking
        possible_batches = self.env['stock.picking.batch'].sudo().search(self._get_possible_batches_domain())
        for batch in possible_batches:
            if batch._is_picking_auto_mergeable(self):
                batch.picking_ids |= self
                return batch

        # If no batch was found, try to find a compatible picking and put them both in a new batch.
        possible_pickings = self.env['stock.picking'].search(self._get_possible_pickings_domain())
        for picking in possible_pickings:
            if self._is_auto_batchable(picking):
                # Create new batch with both pickings
                new_batch = self.env['stock.picking.batch'].sudo().create({
                    'picking_ids': [Command.link(self.id), Command.link(picking.id)],
                    'company_id': self.company_id.id if self.company_id else False,
                    'picking_type_id': self.picking_type_id.id,
                })
                if picking.picking_type_id.batch_auto_confirm:
                    new_batch.action_confirm()
                return new_batch

        # If nothing was found after those two steps, then no batch is doable given the conditions
        return False

    def _is_auto_batchable(self, picking=None):
        """Check whether this picking can be batched, optionally together
        with ``picking``, without violating the auto-batch constraints.

        :param picking: optional other picking; when omitted only this
            picking's own moves are counted.
        :return: False if this picking is not in state ``assigned``, if the
            combined number of moves exceeds a non-zero ``batch_max_lines``,
            or if a non-zero ``batch_max_pickings`` does not exceed 1; True
            otherwise.
        """
        if self.state != 'assigned':
            return False
        other = picking or self.env['stock.picking']
        picking_type = self.picking_type_id
        max_lines = picking_type.batch_max_lines
        if max_lines and len(self.move_ids) + len(other.move_ids) > max_lines:
            return False
        # A limit of one transfer (or fewer) per batch leaves no room for batching at all.
        max_pickings = picking_type.batch_max_pickings
        if max_pickings and max_pickings <= 1:
            return False
        return True

    def _get_possible_pickings_domain(self):
        """Return the search domain of the unbatched, assigned pickings of
        the same company and operation type, narrowed by each "group by"
        criterion enabled on the operation type."""
        self.ensure_one()
        picking_type = self.picking_type_id
        domain = [
            ('id', '!=', self.id),
            ('company_id', '=', self.company_id.id if self.company_id else False),
            ('state', '=', 'assigned'),
            ('picking_type_id', '=', picking_type.id),
            ('batch_id', '=', False),
        ]
        if picking_type.batch_group_by_partner:
            domain = expression.AND([domain, [('partner_id', '=', self.partner_id.id)]])
        if picking_type.batch_group_by_destination:
            domain = expression.AND([domain, [('partner_id.country_id', '=', self.partner_id.country_id.id)]])
        if picking_type.batch_group_by_src_loc:
            domain = expression.AND([domain, [('location_id', '=', self.location_id.id)]])
        if picking_type.batch_group_by_dest_loc:
            domain = expression.AND([domain, [('location_dest_id', '=', self.location_dest_id.id)]])

        return domain

    def _get_possible_batches_domain(self):
        """Return the search domain of the batches of the same company and
        operation type that may receive this picking.

        Batches in state ``draft`` always qualify; ``in_progress`` ones only
        when the operation type has ``batch_auto_confirm`` set. Each enabled
        "group by" criterion adds a condition on the batch pickings.
        """
        self.ensure_one()
        picking_type = self.picking_type_id
        domain = [
            ('state', 'in', ('draft', 'in_progress') if picking_type.batch_auto_confirm else ('draft',)),
            ('picking_type_id', '=', picking_type.id),
            ('company_id', '=', self.company_id.id if self.company_id else False),
        ]
        if picking_type.batch_group_by_partner:
            domain = expression.AND([domain, [('picking_ids.partner_id', '=', self.partner_id.id)]])
        if picking_type.batch_group_by_destination:
            domain = expression.AND([domain, [('picking_ids.partner_id.country_id', '=', self.partner_id.country_id.id)]])
        if picking_type.batch_group_by_src_loc:
            domain = expression.AND([domain, [('picking_ids.location_id', '=', self.location_id.id)]])
        if picking_type.batch_group_by_dest_loc:
            domain = expression.AND([domain, [('picking_ids.location_dest_id', '=', self.location_dest_id.id)]])

        return domain

    def assign_batch_user(self, user_id):
        """Set ``user_id`` as responsible on the pickings that have another
        (or no) responsible and post a message on each updated picking.

        :param user_id: database id of the user; nothing is done when falsy.
        """
        if not user_id:
            return
        pickings = self.filtered(lambda p: p.user_id.id != user_id)
        pickings.write({'user_id': user_id})
        for pick in pickings:
            log_message = _('Assigned to %s Responsible', pick.batch_id._get_html_link())
            pick.message_post(body=log_message)

    def _package_move_lines(self):
        """Delegate to the inherited implementation, evaluated on all the
        pickings of the batch when these pickings belong to one, and on
        these pickings otherwise."""
        return super(StockPicking, self.batch_id.picking_ids if self.batch_id else self)._package_move_lines()

    def action_view_batch(self):
        """Return a window action opening the form view of this picking's
        batch."""
        self.ensure_one()
        return {
            'type': 'ir.actions.act_window',
            'res_model': 'stock.picking.batch',
            'res_id': self.batch_id.id,
            'view_mode': 'form'
        }
|